In [1]:
import os
import scipy.io as sio #for the i/o
import numpy as np
import time #to wait between checking if jobs are done
import ipyparallel as ipp

rc = ipp.Client()
view = rc[:]
print(view)
procs = np.array(rc.ids) #get list of processors on cluster that local environment can access
print(procs)

<DirectView [0, 1, 2, 3,...]>
[0 1 2 3 4 5 6 7 8 9]


In [2]:
## Create Data Directory if not exist
datapath = os.path.join(os.getcwd(),"Data")
os.makedirs(datapath,exist_ok=True)

In [3]:
%%px
import os
import numpy as np
import scipy.io as sio
import tensorflow as tf
# import matplotlib.pyplot as plt
from tensorflow.random import set_seed
from sklearn.model_selection import KFold

In [4]:
%%px
hid_ls = np.arange(1,11)
n_fold = 5

#######################################
#define the activation function
def rbf(x):
    return tf.math.exp(-x**2)

#######################################
#define the derivative of the activation function
def d_rbf(x):
    return tf.gradients(rbf,x)

#######################################
#we couldn't use “tf_d_leaky_relu_6” as an activation function if we wanted to 
#because tensorflow doesn't know how to calculate the gradients of that function.
def rbf_grad(op, grad):
    x = op.inputs[0]
    n_gr = d_rbf(x)    #defining the gradient.
    return grad * n_gr

def py_func(func, inp, Tout, stateful=True, name=None, grad=None):
    # Need to generate a unique name to avoid duplicates:
    rnd_name = 'PyFuncGrad' + str(np.random.randint(0, 1E+2))
    tf.RegisterGradient(rnd_name)(grad)
    g = tf.get_default_graph()
    with g.gradient_override_map({"PyFunc": rnd_name, "PyFuncStateless": rnd_name}):
#     with g.gradient_override_map({"PyFunc": rnd_name}):
        return tf.py_func(func, inp, Tout, stateful=stateful, name=name)
    
def tf_rbf(x,name=None):
    with tf.name_scope(name, "rbf", [x]) as name:
        y = py_func(rbf,   #forward pass function
                    [x],
                    [tf.float32],
                    name=name,
                    grad= rbf_grad) #the function that overrides gradient
        y[0].set_shape(x.get_shape())     #when using with the code, it is used to specify the rank of the input.
    return y[0]


def func(proc_num, proc_max):
    
    def func(X,y,n_fold,n_hid,n_epo,seed=12345):
        '''
        return the average of weighted error for `n_fold` splits
        '''
        train_score = []
        test_score = []
        set_seed(seed)
        kf = KFold(n_splits=n_fold)
        for train_index, test_index in kf.split(X):
            X_train, X_test = X[train_index], X[test_index]
            y_train, y_test = y[train_index], y[test_index]

            model = tf.keras.Sequential()
            model.add(tf.keras.layers.Dense(n_hid,activation=rbf,input_dim=1))  #hid2
            model.add(tf.keras.layers.Dense(1))
            model.compile(loss='mean_squared_error', optimizer=tf.keras.optimizers.Adam(0.01))
            model.fit(X_train,y_train, epochs=n_epo, verbose=0)

            error_train = tf.keras.losses.MSE(model.predict(X_train).flatten(),y_train).numpy()
            error_test = tf.keras.losses.MSE(model.predict(X_test).flatten(),y_test).numpy()
    #         error_ave = (error_train + error_test*(n_fold-1))/n_fold   #weighted average
            train_score.append(error_train)
            test_score.append(error_test)
        return np.mean(train_score), np.mean(test_score)
    
    i = proc_num  #range from 0 to len(hid_ls)
    score = [func(X,y,n_fold,hid_ls[i],j) for j in epo_ls]  
    train = [score[i][0] for i in range(len(score))]
    test = [score[i][1] for i in range(len(score))]
    filename = '/home/jovyan/work/Thesis_Wenjuan_Code/Data/%s_1%s1_split_MSE.mat'%(st_name,hid_ls[i]) #1_x2_1%s1
#     datapath = os.path.join(os.getcwd(),"Data")
#     os.makedirs(datapath,exist_ok=True)
#     filename = 'Data/%s_1%s1_split_MSE0.mat'%(st_name,hid_ls[i]) #1_x2_1%s1
    data_dict = {'train':train, 'test':test}
    sio.savemat(filename, data_dict)
    return

## 1

$$
y = \sin x
$$

Sample size = 1000

In [8]:
%%px
epo_ls = 10*np.arange(1,11)
np.random.seed(12345)
X = np.pi*np.random.uniform(-1,1,size=1000)
y = np.sin(X)
st_name = "sin"

In [9]:
view.push(dict(procs=procs)) #This pushes procs to all processors on the cluster

async_process = view.map_async(lambda proc_num: func(proc_num, proc_max=procs.size), range(procs.size) ) 

time.sleep(1) #give the process time to start and see if any errors occur
if async_process.error[0] is None:
    done = False    
    while done == False:
        if async_process.done():
            done = True
            #print('Now we can load in the data')
        else:
            time.sleep(1)
else:
    raise RuntimeError(async_process.error[0])

## 2
$$
y = \sin x
$$
Sample size = 5000

In [10]:
%%px
epo_ls = 10*np.arange(1,11)
np.random.seed(12345)
X = np.pi*np.random.uniform(-1,1,size=5000)
y = np.sin(X)
st_name = "sin5"

In [11]:
view.push(dict(procs=procs)) #This pushes procs to all processors on the cluster

async_process = view.map_async(lambda proc_num: func(proc_num, proc_max=procs.size), range(procs.size) ) 

time.sleep(1) #give the process time to start and see if any errors occur
if async_process.error[0] is None:
    done = False    
    while done == False:
        if async_process.done():
            done = True
            #print('Now we can load in the data')
        else:
            time.sleep(1)
else:
    raise RuntimeError(async_process.error[0])

## 3

$$
y=(x+3)(x-1)^2
$$
Sample size = 1000

In [5]:
%%px
epo_ls = 100*np.arange(1,11)
np.random.seed(12345)
X = np.random.uniform(-3,3,size=1000)
y = (X+3)*(X-1)**2
st_name = "poly"

In [6]:
view.push(dict(procs=procs)) #This pushes procs to all processors on the cluster

async_process = view.map_async(lambda proc_num: func(proc_num, proc_max=procs.size), range(procs.size) ) 

time.sleep(1) #give the process time to start and see if any errors occur
if async_process.error[0] is None:
    done = False    
    while done == False:
        if async_process.done():
            done = True
            #print('Now we can load in the data')
        else:
            time.sleep(1)
else:
    raise RuntimeError(async_process.error[0])

## 4

$$
y=(x+3)(x-1)^2
$$
Sample size = 5000

In [7]:
%%px
epo_ls = 100*np.arange(1,11)
np.random.seed(12345)
X = np.random.uniform(-3,3,size=5000)
y = (X+3)*(X-1)**2
st_name = "poly5"

In [8]:
view.push(dict(procs=procs)) #This pushes procs to all processors on the cluster

async_process = view.map_async(lambda proc_num: func(proc_num, proc_max=procs.size), range(procs.size) ) 

time.sleep(1) #give the process time to start and see if any errors occur
if async_process.error[0] is None:
    done = False    
    while done == False:
        if async_process.done():
            done = True
            #print('Now we can load in the data')
        else:
            time.sleep(1)
else:
    raise RuntimeError(async_process.error[0])