In [None]:
import dask.array as da
from sklearn import datasets
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import time
import matplotlib.pyplot as plt
from dask.distributed import Client, LocalCluster

In [None]:
# here we load the data using numpy and do some pre-processing
X_data = np.load('/home/pzaspel/data/md17_X.npy')
y_data = np.load('/home/pzaspel/data/md17_Y.npy')

scaler = StandardScaler()
X = scaler.fit_transform(X_data)

y = y_data - np.average(y_data, 0)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

In [None]:
# here we initialize the cluster i.e set of machines
# n_worker refers to no of workes or cores that we need
cluster = LocalCluster(host="titlis.clamv.jacobs-university.de",scheduler_port=0, dashboard_address="titlis.clamv.jacobs-university.de:0", n_workers=1,  threads_per_worker=1, memory_limit='800GB')
client = Client(cluster)
client

In [None]:
# least angle regression starts from here
def lars(X_train, X_test, y_train, y_test, no_of_iterations):
    start = time.time()
    
    # we initialize the varibales we need
    m, n = X_train.shape
    beta = np.zeros(shape=(n, 1))
    residual = client.persist(y_train)
    y_hat = da.zeros(shape=(m, 1))
    
    # active variable is empty at first
    active_variable = []
    
    # inactive variable is full at the beginning
    inactive_variable = list(range(n))
    
    # sign keeps tracks of the sign of active variables
    sign = np.zeros(shape=(n, ), dtype=np.int8)
    
    # to record the validation error
    mse_test = []
    
    X_train_transpose = client.persist(X_train.transpose())
    
    iterations = 1
    
    for j in range(no_of_iterations):
        
        # we calculate the correlation vector
        c = client.persist(da.dot(X_train_transpose, residual))
        
        # extract the index of maximum element in c that is not in the active set
        i = inactive_variable[(da.argmax(da.fabs(c[inactive_variable, :]))).compute()]
        
        # add i to active variable list
        active_variable.append(i)
        
        # it just prints active variable at 100, 200, 300th... iterations
        if (iterations % 100 == 0):
            print(len(active_variable))
            
        # we remove i from inactive variable list
        inactive_variable.remove(i)
        
        # we calculate the sign of highly correlated variable
        if c[i] < 0:
            sign[i] = -1
        else:
            sign[i] = 1
        
        # extract the active variables data from X
        X_A = client.persist(X_train[:, active_variable] * da.asarray(sign[active_variable]))
        
        
        # from here everything as described in original LARS paper
        # we do caluclations to calculate the bisector
        G_A = client.persist(da.dot(X_A.transpose(), X_A))
    
        G_A_INV = client.persist(da.linalg.inv(G_A))
        
        # this is Inverse G dot by 1 where length of 1 is len of active variables
        G_A_1S = client.persist(da.sum(G_A_INV, 1))
    
        A_A = client.persist(1 / client.persist(da.sqrt(da.sum(G_A_1S))))
    
        w_a = client.persist(A_A * G_A_1S)
    
        w_a = client.persist(w_a.reshape(-1, 1))
        
        # this is equal bisector
        y_a = client.persist(da.dot(X_A, w_a))
        
        # from here every calculations are done to calculate paramerter solution i.e gamma
        C = da.fabs(c[i]).compute()
    
        a = client.persist(da.dot(X_train_transpose, y_a))

        gamma = None

        if len(active_variable) < n:
            
            # this is the first component in equation 5 see thesis
            first_comp = client.persist((C - c[inactive_variable, :]) / (A_A - a[inactive_variable, :]))
            
            # this is the second component in equation 5 see thesis
            second_comp = client.persist((C + c[inactive_variable, :]) / (A_A + a[inactive_variable, :]))

            first_comp.compute_chunk_sizes()
            second_comp.compute_chunk_sizes()
            
            
            # only positive over the elements is taken
            positive_elements = client.persist(da.extract(first_comp > 0, first_comp))
            snd_positive_elements = client.persist(da.extract(second_comp > 0, second_comp))
            
            # now we calculate the minimum over the positive elements
            gamma = min((da.nanmin(positive_elements).compute(), da.nanmin(snd_positive_elements).compute()))
        
        # when every variable is in the active set the above calculations does not hold
        else:
            gamma = (C / A_A).compute()
        
        # this is the new train prediction
        y_hat = client.persist(y_hat + gamma * y_a)
        
        # current residual
        residual = client.persist(y_train - y_hat)
        
        # we calculate the coefficients
        beta[active_variable, :] = beta[active_variable , :] + gamma * np.asarray(w_a) * sign[active_variable].reshape(-1, 1)
        
        # convert it to dask since above calculation is done in NumPy
        beta_in_dask = client.persist(da.asarray(beta))
        
        # this is the validation set prediction
        y_pred = client.persist(da.dot(X_test, beta_in_dask))
        
        # we calculate the validation error 
        test_residual = client.persist(y_test - y_pred)
        mse_test.append((da.square(test_residual).mean()).compute())
        iterations += 1
        
    end = time.time()
    execution_time = end - start
    return execution_time, mse_test, iterations

In [None]:
# if you want to make variable plot, set this to true
make_varibale_plots = False

# intialize list to plot execution time vs no of cores plot
no_of_cores = []
total_execution_times = []

# set no of iterations as you want
no_of_iterations = 500

# to scale cluster, we go from 1 to 40
for k in range(1, 41):
    client.restart()
    client.cluster.scale(k)
    
    # we load the data into the cluster
    future1 = client.scatter(X_train)
    future2 = client.scatter(X_test)
    future3 = client.scatter(y_train)
    future4 = client.scatter(y_test)
    
    # for just one worker, we do not chunk the data
    # we retrive the data from cluster as the dask array
    if (k == 1):
        X_t = da.from_delayed(future1, shape=X_train.shape, dtype=X_train.dtype)
        X_t = X_t.rechunk((695265, 3121))
        X_t = X_t.persist()
        client.rebalance(X_t)

        X_te = da.from_delayed(future2, shape=X_test.shape, dtype=X_test.dtype)
        X_te = X_te.rechunk((297972, 3121))
        X_te = X_te.persist()
        client.rebalance(X_te)

    
        y_t = da.from_delayed(future3, shape=y_train.shape, dtype=y_train.dtype)
        y_t = y_t.rechunk((695265, 1))
        y_t = y_t.persist()
        client.rebalance(y_t)
    
        y_te = da.from_delayed(future4, shape=y_test.shape, dtype=y_test.dtype)
        y_te = y_te.rechunk((297972, 1))
        y_te = y_te.persist()
        client.rebalance(y_te)
    
    # if there are more workers, chunk the data
    # we retrive the data from cluster as the dask array
    # then, chunk the data
    # spread the chunks over the cluster
    else:
        

        X_t = da.from_delayed(future1, shape=X_train.shape, dtype=X_train.dtype)
        X_t = X_t.rechunk((200000, 3121))
        X_t = X_t.persist()
        client.rebalance(X_t)

        X_te = da.from_delayed(future2, shape=X_test.shape, dtype=X_test.dtype)
        X_te = X_te.rechunk((100000, 3121))
        X_te = X_te.persist()
        client.rebalance(X_te)


        y_t = da.from_delayed(future3, shape=y_train.shape, dtype=y_train.dtype)
        y_t = y_t.rechunk((695265, 1))
        y_t = y_t.persist()
        client.rebalance(y_t)


        y_te = da.from_delayed(future4, shape=y_test.shape, dtype=y_test.dtype)
        y_te = y_te.rechunk((297972, 1))
        y_te = y_te.persist()
        client.rebalance(y_te)
    
    execution_time, mse_test, iterations = lars(X_t, X_te, y_t, y_te, no_of_iterations)
    
    
    if (make_varibale_plots == True):
        plt.plot(list(range(1, iterations)), mse_test, '-b', label='Validation Error')
        plt.legend(loc='upper right')
        plt.title('Validation Error vs No of Variables')
        plt.xlabel('No of Variables')
        plt.ylabel('Mean Squared Error')
        plt.show()
    
    total_execution_times.append(execution_time)
    no_of_cores.append(k)


In [None]:
# here we plot no of cores vs speedup
total_execution_times = np.array(total_execution_times)
speed_up = total_execution_times[0] / total_execution_times
plt.plot(no_of_cores, speed_up, color='orange', label='speedup')
plt.legend(loc='upper right')
plt.title('Speedup vs No of Cores')
plt.xlabel('No of Cores')
plt.ylabel('Speedup')
plt.show()