In [1]:
import dask

import numpy as np
import pandas as pd
import xarray as xr
from tqdm.contrib import itertools
import sys, os, math

# sys.path.insert(0, '../src')
# from regression import *


########################################################################################################################
# ludcmp, lubksb, and linearsolver

# Source: https://phyweb.physics.nus.edu.sg/~phywjs/CZ5101/ludcmp.py
# Similar to Fortran GMET solution

# "Numerical Recipes" ludcmp() C code on page 46 translated into Python
# I make it as simple as possible, disregard efficiency.
# Here a is a list of list, n is integer (size of the matrix)
# index is a list, and d is also a list of size 1
# Python list index starts from 0.  So matrix index is from 0 to n-1.
#
import math
import time

def ludcmp(a, n, indx, d):
    d[0] = 1.0
    # looking for the largest a in each row and store it in vv as inverse
    # We need a new list same size as indx, for this we use .copy()
    vv = indx.copy()
    for i in range(0, n):
        big = 0.0
        for j in range(0, n):
            temp = math.fabs(a[i][j])
            if (temp > big):
                big = temp
        vv[i] = 1.0 / big
    #
    # run Crout's algorithm
    for j in range(0, n):
        # top half & bottom part are combined
        # but the upper limit l for k sum is different
        big = 0.0
        for i in range(0, n):
            if (i < j):
                l = i
            else:
                l = j
            sum = a[i][j]
            for k in range(0, l):
                sum -= a[i][k] * a[k][j]
            # end for k
            a[i][j] = sum
            # for bottom half, we keep track which row is larger
            if (i >= j):
                dum = vv[i] * math.fabs(sum)
                if (dum >= big):
                    big = dum
                    imax = i
            # end if (i>= ...)
        # end for i
        # pivoting part, swap row j with row imax, a[j] is a whole row
        if (j != imax):
            dum = a[imax]
            a[imax] = a[j]
            a[j] = dum
            d[0] = - d[0]
            vv[imax] = vv[j]
        # end if (j != ...)
        # divide by the beta diagonal value
        indx[j] = imax
        dum = 1.0 / a[j][j]
        for i in range(j + 1, n):
            a[i][j] *= dum
        # end for i
    # end for j


# end of def ludcmp

# We do backward substitution in lubksb() take the row swapped LU decomposed
# a, size n, and swapping indx, and b vector as input.  The output is
# in b after calling.
def lubksb(a, n, indx, b):
    ii = -1
    # forward
    for i in range(0, n):
        ip = indx[i]
        sum = b[ip]
        b[ip] = b[i]
        if (ii != -1):
            for j in range(ii, i):
                sum -= a[i][j] * b[j]
        elif (sum != 0):
            ii = i
        b[i] = sum
    # bote alpha_{ii} is 1 above
    #  backward
    for i in range(n - 1, -1, -1):
        sum = b[i]
        for j in range(i + 1, n):
            sum -= a[i][j] * b[j]
        b[i] = sum / a[i][i]


# end lubksb()


# unfortunately a is destroyed (become swapped LU)
def linearsolver(a, n, b):
    indx = list(range(n))
    d = [1]
    ludcmp(a, n, indx, d)
    x = b.copy()
    lubksb(a, n, indx, x)
    # print("x=", x)
    return x

# if __name__ == "__main__":
#     # using ludcmp and lubksb
#     t1 = time.time()
#     for i in range(10000):
#         a = [[1, 3, 3, -5], [2, -4, 7, -1], [7, 0.5, 3, -6], [9, -2, 3, 8]]
#         n = 4
#         b = [0, 2, 3, -10]
#         x2 = linearsolver(a, n, b)
#     print(x2)
#     t2 = time.time()
#     print(t2-t1)
#
#     # just using numpy
#     from numpy.linalg import inv
#     import numpy as np
#     t1 = time.time()
#     for i in range(10000):
#         a = np.array([[1, 3, 3, -5], [2, -4, 7, -1], [7, 0.5, 3, -6], [9, -2, 3, 8]])
#         b = np.array([0, 2, 3, -10])
#         ainv = inv(a)
#         x2 = np.matmul(ainv, b)
#     print(x2)
#     t2 = time.time()
#     print(t2-t1)


########################################################################################################################
# basic regression utility functions

def least_squares_numpy(x, y, tx):
    # In fortran version, ludcmp and lubksb are used to calcualte matrix inversion
    # call ludcmp(a, indx, d)
    # call lubksb(a, indx, b)

    # In Python version, numpy is used to calculate matrix inversion
    c = np.matmul(tx, y)
    a = np.matmul(tx, x)

    n = np.shape(a)[0]
    b = np.zeros(n)

    deta = np.linalg.det(a)  # Compute the determinant of an array
    if deta == 0:
        # print('Singular matrix')
        b[:] = 0
    else:
        ainv = np.linalg.inv(a)
        b = np.matmul(ainv, c)

    return b


def least_squares_ludcmp(x, y, tx):
    # In fortran version, ludcmp and lubksb are used to calcualte matrix inversion
    # call ludcmp(a, indx, d)
    # call lubksb(a, indx, b)

    # In Python version, numpy is used to calculate matrix inversion
    c = np.matmul(tx, y)
    a = np.matmul(tx, x)
    n = np.shape(a)[0]
    b = linearsolver(list(a), n, list(c))

    return b

def logistic_regression(x, tx, yp):
    # nstn: station number
    # nvars: station attributes (1, lat/lon/...), 1 is for regression
    nstn, nvars = np.shape(x)

    b = np.zeros(nvars)  # regression coefficients (beta)
    p = np.zeros(nstn)  # estimated pop (probability of precipitation)
    f = 0  # flag: continue or stop loops
    it = 0  # iteration times

    while f != 1:
        # check for divergence
        xb = -np.matmul(x, b)
        if np.any(xb > 50):
            f = 1
        else:
            p = 1 / (1 + np.exp(xb))

        # check for divergence
        if np.any(p > 0.9999):
            # logistic regression diverging
            f = 1
        else:
            v = np.zeros([nstn, nstn])  # diagonal variance matrix
            for i in range(nstn):
                v[i, i] = p[i] * (1 - p[i])
            xv = np.matmul(v, x)
            yn = yp - p  # difference between station occurrence (0/1) and estimated pop: Bnew -Bold in Martyn and Slater 2006
            bn = least_squares_ludcmp(xv, yn, tx)

            # check: converging
            if np.any(np.abs(bn) > 1e-4):
                f = 0
            else:
                f = 1

            # check: iteration times
            if it > 20:
                f = 1
            else:
                f = 0

            b = b + bn  # update coefficients

        it = it + 1

    return b


def weight_linear_regression(nearinfo, weightnear, datanear, tarinfo):
    # # nearinfo: predictors from neighboring stations
    # # [station number, predictor number + 1] array with the first column being ones
    # nearinfo = np.zeros([nnum, npred+1])
    #
    # # weightnear: weight of neighboring stations
    # # [station number, station number] array with weights located in the diagonal
    # weightnear = np.zeros([nnum, nnum])
    # for i in range(nnum):
    #     weightnear[i, i] = 123
    #
    # # tarinfo:  predictors from target stations
    # # [predictor number + 1] vector with the first value being one
    # tarinfo = np.zeros(npred+1)
    #
    # # datanear: data from neighboring stations. [station number] vector
    # datanear = np.zeros(nnum)

    # start regression
    w_pcp_red = np.diag(np.squeeze(weightnear))
    tx_red = np.transpose(nearinfo)
    twx_red = np.matmul(tx_red, w_pcp_red)
    b = least_squares_ludcmp(nearinfo, datanear, twx_red)
    datatar = np.dot(tarinfo, b)

    return datatar

def weight_logistic_regression(nearinfo, weightnear, datanear, tarinfo):

    w_pcp_red = np.diag(np.squeeze(weightnear))

    # if len(np.unique(datanear)) == 1:
    #     pop = datanear[0] # all 0 (no rain) or all 1 (rain everywhere)
    # else:
    tx_red = np.transpose(nearinfo)
    twx_red = np.matmul(tx_red, w_pcp_red)
    b = logistic_regression(nearinfo, twx_red, datanear)
    if np.all(b == 0) or np.any(np.isnan(b)):
        pop = np.dot(weightnear, datanear)
    else:
        zb = - np.dot(tarinfo, b)
        pop = 1 / (1 + np.exp(zb))

    return pop

def check_predictor_matrix_behavior(nearinfo, weightnear):
    # check to see if the station predictor matrix will be well behaved
    w_pcp_red = np.diag(np.squeeze(weightnear))
    tx_red = np.transpose(nearinfo)
    twx_red = np.matmul(tx_red, w_pcp_red)
    tmp = np.matmul(twx_red, nearinfo)
    vv =  np.max(np.abs(tmp), axis=1)
    if np.any(vv==0):
        flag = False # bad performance
    else:
        flag = True # good performance
    return flag


###########################
# regression using Python sklearn package: easier to use, but slower and a bit different from the above functions
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import LogisticRegression
def sklearn_weight_linear_regression(nearinfo, weightnear, datanear, tarinfo):
    model = LinearRegression()
    model = model.fit(nearinfo, datanear, sample_weight=weightnear)
    datatar = model.predict(tarinfo[np.newaxis, :])
    return datatar
def sklearn_weight_logistic_regression(nearinfo, weightnear, datanear, tarinfo):

    model = LogisticRegression(solver='liblinear')
    model = model.fit(nearinfo, datanear, sample_weight=weightnear)
    pop = model.predict_proba(tarinfo[np.newaxis,:])[0, 1]
    return pop


########################################################################################################################
# dynmaic predictor-related functions
def initial_check_dynamic_predictor(dynamic_predictor_name, dynamic_predictor_filelist, target_vars):

    if not isinstance(dynamic_predictor_name, list):
        sys.exit(f'Error! dynamic_predictor_name must be a list.')

    if (len(target_vars) != len(dynamic_predictor_name)) and (len(dynamic_predictor_name) > 0):
        sys.exit(f'Error! len(dynamic_predictor_name)>0 but is different from len(target_vars)!')

    flag = False
    if not os.path.isfile(dynamic_predictor_filelist):
        print('Do not find dynamic_predictor_filelist:', dynamic_predictor_filelist)
    elif len(dynamic_predictor_name) == 0:
        print('dynamic_predictor_name length is 0')
    else:
        with open(dynamic_predictor_filelist, 'r') as f:
            file0 = f.readline().strip()
        if not os.path.isfile(file0):
            print(f'Do not find the first file: {file0} in dynamic_predictor_filelist: {dynamic_predictor_filelist}')
        else:
            # with nc.Dataset(file0) as ds:
            with xr.open_dataset(file0) as ds:
                for i in range(len(dynamic_predictor_name)):
                    tmp = []
                    for v in dynamic_predictor_name[i]:
                        # if v in ds.variables.keys():
                        if v in ds.data_vars:
                            print(f'Include {v} as a dynamic predictor for {target_vars[i]}')
                            tmp.append(v)
                        else:
                            print(f'Cannot find {v} in {file0}. Do not include it as a dynamic predictor for {target_vars[i]}')
                    if len(tmp) > 0:
                        flag = True
                    dynamic_predictor_name[i] = tmp

    if flag == False:
        print('Dynamic predictor is not activated')
    else:
        print(f'Dynamic predictor is activated. Dynamic predicotrs are {dynamic_predictor_name}')
    return flag

def map_filelist_timestep(dynamic_predictor_filelist, timeseries):
    # for every time step, find the corresponding input file

    # get file list
    filelist = []
    with open(dynamic_predictor_filelist, 'r') as f:
        for line in f:
            filelist.append(line.strip())

    # make a dateframe containing all time steps
    df = pd.DataFrame()
    for i in range(len(filelist)):
        with xr.open_dataset(filelist[i]) as ds:
            timei = ds.time.values
            files = [filelist[i]] * len(timei)
            dfi = pd.DataFrame({'intime': timei, 'file': files, 'ind': np.arange(len(timei))})
            df = pd.concat((df, dfi))

    # for each time step, find the closest
    df_mapping = pd.DataFrame()
    for t in timeseries:
        if t >= df['intime'].iloc[0] and t <= df['intime'].iloc[-1]: # don't extrapolate 
            indi = np.argmin(np.abs(df['intime'] - t)) # nearest neighbor
            df_mapping = pd.concat((df_mapping, df.iloc[[indi]]))
        else:
            df_mapping = pd.concat((df_mapping, pd.DataFrame({'intime': [np.nan], 'file': [''], 'ind': [np.nan]})))

    df_mapping['tartime'] = timeseries
    df_mapping.index = np.arange(len(df_mapping))

    return df_mapping

# def read_timestep_input_data(df_mapping, tarindex, varnames):
#     # not finished
#     df = df_mapping.iloc[tarindex]
#     with xr.open_dataset(df['file']) as ds:
#         ds = ds.isel(time=tarindex)
#         ds = ds[varnames]
#         ds = ds.load()
#     return ds


def read_period_input_data(df_mapping, varnames):
    # read and
    # select period
    # select variables

    # tarlon/tarlat: target lat/lon vector
    # default: dynmaic input files must have lat/lon dimensions

    files = np.unique(df_mapping['file'].values)
    files = [f for f in files if len(f)>0]

    intime = df_mapping['intime'].values
    intime = intime[~np.isnan(intime)]

    tartime = df_mapping['tartime'].values

    if len(files) == 0:
        print('Warning! Cannot find any valid dynamic input files')
        ds = xr.Dataset()
    else:
        ds = xr.open_mfdataset(files)
        if np.any([not d in ds.dims for d in ['time', 'lat','lon']]):
            sys.exit(f'Dynamic input files must have lat, lon, and time dimensions!')
        # select ...
        ds = ds[varnames]
        ds = ds.sel(time=slice(tartime[0], tartime[-1]))
        ds = ds.load()
        ds = ds.interp(time=tartime, method='nearest')
    return ds


def regrid_xarray(ds, tarlon, tarlat, target):
    # if target='1D', tarlon and tarlat are vector of station points
    # if target='2D', tarlon and tarlat are vector defining grids

    # regridding: space and time
    if target == '2D':

        ds = ds.transpose('time', 'lat', 'lon')

        if ds.lat.values[0] > ds.lat.values[1]:
            ds = ds.sel(lat=slice(tarlat[-1], tarlat[0]))
        else:
            ds = ds.sel(lat=slice(tarlat[0], tarlat[-1]))
        if ds.lon.values[0] > ds.lon.values[1]:
            ds = ds.sel(lon=slice(tarlon[-1], tarlon[0]))
        else:
            ds = ds.sel(lon=slice(tarlon[0], tarlon[-1]))

        ds_out = ds.interp(lat=tarlat, lon=tarlon, method='linear')

    elif target == '1D':
        tarlat = xr.DataArray(tarlat, dims=('z'))
        tarlon = xr.DataArray(tarlon, dims=('z'))
        ds_out = ds.interp(lat=tarlat, lon=tarlon, method='linear')
        ds_out = ds_out.transpose('time', 'z')

    else:
        sys.exit('Unknown target')

    return ds_out



def flatten_list(lst):
    flat_list = []
    for item in lst:
        if isinstance(item, list):
            flat_list.extend(flatten_list(item))
        else:
            flat_list.append(item)
    return flat_list

########################################################################################################################
# wrapped up regression functions

def loop_regression_2Dor3D(stn_data, stn_predictor, tar_nearIndex, tar_nearWeight, tar_predictor, method, dynamic_predictors={}):
    # regression for 2D (vector) or 3D (array) inputs
    # 2D:
    # stn_data: [input station, time steps]
    # stn_predictor: [input station, number of predictors]
    # nearIndex/nearWeight: [target point, number of nearby stations]
    # tar_predictor: [target point, number of predictors]
    # 3D:
    # stn_data: [input station, time steps]
    # stn_predictor: [input station, number of predictors]
    # nearIndex/nearWeight: [row, col, number of nearby stations]
    # tar_predictor: [row, col, number of predictors]

    np.savez_compressed('../docs/data_for_parallel_test.npz', stn_data=stn_data, stn_predictor=stn_predictor,
                        tar_nearIndex=tar_nearIndex, tar_nearWeight=tar_nearWeight, tar_predictor=tar_predictor, method=method, dynamic_predictors=dynamic_predictors)

    if len(dynamic_predictors) == 0:
        dynamic_predictors['flag'] = False

    # if tar_nearIndex.ndim == 2:
    #     # make it a 3D array to be consistent
    #     tar_nearIndex = tar_nearIndex[np.newaxis, :, :]
    #     tar_nearWeight = tar_nearWeight[np.newaxis, :, :]
    #     tar_predictor = tar_predictor[np.newaxis, :, :]
    #
    #     if dynamic_predictors['flag'] == True:
    #         # change raw dim: [n_feature, n_time, n_station] to [n_feature, n_time, 1, n_station]
    #         dynamic_predictors['tar_predictor_dynamic'] = dynamic_predictors['tar_predictor_dynamic'][:, :, np.newaxis, :]


    nstn, ntime = np.shape(stn_data)
    nrow, ncol, nearmax = np.shape(tar_nearIndex)
    estimates = np.nan * np.zeros([nrow, ncol, ntime], dtype=np.float32)

    for r, c in itertools.product(range(nrow), range(ncol)):

        # prepare xdata and sample weight for training and weights of neighboring stations
        sample_nearIndex = tar_nearIndex[r, c, :]
        index_valid = sample_nearIndex >= 0

        if np.sum(index_valid) > 0:
            sample_nearIndex = sample_nearIndex[index_valid]

            sample_weight = tar_nearWeight[r, c, :][index_valid]
            sample_weight = sample_weight / np.sum(sample_weight)

            xdata_near0 = stn_predictor[sample_nearIndex, :]
            xdata_g0 = tar_predictor[r, c, :]

            # interpolation for every time step
            for d in range(ntime):

                ydata_near = np.squeeze(stn_data[sample_nearIndex, d])
                if len(np.unique(ydata_near)) == 1:  # e.g., for prcp, all zero
                    ydata_tar = ydata_near[0]
                else:

                    # add dynamic predictors if flag is true and predictors are good
                    xdata_near = xdata_near0
                    xdata_g = xdata_g0
                    if dynamic_predictors['flag'] == True:
                        xdata_near_add = dynamic_predictors['stn_predictor_dynamic'][:, d, sample_nearIndex].T
                        xdata_g_add = dynamic_predictors['tar_predictor_dynamic'][:, d, r, c]
                        if np.all(~np.isnan(xdata_near_add)) and np.all(~np.isnan(xdata_g_add)):
                            xdata_near_try = np.hstack((xdata_near, xdata_near_add))
                            xdata_g_try = np.hstack((xdata_g, xdata_g_add))
                            # check if dynamic predictors are good for regression
                            if check_predictor_matrix_behavior(xdata_near_try, sample_weight) == True:
                                xdata_near = xdata_near_try
                                xdata_g = xdata_g_try
                            else:
                                xdata_near_try = np.hstack((xdata_near, xdata_near_add[:, ~dynamic_predictors['predictor_checkflag']]))
                                xdata_g_try = np.hstack((xdata_g, xdata_g_add[~dynamic_predictors['predictor_checkflag']]))
                                if check_predictor_matrix_behavior(xdata_near_try, sample_weight) == True:
                                    xdata_near = xdata_near_try
                                    xdata_g = xdata_g_try

                    # regression
                    if method == 'linear':
                        ydata_tar = weight_linear_regression(xdata_near, sample_weight, ydata_near, xdata_g)
                    elif method == 'logistic':
                        ydata_tar = weight_logistic_regression(xdata_near, sample_weight, ydata_near, xdata_g)
                    else:
                        sys.exit(f'Unknonwn regression method: {method}')
                estimates[r, c, d] = ydata_tar
    return np.squeeze(estimates)

In [2]:
# load data
data0 = np.load('data_for_parallel_test.npz', allow_pickle=True)
stn_data=data0['stn_data']
stn_predictor=data0['stn_predictor']
tar_nearIndex=data0['tar_nearIndex']
tar_nearWeight=data0['tar_nearWeight']
tar_predictor=data0['tar_predictor']
method=data0['method']
dynamic_predictors=data0['dynamic_predictors'].item()

In [5]:
%%time

def weight_linear_regression(nearinfo, weightnear, datanear, tarinfo):
    # # nearinfo: predictors from neighboring stations
    # # [station number, predictor number + 1] array with the first column being ones
    # nearinfo = np.zeros([nnum, npred+1])
    #
    # # weightnear: weight of neighboring stations
    # # [station number, station number] array with weights located in the diagonal
    # weightnear = np.zeros([nnum, nnum])
    # for i in range(nnum):
    #     weightnear[i, i] = 123
    #
    # # tarinfo:  predictors from target stations
    # # [predictor number + 1] vector with the first value being one
    # tarinfo = np.zeros(npred+1)
    #
    # # datanear: data from neighboring stations. [station number] vector
    # datanear = np.zeros(nnum)

    # start regression
    w_pcp_red = np.diag(np.squeeze(weightnear))
    tx_red = np.transpose(nearinfo)
    twx_red = np.matmul(tx_red, w_pcp_red)
    b = least_squares_ludcmp(nearinfo, datanear, twx_red)
    datatar = np.dot(tarinfo, b)

    return datatar

# loop_regression_2Dor3D function


if len(dynamic_predictors) == 0:
    dynamic_predictors['flag'] = False

# if tar_nearIndex.ndim == 2:
#     # make it a 3D array to be consistent
#     tar_nearIndex = tar_nearIndex[np.newaxis, :, :]
#     tar_nearWeight = tar_nearWeight[np.newaxis, :, :]
#     tar_predictor = tar_predictor[np.newaxis, :, :]
#
#     if dynamic_predictors['flag'] == True:
#         # change raw dim: [n_feature, n_time, n_station] to [n_feature, n_time, 1, n_station]
#         dynamic_predictors['tar_predictor_dynamic'] = dynamic_predictors['tar_predictor_dynamic'][:, :, np.newaxis, :]


nstn, ntime = np.shape(stn_data)
nrow, ncol, nearmax = np.shape(tar_nearIndex)
estimates = np.nan * np.zeros([nrow, ncol, ntime], dtype=np.float32)

for r, c in itertools.product(range(nrow), range(ncol)):

    # prepare xdata and sample weight for training and weights of neighboring stations
    sample_nearIndex = tar_nearIndex[r, c, :]
    index_valid = sample_nearIndex >= 0

    if np.sum(index_valid) > 0:
        sample_nearIndex = sample_nearIndex[index_valid]

        sample_weight = tar_nearWeight[r, c, :][index_valid]
        sample_weight = sample_weight / np.sum(sample_weight)

        xdata_near0 = stn_predictor[sample_nearIndex, :]
        xdata_g0 = tar_predictor[r, c, :]

        # interpolation for every time step
        for d in range(ntime):

            ydata_near = np.squeeze(stn_data[sample_nearIndex, d])
            if len(np.unique(ydata_near)) == 1:  # e.g., for prcp, all zero
                ydata_tar = ydata_near[0]
            else:

                # add dynamic predictors if flag is true and predictors are good
                xdata_near = xdata_near0
                xdata_g = xdata_g0
                if dynamic_predictors['flag'] == True:
                    xdata_near_add = dynamic_predictors['stn_predictor_dynamic'][:, d, sample_nearIndex].T
                    xdata_g_add = dynamic_predictors['tar_predictor_dynamic'][:, d, r, c]
                    if np.all(~np.isnan(xdata_near_add)) and np.all(~np.isnan(xdata_g_add)):
                        xdata_near_try = np.hstack((xdata_near, xdata_near_add))
                        xdata_g_try = np.hstack((xdata_g, xdata_g_add))
                        # check if dynamic predictors are good for regression
                        if check_predictor_matrix_behavior(xdata_near_try, sample_weight) == True:
                            xdata_near = xdata_near_try
                            xdata_g = xdata_g_try
                        else:
                            xdata_near_try = np.hstack((xdata_near, xdata_near_add[:, ~dynamic_predictors['predictor_checkflag']]))
                            xdata_g_try = np.hstack((xdata_g, xdata_g_add[~dynamic_predictors['predictor_checkflag']]))
                            if check_predictor_matrix_behavior(xdata_near_try, sample_weight) == True:
                                xdata_near = xdata_near_try
                                xdata_g = xdata_g_try

                # regression
                if method == 'linear':
                    ydata_tar = weight_linear_regression(xdata_near, sample_weight, ydata_near, xdata_g)
                elif method == 'logistic':
                    ydata_tar = weight_logistic_regression(xdata_near, sample_weight, ydata_near, xdata_g)
                else:
                    sys.exit(f'Unknonwn regression method: {method}')
            estimates[r, c, d] = ydata_tar

  0%|          | 0/16128 [00:00<?, ?it/s]

CPU times: user 27.3 s, sys: 337 ms, total: 27.6 s
Wall time: 27.7 s


In [11]:

def init_worker(stn_data, stn_predictor, tar_nearIndex, tar_nearWeight, tar_predictor, method, dynamic_predictors):
    # Using a dictionary is not strictly necessary. You can also
    # use global variables.
    var_dict['stn_data'] = stn_data
    var_dict['stn_predictor'] = stn_predictor
    var_dict['tar_nearIndex'] = tar_nearIndex
    var_dict['tar_nearWeight'] = tar_nearWeight
    var_dict['tar_predictor'] = tar_predictor
    var_dict['method'] = method
    var_dict['dynamic_predictors'] = dynamic_predictors



def worker_func(r, c):
    stn_data = var_dict['stn_data']
    stn_predictor = var_dict['stn_predictor']
    tar_nearIndex = var_dict['tar_nearIndex']
    tar_nearWeight = var_dict['tar_nearWeight']
    tar_predictor = var_dict['tar_predictor']
    method = var_dict['method']
    dynamic_predictors = var_dict['dynamic_predictors']

    nstn, ntime = np.shape(stn_data)
    nrow, ncol, nearmax = np.shape(tar_nearIndex)

    # prepare xdata and sample weight for training and weights of neighboring stations
    sample_nearIndex = tar_nearIndex[r, c, :]
    index_valid = sample_nearIndex >= 0

    if np.sum(index_valid) > 0:
        sample_nearIndex = sample_nearIndex[index_valid]

        sample_weight = tar_nearWeight[r, c, :][index_valid]
        sample_weight = sample_weight / np.sum(sample_weight)

        xdata_near0 = stn_predictor[sample_nearIndex, :]
        xdata_g0 = tar_predictor[r, c, :]

        # interpolation for every time step
        for d in range(ntime):

            ydata_near = np.squeeze(stn_data[sample_nearIndex, d])
            if len(np.unique(ydata_near)) == 1:  # e.g., for prcp, all zero
                ydata_tar = ydata_near[0]
            else:

                # add dynamic predictors if flag is true and predictors are good
                xdata_near = xdata_near0
                xdata_g = xdata_g0
                if dynamic_predictors['flag'] == True:
                    xdata_near_add = dynamic_predictors['stn_predictor_dynamic'][:, d, sample_nearIndex].T
                    xdata_g_add = dynamic_predictors['tar_predictor_dynamic'][:, d, r, c]
                    if np.all(~np.isnan(xdata_near_add)) and np.all(~np.isnan(xdata_g_add)):
                        xdata_near_try = np.hstack((xdata_near, xdata_near_add))
                        xdata_g_try = np.hstack((xdata_g, xdata_g_add))
                        # check if dynamic predictors are good for regression
                        if check_predictor_matrix_behavior(xdata_near_try, sample_weight) == True:
                            xdata_near = xdata_near_try
                            xdata_g = xdata_g_try
                        else:
                            xdata_near_try = np.hstack(
                                (xdata_near, xdata_near_add[:, ~dynamic_predictors['predictor_checkflag']]))
                            xdata_g_try = np.hstack((xdata_g, xdata_g_add[~dynamic_predictors['predictor_checkflag']]))
                            if check_predictor_matrix_behavior(xdata_near_try, sample_weight) == True:
                                xdata_near = xdata_near_try
                                xdata_g = xdata_g_try

                # regression
                if method == 'linear':
                    ydata_tar = weight_linear_regression(xdata_near, sample_weight, ydata_near, xdata_g)
                elif method == 'logistic':
                    ydata_tar = weight_logistic_regression(xdata_near, sample_weight, ydata_near, xdata_g)
                else:
                    sys.exit(f'Unknonwn regression method: {method}')
                    
                

In [14]:
    %%time

    import multiprocessing
    from multiprocessing import Pool

    # np.savez_compressed('../docs/data_for_parallel_test.npz', stn_data=stn_data, stn_predictor=stn_predictor,
    #                     tar_nearIndex=tar_nearIndex, tar_nearWeight=tar_nearWeight, tar_predictor=tar_predictor, method=method, dynamic_predictors=dynamic_predictors)

    if len(dynamic_predictors) == 0:
        dynamic_predictors['flag'] = False

    # if tar_nearIndex.ndim == 2:
    #     # make it a 3D array to be consistent
    #     tar_nearIndex = tar_nearIndex[np.newaxis, :, :]
    #     tar_nearWeight = tar_nearWeight[np.newaxis, :, :]
    #     tar_predictor = tar_predictor[np.newaxis, :, :]
    #
    #     if dynamic_predictors['flag'] == True:
    #         # change raw dim: [n_feature, n_time, n_station] to [n_feature, n_time, 1, n_station]
    #         dynamic_predictors['tar_predictor_dynamic'] = dynamic_predictors['tar_predictor_dynamic'][:, :, np.newaxis, :]


    nstn, ntime = np.shape(stn_data)
    nrow, ncol, nearmax = np.shape(tar_nearIndex)
    estimates = np.nan * np.zeros([nrow, ncol, ntime], dtype=np.float32)

    items = [(r,c) for r, c in itertools.product(range(nrow), range(ncol))]
    with Pool(processes=5, initializer=init_worker, initargs=(stn_data, stn_predictor, tar_nearIndex, tar_nearWeight, tar_predictor, method, dynamic_predictors)) as pool:
        result = pool.starmap(worker_func, items)
        

  0%|          | 0/16128 [00:00<?, ?it/s]

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/opt/anaconda3/envs/general/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/opt/anaconda3/envs/general/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'init_worker' on <module '__main__' (built-in)>


KeyboardInterrupt: 

In [4]:
from dask.distributed import Client

# without specifying unique thread, the function is executed
# on all threads
client = Client(n_workers=6, threads_per_worker=1)
client.close()
# loop_regression_2Dor3D function

In [7]:
%%time

# loop_regression_2Dor3D function
@dask.delayed
def reg_fun(stn_data, stn_predictor, tar_nearIndex, tar_nearWeight, tar_predictor, method, dynamic_predictors, r, c):
    # prepare xdata and sample weight for training and weights of neighboring stations
    nstn, ntime = np.shape(stn_data)
    nrow, ncol, nearmax = np.shape(tar_nearIndex)
    sample_nearIndex = tar_nearIndex[r, c, :]
    index_valid = sample_nearIndex >= 0
    ydata_tar_all = np.nan * np.zeros(ntime)
    if np.sum(index_valid) > 0:
        sample_nearIndex = sample_nearIndex[index_valid]

        sample_weight = tar_nearWeight[r, c, :][index_valid]
        sample_weight = sample_weight / np.sum(sample_weight)

        xdata_near0 = stn_predictor[sample_nearIndex, :]
        xdata_g0 = tar_predictor[r, c, :]

        # interpolation for every time step
        for d in range(ntime):

            ydata_near = np.squeeze(stn_data[sample_nearIndex, d])
            if len(np.unique(ydata_near)) == 1:  # e.g., for prcp, all zero
                ydata_tar = ydata_near[0]
            else:

                # add dynamic predictors if flag is true and predictors are good
                xdata_near = xdata_near0
                xdata_g = xdata_g0
                if dynamic_predictors['flag'] == True:
                    xdata_near_add = dynamic_predictors['stn_predictor_dynamic'][:, d, sample_nearIndex].T
                    xdata_g_add = dynamic_predictors['tar_predictor_dynamic'][:, d, r, c]
                    if np.all(~np.isnan(xdata_near_add)) and np.all(~np.isnan(xdata_g_add)):
                        xdata_near_try = np.hstack((xdata_near, xdata_near_add))
                        xdata_g_try = np.hstack((xdata_g, xdata_g_add))
                        # check if dynamic predictors are good for regression
                        if check_predictor_matrix_behavior(xdata_near_try, sample_weight) == True:
                            xdata_near = xdata_near_try
                            xdata_g = xdata_g_try
                        else:
                            xdata_near_try = np.hstack((xdata_near, xdata_near_add[:, ~dynamic_predictors['predictor_checkflag']]))
                            xdata_g_try = np.hstack((xdata_g, xdata_g_add[~dynamic_predictors['predictor_checkflag']]))
                            if check_predictor_matrix_behavior(xdata_near_try, sample_weight) == True:
                                xdata_near = xdata_near_try
                                xdata_g = xdata_g_try

                # regression
                if method == 'linear':
                    ydata_tar_all[d] = weight_linear_regression(xdata_near, sample_weight, ydata_near, xdata_g)
                elif method == 'logistic':
                    ydata_tar_all[d] = weight_logistic_regression(xdata_near, sample_weight, ydata_near, xdata_g)
                else:
                    sys.exit(f'Unknonwn regression method: {method}')
    return ydata_tar_all


nstn, ntime = np.shape(stn_data)
nrow, ncol, nearmax = np.shape(tar_nearIndex)
estimates = np.nan * np.zeros([nrow, ncol, ntime], dtype=np.float32)

outputs = []
for r, c in itertools.product(range(nrow), range(ncol)):
    ydata_tar_all = reg_fun(stn_data, stn_predictor, tar_nearIndex, tar_nearWeight, tar_predictor, method, dynamic_predictors, r, c)
    outputs.append(ydata_tar_all)
    
outputs = dask.compute(outputs, n_workers=4)
client.close()

  0%|          | 0/16128 [00:00<?, ?it/s]

CPU times: user 1min 6s, sys: 20.4 s, total: 1min 26s
Wall time: 52.3 s


In [74]:
from dask.distributed import Client

# without specifying unique thread, the function is executed
# on all threads
client = Client(n_workers=4, threads_per_worker=1)

# loop_regression_2Dor3D function

Perhaps you already have a cluster running?
Hosting the HTTP server on port 51901 instead


In [8]:

%%time


@dask.delayed
def weight_linear_regression(nearinfo, weightnear, datanear, tarinfo):
    # # nearinfo: predictors from neighboring stations
    # # [station number, predictor number + 1] array with the first column being ones
    # nearinfo = np.zeros([nnum, npred+1])
    #
    # # weightnear: weight of neighboring stations
    # # [station number, station number] array with weights located in the diagonal
    # weightnear = np.zeros([nnum, nnum])
    # for i in range(nnum):
    #     weightnear[i, i] = 123
    #
    # # tarinfo:  predictors from target stations
    # # [predictor number + 1] vector with the first value being one
    # tarinfo = np.zeros(npred+1)
    #
    # # datanear: data from neighboring stations. [station number] vector
    # datanear = np.zeros(nnum)
    # start regression
    w_pcp_red = np.diag(np.squeeze(weightnear))
    tx_red = np.transpose(nearinfo)
    twx_red = np.matmul(tx_red, w_pcp_red)
    b = least_squares_ludcmp(nearinfo, datanear, twx_red)
    datatar = np.dot(tarinfo, b)
    return datatar


nstn, ntime = np.shape(stn_data)
nrow, ncol, nearmax = np.shape(tar_nearIndex)
estimates = []

for r, c in itertools.product(range(nrow), range(ncol)):

    # prepare xdata and sample weight for training and weights of neighboring stations
    sample_nearIndex = tar_nearIndex[r, c, :]
    index_valid = sample_nearIndex >= 0

    if np.sum(index_valid) > 0:
        sample_nearIndex = sample_nearIndex[index_valid]

        sample_weight = tar_nearWeight[r, c, :][index_valid]
        sample_weight = sample_weight / np.sum(sample_weight)

        xdata_near0 = stn_predictor[sample_nearIndex, :]
        xdata_g0 = tar_predictor[r, c, :]

        # interpolation for every time step
        for d in range(ntime):

            ydata_near = np.squeeze(stn_data[sample_nearIndex, d])
            if len(np.unique(ydata_near)) == 1:  # e.g., for prcp, all zero
                ydata_tar = ydata_near[0]
            else:

                # add dynamic predictors if flag is true and predictors are good
                xdata_near = xdata_near0
                xdata_g = xdata_g0
                if dynamic_predictors['flag'] == True:
                    xdata_near_add = dynamic_predictors['stn_predictor_dynamic'][:, d, sample_nearIndex].T
                    xdata_g_add = dynamic_predictors['tar_predictor_dynamic'][:, d, r, c]
                    if np.all(~np.isnan(xdata_near_add)) and np.all(~np.isnan(xdata_g_add)):
                        xdata_near_try = np.hstack((xdata_near, xdata_near_add))
                        xdata_g_try = np.hstack((xdata_g, xdata_g_add))
                        # check if dynamic predictors are good for regression
                        if check_predictor_matrix_behavior(xdata_near_try, sample_weight) == True:
                            xdata_near = xdata_near_try
                            xdata_g = xdata_g_try
                        else:
                            xdata_near_try = np.hstack((xdata_near, xdata_near_add[:, ~dynamic_predictors['predictor_checkflag']]))
                            xdata_g_try = np.hstack((xdata_g, xdata_g_add[~dynamic_predictors['predictor_checkflag']]))
                            if check_predictor_matrix_behavior(xdata_near_try, sample_weight) == True:
                                xdata_near = xdata_near_try
                                xdata_g = xdata_g_try

                # regression
                if method == 'linear':
                    ydata_tar = weight_linear_regression(xdata_near, sample_weight, ydata_near, xdata_g)
                elif method == 'logistic':
                    ydata_tar = weight_logistic_regression(xdata_near, sample_weight, ydata_near, xdata_g)
                else:
                    sys.exit(f'Unknonwn regression method: {method}')
            estimates.append(ydata_tar)
            
outputs = dask.compute(estimates, n_workers=6)

# client.close()

  0%|          | 0/16128 [00:00<?, ?it/s]

CPU times: user 1min 19s, sys: 7.19 s, total: 1min 26s
Wall time: 59.4 s


exception calling callback for <Future at 0x7fe4da4f8eb0 state=finished returned list>
Traceback (most recent call last):
  File "/opt/anaconda3/envs/general/lib/python3.9/concurrent/futures/_base.py", line 330, in _invoke_callbacks
    callback(self)
  File "/opt/anaconda3/envs/general/lib/python3.9/asyncio/futures.py", line 398, in _call_set_state
    dest_loop.call_soon_threadsafe(_set_state, destination, source)
  File "/opt/anaconda3/envs/general/lib/python3.9/asyncio/base_events.py", line 796, in call_soon_threadsafe
    self._check_closed()
  File "/opt/anaconda3/envs/general/lib/python3.9/asyncio/base_events.py", line 515, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed


In [1]:
import multiprocessing

def slow_function(arg):
    # some slow computation
    return arg * 2

if __name__ == '__main__':
    pool = multiprocessing.Pool()
    inputs = range(10)  # list of inputs to process in parallel
    results = pool.map(slow_function, inputs)  # parallelize the loop
    print(results)


Process SpawnPoolWorker-1:
Traceback (most recent call last):
  File "/opt/anaconda3/envs/general/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/anaconda3/envs/general/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/anaconda3/envs/general/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/opt/anaconda3/envs/general/lib/python3.9/multiprocessing/queues.py", line 367, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'slow_function' on <module '__main__' (built-in)>
Process SpawnPoolWorker-2:
Traceback (most recent call last):
  File "/opt/anaconda3/envs/general/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/anaconda3/envs/general/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/anaconda3/e

KeyboardInterrupt: 

Process SpawnPoolWorker-25:
Process SpawnPoolWorker-26:
Process SpawnPoolWorker-30:
Process SpawnPoolWorker-24:
Process SpawnPoolWorker-27:
Process SpawnPoolWorker-21:
Process SpawnPoolWorker-23:
Process SpawnPoolWorker-29:
Traceback (most recent call last):
  File "/opt/anaconda3/envs/general/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
Traceback (most recent call last):
  File "/opt/anaconda3/envs/general/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/anaconda3/envs/general/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/opt/anaconda3/envs/general/lib/python3.9/multiprocessing/queues.py", line 364, in get
    with self._rlock:
  File "/opt/anaconda3/envs/general/lib/python3.9/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/opt/anaconda3/envs/general/lib/python3.9/multiprocessing/process.

In [2]:
# SuperFastPython.com
# example of a sequential for loop
from time import sleep
from random import random
 
# a task to execute in another process
def task(arg):
    # generate a value between 0 and 1
    value = random()
    # block for a fraction of a second to simulate work
    sleep(value)
    # return the generated value
    return value
 
# entry point for the program
if __name__ == '__main__':
    # call the same function with different data sequentially
    for result in map(task, range(10)):
        # report the value to show progress
        print(result)

0.7102002709574844
0.7753833457865317
0.869570256803165
0.5802933092048493
0.8325173082271703
0.6394695673917928
0.364350936563697
0.744907480397409
0.36804718491988764
0.0033366109138929234


In [None]:
# SuperFastPython.com
# example of a parallel for loop with no return values
from time import sleep
from random import random
from multiprocessing import Pool
 
# task to execute in another process
def task(arg):
    # generate a value between 0 and 1
    value = random()
    # block for a fraction of a second to simulate work
    sleep(value)
    # # report the value to show progress
    print(f'{arg} got {value}', flush=True)
 
# entry point for the program
if __name__ == '__main__':
    # create the process pool
    with Pool() as pool:
        # call the same function with different data in parallel
        pool.map(task, range(10))

Process SpawnPoolWorker-43:
Traceback (most recent call last):
  File "/opt/anaconda3/envs/general/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/anaconda3/envs/general/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/anaconda3/envs/general/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/opt/anaconda3/envs/general/lib/python3.9/multiprocessing/queues.py", line 367, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'task' on <module '__main__' (built-in)>
Process SpawnPoolWorker-41:
Traceback (most recent call last):
  File "/opt/anaconda3/envs/general/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/anaconda3/envs/general/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/anaconda3/envs/gen