In [4]:
# -------------------------------------------------------------------------
# Feb 17, 2024 Created
# Team: YNKuoJLien
# Code creator: Yan-Ning Kuo (yk545@cornell.edu)
#
# ForceSMIP_function_ColorLIMnMCA_YNKuoJLien.ipynb is a script including 
# the functions necessary for conducting the forced response estimate from 
# group YNKuoJLien with ColorLIMnMCA - a combination of a Colored Noise 
# Linear Inverse Model with Colored Noise (ColorLIM; Lien, under review) 
# and Maximum Covariance Analysis (MCA).
#
# ColorLIM is a variant of LIM designed for a system driven by colored noise,
# and is developed by Justin Lien (lien.justin.t8@dc.tohoku.ac.jp) with a 
# under review paper (2024) for its mathematical foundation:
# https://arxiv.org/abs/2402.15184v1
#
# -------------------------------------------------------------------------

## Import some necessary libraries
from os import walk
import glob
import re
import numpy as np
import xarray as xr
import xcat as xc
import time as clocktime
import matplotlib.pyplot as plt
import cartopy.crs as ccrs

def mat2vec(Xmat):
    dimsize = Xmat.shape
    Xvec = np.empty((dimsize[0]*dimsize[1],1)) * np.nan
    Xvec[:,0] = Xmat.reshape(dimsize[0]*dimsize[1])
    return Xvec

def Symmetrization(X):
    Xsym = 0.5 * (X + X.T)
    return Xsym

def AntiSymmetrization(X):
    Xantisym = 0.5 * (X - X.T)
    return Xantisym

def vec2mat(Xvec, dimsize0, dimsize1):
    Xmat = Xvec.reshape(dimsize0, dimsize1)
    return Xmat

def com_mat(m, n):
    if (m!=n):
        A = np.arange(m*n)
        A = A.reshape(m,n,order='F').copy()
        v = A.T
        v = v.reshape(m*n,order='F').copy()
        P = np.eye(m*n)
        P = P[v,:]
    else:
        A = np.arange(m*n)
        A = A.reshape(m,n)
        v = A.T
        v = v.reshape(m*n)
        P = np.eye(m*n)
        P = P[v,:]
    return P

def xcorr(x, y, maxlags=5,mode='unbiased'):
    # e.g. lags, c = xcorr(x,y,maxlags=len(x)-1)
    c = np.correlate(x, y, mode='full')
    idx0 = int((len(c)-1)/2);
    idx = np.arange(idx0-2*idx0,idx0+1,1)
    strloc = int(np.where(idx==-maxlags)[0])
    endloc = int(np.where(idx==maxlags)[0])+1
    if (mode=='unbiased'):
        scale = (len(x)-np.abs(idx[strloc:endloc]))
        ## normed np.sqrt(np.dot(x, x) * np.dot(y, y)) # this is the transformation function
        c = np.true_divide(c[strloc:endloc],scale)
        lags = idx[strloc:endloc]
    return c, lags

def findingLeastDampedMode(A,xvec):
    D, U = np.linalg.eig(A)
    V = np.linalg.inv(U).T
    print(np.real(D))
    ind = np.where(np.abs(np.real(D))==np.min(np.abs(np.real(D))))[0] ## np.abs(np.real(D)) smallest
    DD = D[ind[0]] ## sort eigenvalue in desending way so that the first one will have the lowest decaying rate
    VV = V[:,ind[0]] ## the time series of the least damped mode
    UU = U[:,ind[0]]
    alpha = np.dot(VV.T, xvec)
    Xtr = np.real(np.outer(UU, alpha))
    return Xtr

def ForceSMIP_ColorLIM_trend(T, xvec, Gam):
## Input variables: T, xvec, Gam 
#### T[ntime]: vector of time 
#### xvec[n, ntime]: vector of n*ntime;
#### Gam[1]: noise's dependence on time
    n = xvec.shape
    dt = T[1] - T[0]
    C_Num = np.dot(xvec,xvec.T) / n[1] ## Numerical solution of covariance matrix
    ### numerical derivatives of correlation function
    t_corr = 1 ## compute the correlation function over the domain [-t_corr, t_corr]
    Numerical_dc_at_0 = np.empty((n[0],n[0])) * np.nan
    Numerical_ddc_at_0 = np.empty((n[0],n[0])) * np.nan
    Numerical_dddc_at_0 = np.empty((n[0],n[0])) * np.nan
    for j in range(n[0]):
        for k in range(n[0]):
            [c,lags] = xcorr(xvec[j,:],xvec[k,:],maxlags=round(t_corr/dt),mode='unbiased');
            idx = int((len(lags)-1)/2)
            Numerical_dc_at_0[j,k] = np.dot([-3/2, 2, -1/2], c[idx:(idx+2)+1])/dt; # Forward - Second order
            Numerical_ddc_at_0[j,k] = np.dot([2, -5, 4, -1],c[idx:(idx+3)+1])/dt**2; # Forward - Second order
            Numerical_dddc_at_0[j,k] = np.dot([1/8, -1, 13/8, 0, -13/8, 1, -1/8],\
                                              c[(idx-3):(idx+3)+1])/dt**3; # Central - Fourth order
    N0 = C_Num;
    N1 = Numerical_dc_at_0;
    N2 = Numerical_ddc_at_0;
    N3 = Numerical_dddc_at_0;
    
    X = 0.5 * (N1+N0/Gam);
    Z = Symmetrization(-N2);
    Y = Symmetrization(0.5 * (N2 - N0/Gam**2))
    W = AntiSymmetrization(1/Gam**2 * N1 - N3)

    A_vec_1 = np.concatenate((np.kron(X.T,np.eye(n[0]))+np.kron(np.eye(n[0]),X.T)@com_mat(n[0],n[0]),\
             np.kron(Y.T,np.eye(n[0]))-np.kron(np.eye(n[0]),Y)@com_mat(n[0],n[0])),axis = 0)
    A_vec_2 = np.concatenate((-mat2vec(Z),-mat2vec(W)),axis = 0)
    A_Num = vec2mat(np.linalg.lstsq(A_vec_1,A_vec_2)[0], \
                    int(n[0]), int(n[0]))

    X_least_damped = findingLeastDampedMode(A_Num,xvec)
    
    return X_least_damped

def ForceSMIP_MCA(A, B):
    # MCA analysis of the climate field, note that the climate field has to be in 2D format [time, space]
    # output: a dictionary, containing MCA_A [mode, space] and MCA_B in [space, mode], PC in [time, mode], lamb is the portion of explained variance by each PC
    
    normA = np.sqrt(np.mean(np.mean(A**2, axis=0)))
    A = A/normA
    normB = np.sqrt(np.mean(np.mean(B**2, axis=0)))
    B = B/normB
    U, S, Vh = np.linalg.svd(np.dot(B.T,A), full_matrices=False)

    s = dict()
    s["MCA_Pattern_A"] = Vh
    s["MCA_Pattern_B"] = U.T
    s["MCA_A"] = np.dot(A, Vh.T)
    s["MCA_B"] = np.dot(B, U)
    D = S ** 2
    s["D"] = D / np.sum(D)
    s["normA"] = normA
    s["normB"] = normB
    return s

def ForceSMIP_EOF(A):
    # Adapted form Tongtong Xu's function [adding the standardization for PCs/EOFs with the square root of eigen values]
    # EOF analysis of the climate field, note that the climate field has to be in 2D format [time, space]
    # output: a dictionary, containing EOF in [mode, space], PC in [time, mode], lamb is the portion of explained variance by each PC
    
    norm = np.sqrt(np.mean(np.mean(A**2, axis=0)))
    A = A/norm
    
    U, S, V = np.linalg.svd(A, full_matrices=False)

    s = dict()
    s["EOF"] = np.dot(np.diag(1/S),V)
    s["PC"] = np.dot(A, s["EOF"].T)
    s["S"] = S
    s["norm"] = norm
    return s

def ForceSMIP_read_zmta(fname,varname):
    # this function reads in the data from netcdf
    # input: fname is a string containing (the path and) the name of the ncfile
    #        varname is a string containing the name of the climate variable to be read, e.g., ""
    # output: a dictionary, containing the 3D climate field, lon & lat in 2D, and time
    
    data = xr.open_dataset(fname)
    
    s = dict()
    s["lat"],s["plev"] = np.meshgrid(data["lat"],data["plev"])
    s[varname] = data[varname].values
    s["mask"] = 1 - 0*s[varname].sum(axis=0)
    s["time"] = data["time"]
    s["plev_axis"] = data["plev"]
    s["lat_axis"] = data["lat"]
    data.close()
    return s

# -------------------------------------------------------------------------
# These functions are copied from ForceSMIP_LIM (generated by Tongtong Xu)
# -------------------------------------------------------------------------
def ForceSMIP_read(fname,varname):
    # this function reads in the data from netcdf
    # input: fname is a string containing (the path and) the name of the ncfile
    #        varname is a string containing the name of the climate variable to be read, e.g., ""
    # output: a dictionary, containing the 3D climate field, lon & lat in 2D, and time
    
    data = xr.open_dataset(fname)
    
    s = dict()
    s["lon"],s["lat"] = np.meshgrid(data["lon"],data["lat"])
    s[varname] = data[varname].values
    s["mask"] = 1 - 0*s[varname].sum(axis=0)
    s["time"] = data["time"]
    s["lon_axis"] = data["lon"]
    s["lat_axis"] = data["lat"]
    data.close()
    return s

def ForceSMIP_XYT_into_ZT(z,mask):
    # convert 3D array into 2D, such that the climate field is represented in [time, space]
    # input: z is the 3D array of [time, lat, lon], mask is the 2D array of [lat,lon], containing either 1 or nan
    
    T, J, I = z.shape
    z = np.multiply(z,np.tile(mask[np.newaxis,:,:], (T, 1, 1)))
    z = z.reshape(T,J*I)
    idx = np.where(~np.isnan(z[0, :]))[0]
    zout = z[:,idx]
    return zout

def ForceSMIP_ZT_into_XYT(z,mask):
    # convert 2D array back to 3D, such that the climate field is represented in [time, lat, lon]
    # input: z is the 2D array of [time, space], mask is the 2D array of [lat,lon], containing either 1 or nan
    if len(z.shape)==1:
        J, I = mask.shape
        idx = np.where(~np.isnan(mask.reshape(J*I)))[0]
        zout = np.zeros((1,J*I))
        zout[0,idx] = z
        zout = np.multiply(zout[0,:].reshape(J,I),mask)
    else:
        J, I = mask.shape
        T, _ = z.shape
        idx = np.where(~np.isnan(mask.reshape(J*I)))[0]
        zout = np.zeros((T,J*I))
        zout[:,idx] = z
        zout = np.multiply(zout.reshape(T,J,I),np.tile(mask[np.newaxis,:,:], (T, 1, 1)))
    return zout

def ForceSMIP_removeSeas(z,time):
    # this function removes the seasonal mean climatology
    # input: z is the 3D climate field [time, lat, lon], time is the time series (xarray format)
    # output: seasonal mean fields (seas in [season, lat, lon]), and anomalies in the same size as z
    
    monlist = time.dt.month.values
    T, J, I = z.shape

    # get seasonality matrices
    seas = np.zeros((12, J, I))
    for i in range(1, 13):
        loc = monlist == i
        seas[i-1, :, :] = np.mean(z[loc, :, :], axis=0)

    # get anomaly after removing seasonality
    ano = np.zeros_like(z)
    for i in range(1, 13):
        loc = monlist == i
        ano[loc, :, :] = z[loc, :, :] - seas[i-1, :, :]
    return seas,ano
    
def ForceSMIP_lim_trend(X,tau0):
    # derive the LIM based trend
    # input: X is 2D field in [mode, time], tau0 is the training lag
    # output: u, the spatial pattern of the least damped mode; alpha, the time series associated with the least damped mode;
    #         X_least_damped is in [mode, time], the trend representation in PC space

    X0 = X[:,:-tau0]
    Xtau = X[:,tau0:]

    C0 = np.dot(X0, X0.T) / (X0.shape[1] - 1)
    Ctau = np.dot(Xtau, X0.T) / (X0.shape[1] - 1)

    # linear operator
    G = np.dot(Ctau,np.linalg.pinv(C0))
    D,U = np.linalg.eig(G)
    D = np.log(D)/tau0
    V = np.linalg.inv(U).T

    # sort modes
    loc = np.argsort(-np.real(D))
    sigma = -1 / np.real(D[loc])
    UU = U[:, loc]
    VV = V[:, loc]

    # identify the least damped mode
    u = UU[:, 0]
    v = VV[:, 0]
    alpha = np.dot(v, X)

    X_least_damped = np.real(np.outer(u, alpha))
    return X_least_damped


# -------------------------------------------------------------------------
# These functions are copied from ForceSMIP_LFCA (generated by Robert Wills)
# -------------------------------------------------------------------------
from scipy.signal import convolve, butter, filtfilt
### Helper functions ###
def low_pass_weights(window, cutoff):
    """Calculate weights for a low pass Lanczos filter.

    Args:

    window: int
        The length of the filter window.

    cutoff: float
        The cutoff frequency in inverse time steps.
        
    References
    ----------
        from https://scitools.org.uk/iris/docs/v1.2/examples/graphics/SOI_filtering.html

    """
    order = ((window - 1) // 2 ) + 1
    nwts = 2 * order + 1
    w = np.zeros([nwts])
    n = nwts // 2
    w[n] = 2 * cutoff
    k = np.arange(1., n)
    sigma = np.sin(np.pi * k / n) * n / (np.pi * k)
    firstfactor = np.sin(2. * np.pi * cutoff * k) / (np.pi * k)
    w[n-1:0:-1] = firstfactor * sigma
    w[n+1:-1] = firstfactor * sigma
    return w[1:-1]

def filter_padding(ts, window, ftype='mirror', detrend=True, detrend_poly=1): 
    ts_pad = np.zeros(2*window+len(ts))
    if detrend:
        t = np.arange(len(ts))
        z = np.polyfit(t,ts,detrend_poly)
        p = np.poly1d(z)
        ts_in = ts-p(t)
    else:
        ts_in = ts
    ts_pad[window:-window] = ts_in[:]
    if ftype == 'mirror':
        ts_pad[:window] = ts_in[:window][::-1]
        ts_pad[-window:] = ts_in[-window:][::-1]
    elif ftype == 'periodic':
        ts_pad[:window] = ts_in[-window:]
        ts_pad[-window:] = ts_in[:window]
    else:
        raise ValueError('in filter_padding: ftype must be one of "mirror" or "periodic".')
    if detrend:
        t_pad = np.arange(-window,len(ts)+window)
        ts_pad = ts_pad+p(t_pad)
    return ts_pad

def filter_ts(ts, cutoff, filter_type='lanczos', padding_type='mirror', detrend=True, detrend_poly=1): 
    lanczos_weights=low_pass_weights(cutoff*2+1,1./cutoff) # weights for Lanczos filter
    n_pad=int(np.ceil(len(lanczos_weights)/2))
    
    # Padding
    ts_mirr=filter_padding(ts,n_pad,padding_type,detrend=detrend,detrend_poly=detrend_poly)
    
    # Filtering
    if filter_type=='lanczos':
        # Lanczos filter
        return convolve(ts_mirr,lanczos_weights,'same')[n_pad:-n_pad]
    elif filter_type=='butter':
        # Butterworth filter
        # TODO: here, the cutoff frequency needs to be doubled
        # to obtain the same result as for Lanczos - why?
        b,a = butter(3,1./(cutoff/2),btype='low')
        return filtfilt(b,a,ts_mirr)[n_pad:-n_pad]
    else:
        raise ValueError('in filter_ts: filter_type must be one of "lanczos" or "butter".')

In [5]:
def lintrd(input_x,input_y):
        shapey = len(input_y.shape)
        if (shapey==1):
                trd = np.empty((1,)) * np.nan
                trd = np.polyfit(input_x,input_y,1)[0]
                pred = np.polyval(np.polyfit(input_x,input_y,1),input_x)
                residual = np.empty((len(input_x),))
                residual = input_y[:]-pred[:]
                var_residual = (1/(len(input_x)-1)*np.nansum(residual*residual))
                var_x = (1/(len(input_x)-1)*np.nansum((input_x-np.nanmean(input_x))*(input_x-np.nanmean(input_x))))
                trd_std = np.sqrt((var_residual/((len(input_x)-1)*var_x)))
        elif (shapey==2):
                trd = np.empty((input_y.shape[1],)) * np.nan
                trd_std = np.empty((input_y.shape[1],)) * np.nan
                for i in range(len(input_y[0])):
                        trd[i] = np.polyfit(input_x,input_y[:,i],1)[0]
                        pred = np.polyval(np.polyfit(input_x,input_y[:,i],1),input_x)
                        residual = input_y[:,i]-pred
                        var_residual = (1/(len(input_x)-1)*np.nansum(residual*residual))
                        var_x = (1/(len(input_x)-1)*np.nansum((input_x-np.nanmean(input_x))*(input_x-np.nanmean(input_x))))
                        trd_std[i] = np.sqrt((var_residual/((len(input_x)-1)*var_x)))
        elif (shapey==3):
                trd = np.empty((input_y.shape[1],input_y.shape[2])) * np.nan
                trd_std = np.empty((input_y.shape[1],input_y.shape[2])) * np.nan
                for i in range(len(trd)):
                        for j in range(len(trd[0])):
                                if (np.isnan(input_y[0,i,j])==False):
                                    trd[i,j] = np.polyfit(input_x,input_y[:,i,j],1)[0]
                                    pred = np.polyval(np.polyfit(input_x,input_y[:,i,j],1),input_x)
                                    residual = input_y[:,i,j]-pred
                                    var_residual = (1/(len(input_x)-1)*np.nansum(residual*residual))
                                    var_x = (1/(len(input_x)-1)*np.nansum((input_x-np.nanmean(input_x))*(input_x-np.nanmean(input_x))))
                                    trd_std[i,j] = np.sqrt((var_residual/((len(input_x)-1)*var_x)))
        return trd, trd_std