# Distributed Lasso Regression with Apache Pig

*By Alexandre COMBESSIE and Thibaut DUGUET*

## Introduction

### a. Project context and objectives

This project has been developed as part of the course "Practical tools for the analysis of Big Data" at ENSAE ParisTech, taught by Xavier Dupré and Mathieu Durut. It is one of the components of the Specialized Master in Data Science at ENSAE. The course website and material is available at <http://www.xavierdupre.fr/app/ensae_teaching_cs/helpsphinx3/td_3a.html>.

We set ourselves 3 key objectives for this project:
- Understanding the principles of distributed computing on Apache Hadoop
- Applying this knowledge to the case of Lasso Regression, by implementing a distributed algorithm in the programming language Pig Latin
- Testing our algorithm on a real "Big Data" set to verify its functioning and derive useful insights

### b. Algorithm implemented

Our overall statistical framework is similar to the Ordinary Least Square (OLS) linear regression. We assume that we have a set of independent, identically distributed samples $\{(x_1,y_1)...(x_n,y_n)\}$ where $x_i \in \mathbb{R}^d$ is the feature vector and $y_i \in \mathbb{R}$ the  response. We are looking to "learn" weights $w_1...w_d$ to fit:

\begin{equation}
y=Xw+\epsilon \quad \textrm{where} \quad y=\begin{pmatrix} y_1\\ ...\\ y_n \end{pmatrix} \in \mathbb{R}^n, \quad X= \begin{bmatrix} x_1^T \\ ... \\ x_n^T \end{bmatrix} \in \mathbb{R}^{n \times d}, \quad w=\begin{pmatrix} w_1\\ ...\\ w_d \end{pmatrix} \in \mathbb{R}^d, \quad \epsilon=\begin{pmatrix} \epsilon_1\\ ...\\ \epsilon_n \end{pmatrix} \in \mathbb{R}^n \quad \textrm{error term}
\end{equation}

In the case of the OLS linear regression, the solution of this equation is simply given by:

\begin{equation}
 \min_{w \in \mathbb{R}^d} \mathcal{L}_{OLS}(w) \quad \textrm{with} \quad \mathcal{L}_{OLS}(w)=\tfrac{1}{2} \left \| Xw-y \right \|^2_2 \quad \textrm{loss function}
\end{equation}

For the Lasso regression, the framework stays the same, but we use a slightly modified loss function in the minimization program, by adding a $L_1$-norm regularization term:

\begin{equation}
\mathcal{L}_{lasso}(w)=\tfrac{1}{2} \left \| Xw-y \right \|^2_2 + \lambda \left \| w \right \|_1 \quad \textrm{with $\lambda$ an external parameter}
\end{equation}

The goal of this regularization term is to encourage the sparsity of the weight vector $w$.

Our algorithm for solzing  the minimization program of $\mathcal{L}_{lasso}(w)$ is based on distributed coordinate descent. We start by splitting our data by feature:
 <img src="http://i.imgur.com/NtS41eM.png">
 
Then, for each feature, we perform a coordinate descent using the proximal gradient of $\mathcal{L}_{lasso}(w)$ with respect to $w_i$. That is where our splitting strategy becomes useful, as is simplifies the gradient computation. Note that we use the concept of *proximal gradient* to overcome the problem of non-derivability of the $L_1$-norm. We will thus use the proximity operator defined as:

\begin{equation}
\left(\operatorname{prox}_{\lambda}(x)\right)_i = x_i-\lambda \quad \textrm{if} \quad x_i>\lambda, \quad 0\quad \textrm{if} \quad|x_i|\leq\lambda, \quad x_i+\lambda\quad \textrm{if} \quad x_i<-\lambda
\end{equation}

We will use this proximity operator on $\nabla_j \big(\mathcal{L}_{OLS}(w) \big)$, the gradient of $\tfrac{1}{2}\left \| Xw-y \right \|^2_2$ with respect to $w_j$:

\begin{equation}
\nabla_j (\mathcal{L}_{OLS}(w))=\frac{\partial }{\partial w_j} ( \tfrac{1}{2}\left \| Xw-y \right \|^2_2 )=\sum ^n_{i=1}{x_{ij}( \sum^d_{k=1}{x_{ik}w_k-y_i})}
\end{equation}

To summarize, our distributed algorithm can be described as:

                Initialize w_1 ... w_d = 0
                for t = 1 ... T iteration:
                    for j = 1 ... d feature:
                        g_j = GRADIENT_j(L_OLS(w))
                        w_j^{t+1} = PROXIMITYOPERATOR(w_j^{t} - gamma * g_j)

Our input is the observed matrix $X$ and the response vector $y$. Our output is the weight vector $w$. We have 2 parameters: $\lambda$ for controling the thresholding effect of the proximity operator, and $\gamma$ for controlling the gradient step. 

Thanks to mathematical properties of the proximal gradient, this algorithm should converge to the minimum of the loss function and solve the equation.

Our algorithm is mostly based on two academic papers:
- Joseph K. Bradley, Aapo Kyrola, Danny Bickson, and Carlos Guestrin (2011). 
"Parallel Coordinate Descent for L1-Regularized Loss Minimization." 
International Conference on Machine Learning (ICML 2011).
http://arxiv.org/abs/1105.5379
- Zeng, Jichuan, Haiqin Yang, Irwin King, and Michael R. Lyu (2014). "A Comparison of Lasso-type Algorithms on Distributed Parallel Machine Learning Platforms." Neural Information Processing Systems Workshop (NIPS 2014). http://web.stanford.edu/~rezab/nips2014workshop/submits/plasso.pdf


Other sources have been used and are listed here: https://github.com/alteralec/ENSAE_Big-Data_Project-Lasso/tree/master/Doc

## 1. Workspace & Data Loading

In [36]:
import pyensae
import numpy as np
import pandas as pd
import os
import time
import re
import plotly
import plotly.plotly as py
import plotly.graph_objs as go
import warnings
warnings.filterwarnings('ignore')

### a. Connection to the Cloudera cluster

In [3]:
from pyquickhelper.ipythonhelper import open_html_form
params={"server":"","username":"","password":""}
open_html_form(params=params,title="server + credentials", key_save="params")

In [12]:
password = params["password"]
server = params["server"]
username = params["username"]

In [82]:
ssh = %remote_open
ssh

<pyensae.remote.ssh_remote_connection.ASSHClient at 0xb1f7b38>

Now that the SSH connection to the cluster is open, we can upload our datasets:
1. A small dataset of simulated random variables with known relations
2. A larger dataset of "real" variables, a sample of the popular Airline Dataset (http://stat-computing.org/dataexpo/2009/the-data.html)

### b. Data loading

In [None]:
%remote_up SimulatedData.csv simulatedData.csv
%remote_up weights.csv weights.csv
%remote_up airline_sample5.csv airline_sample5.csv

In [123]:
%remote_cmd hdfs dfs -put simulatedData.csv ./simulatedData.csv
%remote_cmd hdfs dfs -put weights.csv ./weights.csv
%remote_cmd hdfs dfs -put airline_sample5.csv  ./airline_sample5.csv 

We can verify that our data has been correctly stored on the cluster.

In [14]:
%remote_cmd hdfs dfs -ls *.csv

## 2. Algorithm implementation

*This algorithm was developed based on the simulated dataset*

### a. Data preparation

#### Counting observations

In [16]:
%%PIG count_observations.pig
A = LOAD '$input'
        USING PigStorage(';')
        AS (y:double, f1:double, f2:double, f3:double, f4:double, f5:double, f6:double, f7:double, f8:double, 
            f9:double, f10:double);
B = GROUP A ALL;
C = FOREACH B GENERATE COUNT(A);
STORE C
INTO '$output'
USING PigStorage(';') ;

In [61]:
ssh.pig_submit("count_observations.pig", redirection="redirection",
               params = dict(input='simulatedData.csv', output='count_simulatedData'))

('', '')

In [62]:
%remote_cmd tail redirection.err

In order to verify that our script worked, we download the result from the cluster and store it as a local variable.

In [40]:
%remote_cmd hdfs dfs -getmerge count_simulatedData count_simulatedData
if os.path.exists("count_simulatedData"): os.remove("count_simulatedData")
%remote_down count_simulatedData count_simulatedData
nb_obs=int(np.loadtxt("count_simulatedData",delimiter=";"))
print(str(nb_obs)+". It works!")

1000. It works!


#### Data standardization

Now, we need a script to standardize our datasets in terms of mean and standard error, while storing these values for each columns.

In [79]:
%%PIG standardize.pig
A = LOAD '$input'
        USING PigStorage(';')
        AS (y:double, f1:double, f2:double, f3:double, f4:double, f5:double, f6:double, f7:double, f8:double, 
            f9:double, f10:double);
B = GROUP A ALL;
C = FOREACH B
    GENERATE (double) AVG(A.y) AS ym, (double) AVG(A.f1) AS f1m, (double) AVG(A.f2) AS f2m, (double) AVG(A.f3) AS f3m, 
    (double) AVG(A.f4) AS f4m, (double) AVG(A.f5) AS f5m, (double) AVG(A.f6) AS f6m, (double) AVG(A.f7) AS f7m, 
    (double) AVG(A.f8) AS f8m, (double) AVG(A.f9) AS f9m, (double) AVG(A.f10) AS f10m;
D = FOREACH A
    GENERATE y - (double)C.ym AS y, f1 - (double)C.f1m AS f1, f2 - (double)C.f2m AS f2, f3 - (double)C.f3m AS f3, 
    f4 - (double)C.f4m AS f4, f5 - (double)C.f5m AS f5, f6 - (double)C.f6m AS f6, f7 - (double)C.f7m AS f7, 
    f8 - (double)C.f8m AS f8, f9 - (double)C.f9m AS f9, f10 - (double)C.f10m AS f10;
D2 = FOREACH D
     GENERATE (double) y*y AS y, (double) f1*f1 AS f1, (double) f2*f2 AS f2, (double) f3*f3 AS f3, (double) f4*f4 AS f4, 
    (double) f5*f5 AS f5, (double) f6*f6 AS f6, (double) f7*f7 AS f7,
    (double) f8*f8 AS f8, (double) f9*f9 AS f9, (double) f10*f10 AS f10;
E = GROUP D2 ALL;
F = FOREACH E
    GENERATE (double) AVG(D2.y) AS yv, (double) AVG(D2.f1) AS f1v, (double) AVG(D2.f2) AS f2v, (double) AVG(D2.f3) AS f3v, 
    (double) AVG(D2.f4) AS f4v, (double) AVG(D2.f5) AS f5v, (double) AVG(D2.f6) AS f6v, (double) AVG(D2.f7) AS f7v, 
    (double) AVG(D2.f8) AS f8v, (double) AVG(D2.f9) AS f9v, (double) AVG(D2.f10) AS f10v;
G = FOREACH F
    GENERATE (double) SQRT(F.yv) AS ye, (double) SQRT(F.f1v) AS f1e, (double) SQRT(F.f2v) AS f2e, (double) SQRT(F.f3v) AS f3e, 
    (double) SQRT(F.f4v) AS f4e, (double) SQRT(F.f5v) AS f5e, (double) SQRT(F.f6v) AS f6e, (double) SQRT(F.f7v) AS f7e, 
    (double) SQRT(F.f8v) AS f8e, (double) SQRT(F.f9v) AS f9e, (double) SQRT(F.f10v) AS f10e;
H = FOREACH D
    GENERATE y / (double)G.ye AS y, f1 / (double)G.f1e AS f1, f2 / (double)G.f2e AS f2, f3 / (double)G.f3e AS f3, 
    f4 / (double)G.f4e AS f4, f5 / (double)G.f5e AS f5, f6 / (double)G.f6e AS f6, f7 / (double)G.f7e AS f7, 
    f8 / (double)G.f8e AS f8, f9 / (double)G.f9e AS f9, f10 / (double)G.f10e AS f10;
mean = FOREACH C
        GENERATE (double) ym, (double) f1m, (double) f2m, (double) f3m, (double) f4m, (double) f5m, (double) f6m, 
        (double) f7m, (double) f8m, (double) f9m, (double) f10m;
standarderror = FOREACH G
                 GENERATE (double) ye, (double) f1e, (double) f2e, (double) f3e, (double) f4e, (double) f5e, (double) f6e, 
                (double) f7e, (double) f8e, (double) f9e, (double) f10e ;
STORE H INTO '$output_dataset' USING PigStorage(';') ;
STORE mean INTO '$output_mean' USING PigStorage(';') ;
STORE standarderror INTO '$output_se' USING PigStorage(';') ;

In [136]:
ssh.pig_submit("standardize.pig", redirection="redirection",
               params = dict(input='simulatedData.csv',
                             output_dataset='simulatedData_standard',
                            output_mean='mean_simulatedData',
                            output_se='se_simulatedData'))

('', '')

In [25]:
%remote_cmd hdfs dfs -getmerge mean_simulatedData mean_simulatedData
%remote_cmd hdfs dfs -getmerge se_simulatedData se_simulatedData
if os.path.exists("mean_simulatedData"): os.remove("mean_simulatedData")
if os.path.exists("se_simulatedData"): os.remove("se_simulatedData")
%remote_down mean_simulatedData mean_simulatedData
%remote_down se_simulatedData se_simulatedData
mean_sim=np.loadtxt("mean_simulatedData",delimiter=";")
se_sim=np.loadtxt("se_simulatedData",delimiter=";")
print(np.vstack((mean_sim,se_sim)))

[[ 2.22782665  2.39314643  2.45934351  2.51525212  2.48939266  2.46122243
   2.55422532  2.52448601  2.46756322  2.5558095   2.54373562]
 [ 4.90413158  1.72037209  1.68864271  1.71590714  1.73155267  1.68990918
   1.71688259  1.70245844  1.70297213  1.72821554  1.69839152]]


Our script worked! We can now implement our algorithm on this standardized dataset.

### b. Coordinate Descent iteration

In [218]:
%%PIG iteration.pig

%declare gamma $gam
%declare lambda $lam
%declare nb_observation $count

------------------------------------------------DATA LOADING
INPUTFILE = LOAD '$normalizedinput'
        USING PigStorage(';')
        AS (y:double, f1:double, f2:double, f3:double, f4:double, f5:double, f6:double, f7:double, f8:double, 
            f9:double, f10:double);
A = RANK INPUTFILE;
W = LOAD '$previousWeights'
        USING PigStorage(';')
        AS (w1:double, w2:double, w3:double, w4:double, w5:double, w6:double, w7:double, w8:double, 
            w9:double, w10:double);

        
------------------------------------------------DATA PREPARATION        
B = CROSS A, W;
D = FOREACH B GENERATE $0 as id, 
                      y as y,
                      (f1,f2,f3,f4,f5,f6,f7,f8,f9,f10) as vector,
                      FLATTEN({(f1,w1,1),(f2,w2,2),(f3,w3,3),(f4,w4,4),(f5,w5,5),(f6,w6,6),(f7,w7,7),
                               (f8,w8,8),(f9,w9,9),(f10,w10,10)});
E = FOREACH D GENERATE id as id, 
                      y as y,
                      vector as vector,
                      TOTUPLE($3,$4,$5) as product;
            
------------------------------------------------PROXIMAL GRADIENT COMPUTATION            
F = FOREACH E GENERATE id as id, 
                      y as y,
                      vector as vector,
                      product as product,
                      product.$0 * product.$1 as prod; 
G = GROUP F by (id, y, vector);
H = FOREACH G GENERATE group.id as id, 
                       group.y as y,
                       group.vector as vector,
                       SUM(F.prod) - group.y as diff;                
I = FOREACH H GENERATE id as id,
                       y as y,
                       diff as diff,
                       FLATTEN({(vector.$0,1),(vector.$1,2),(vector.$2,3),(vector.$3,4),(vector.$4,5),
                               (vector.$5,6),(vector.$6,7),(vector.$7,8),(vector.$8,9),(vector.$9,10)});
J = FOREACH I GENERATE id as id,
                       y as y,
                       diff * $3 as gradient,
                       $4 as dim;                        
K = GROUP J by dim;
L = FOREACH K GENERATE group as dim,
                       SUM(J.gradient)/$nb_observation as gradient;
W2 = FOREACH W GENERATE FLATTEN({(w1,1),(w2,2),(w3,3),(w4,4),(w1,5),(w1,6),(w1,7),(w1,8),(w1,9),(w1,10)});
M = JOIN L BY dim, W2 BY $1;
N = FOREACH M GENERATE $0 as dim,
                      (CASE
                        WHEN $2 - $gamma * $1 > $lambda THEN $2 - $gamma * $1 - $lambda
                        WHEN ABS($2 - $gamma * $1) <= $lambda THEN 0
                        WHEN $2 - $gamma * $1 < - $lambda THEN $2 - $gamma * $1 + $lambda
                       END) as w;
    
------------------------------------------------RETURN RESULTS
N2 = GROUP N ALL;
N3 = FOREACH N2 GENERATE FLATTEN(BagToTuple(N.w));
O = JOIN N BY dim, W2 BY $1;
P = FOREACH O GENERATE ABS($2 - $1) AS conv;
Q = GROUP P ALL;
R = FOREACH Q GENERATE SUM(P.conv) AS conv;
STORE N3 INTO '$newWeights' USING PigStorage(';') ;
STORE R INTO '$conv' USING PigStorage(';') ;

For each iteration, we obtain the new weights, along with an indicator of convergence. Once we reach a convergence criterion, we can obtain the "true" weights for non-standardized data with the formula:

\begin{equation*}
Weight_{true}=\frac{Weight_{standard} \times SE(response)}{SE(weight)} \quad \textrm{where} \quad \textrm{SE is the Standard Error}
\end{equation*}

We can submit this script to verify that it works for 1 iteration.

In [177]:
ssh.pig_submit("iteration.pig", redirection="redirection", 
               params = dict(count=str(nb_obs),
                             gam=str(0.1),
                             lam=str(0.01),
                             normalizedinput='simulatedData_standard/part-m-00000',
                            previousWeights='weights.csv', 
                            newWeights='sim_weights1', 
                            conv='sim_conv1'))

('', '')

In [195]:
%remote_cmd hdfs dfs -ls sim_weights1

In [197]:
%remote_cmd hdfs dfs -tail sim_weights1/part-r-00000

### c. Automating the iterations

Now we will implement a Python script to launch a series of iteration on our dataset. For that, we will need a simple function to verify is the iteration is finished, and another function to retrieve the results from it (weights and convergence value).

In [17]:
def iteration_finished(output):
    result = %remote_cmd hdfs dfs -ls $output
    return(re.search("SUCCESS", result.data) != None)

In [18]:
def getvalue_fromHDFS(x):
    %remote_cmd hdfs dfs -getmerge $x $x
    if os.path.exists(x): os.remove(x)
    %remote_down $x $x
    return(np.loadtxt(x,delimiter=";"))

Now we can write a Python script to automate the iterations.

In [None]:
i=1
max_iteration=30
previousWeights_name = 'weights.csv'
convergence=[]
weight_matrix=np.array(range(1,11))
while(i <= max_iteration):
    newWeights_name="sim_weights"+str(i)
    conv_name="sim_conv"+str(i)
    state_job=False
    ssh.pig_submit("iteration.pig", redirection="redirection", 
               params = dict(count=str(nb_obs),
                            gam=str(0.1),
                             lam=str(0.01),
                             normalizedinput='simulatedData_standard/part-m-00000',
                            previousWeights=previousWeights_name, 
                            newWeights=newWeights_name, 
                            conv=conv_name))
    while(state_job==False):
        state_job=iteration_finished(conv_name)                    
        time.sleep(7)
    else:  
        new_weights=getvalue_fromHDFS(newWeights_name)
        new_convergence=getvalue_fromHDFS(conv_name)
        convergence=np.append(convergence,new_convergence)
        weight_matrix=np.vstack((weight_matrix,new_weights))
        print(str(i)+": "+str(new_weights.round(3))+" convergence: "+str(new_convergence.round(6)))
        previousWeights_name = newWeights_name
        i = i+1

We can see here below that our algorithm converges rapidly, which is not surprising since our test dataset is composed of random variables with a known link between the features and the response vector. We notice that some weights are always set to 0, which corresponds to the sparsity property of the the Lasso $L_1$-norm regularization. The $\lambda$ parameter of the proximity operator also accelerates greatly the convergence.

In [73]:
iterate_range=np.arange(0,51)
data=[]
for i in range(0,10):
    data.append(go.Scatter(x=iterate_range,y=np.append(0,weight_matrix[1:15,i]*se_sim[0]/se_sim[i+1]),
                           mode='lines+markers',name="Weight "+str(i+1)))
py.iplot(data, filename='simulated_weightconvergence')

In [54]:
py.iplot([go.Bar(x=iterate_range+1,y=convergence[0:15])], filename='simulated_convergence')

## 3. Test of our algorithm on 'real' data

Now it is time to test our algorithm on the chosen dataset, i.e. a sample of the popular "Airline Dataset" on airline on-time performance. It is available on http://stat-computing.org/dataexpo/2009/the-data.html). It covers flight arrival and departure details for all commercial flights in the USA from 1987 to 2008. 

We prepared this dataset beforehand, according to the recommendations of Michael Kane and Jay Emerson on http://stat-computing.org/dataexpo/2009/posters/kane-emerson.pdf. Following this data preparation step, we went from 120 to 72 million records. Because of computational issues on the cluster, we had to take only a sample of 5% of the prepared database, covering 3.6 million records for a size of 127MB.

In this dataset, we chose to study the following 11 variables.
- **Response**: delay upon arrival
- **Features**:
    - Year of the flight
    - Month of the flight
    - Age of the plane (estimated)
    - Air Time
    - Delay at departure
    - Taxi out time at departure
    - Distance
    - Elapsed flight time
    - Day of Month
    - Day of Week

Thanks to our Lasso algorithm, we can hope to determine what are the key factors explaining the flight delay upon arrival. Let's apply our scripts!



#### Counting observations

In [197]:
ssh.pig_submit("count_observations.pig", redirection="redirection",
               params = dict(input='airline_sample5.csv', output='count_airline'))

('', '')

In [198]:
nobs_airline=getvalue_fromHDFS('count_airline')
int(nobs_airline)

3613346

#### Data standardization

In [199]:
ssh.pig_submit("standardize.pig", redirection="redirection",
               params = dict(input='airline_sample5.csv',
                             output_dataset='airline_standard',
                            output_mean='mean_airline',
                            output_se='se_airline'))

('', '')

In [34]:
means_airline=getvalue_fromHDFS('mean_airline')
standarderrors_airline=getvalue_fromHDFS('se_airline')

In [37]:
pd.DataFrame(np.vstack((means_airline,standarderrors_airline)).round(2),columns=column_names,index=["Mean","Standard error"])

Unnamed: 0,ArrDelay,Year,Month,Age,AirTime,DepDelay,TaxiOut,Distance,ActualElapsedTime,DayofMonth,DayOfWeek
Mean,7.82,2001.96,6.56,30.83,103.6,8.94,15.67,737.04,125.38,15.74,3.95
Standard error,33.71,4.29,3.4,64.1,71.85,31.31,10.96,564.36,70.18,8.79,1.99


#### Lasso algorithm

In [None]:
i=1
max_iteration=20
previousWeights_name = 'weights.csv'
convergence_airline=[]
weight_matrix_airline=np.array(range(1,11))
while(i <= max_iteration):
    newWeights_name="airline_weights"+str(i)
    conv_name="airline_conv"+str(i) 
    state_job=False
    ssh.pig_submit("iteration.pig", redirection="redirection", 
               params = dict(count=str(nobs_airline),
                            gam=str(0.1),
                             lam=str(0.001),
                             normalizedinput='airline_standard',
                            previousWeights=previousWeights_name, 
                            newWeights=newWeights_name, 
                            conv=conv_name))
    while(state_job==False):
        state_job=iteration_finished(conv_name)                    
        time.sleep(7)
    else:  
        new_weights_airline=getvalue_fromHDFS(newWeights_name)
        new_convergence_airline=getvalue_fromHDFS(conv_name)
        convergence_airline=np.append(convergence_airline,new_convergence_airline)
        weight_matrix_airline=np.vstack((weight_matrix_airline,new_weights_airline))
        print(str(i)+": "+str(new_weights_airline.round(3))+" convergence: "+str(new_convergence_airline.round(6)))
        previousWeights_name = newWeights_name
        i = i+1

In [74]:
iterate_range=np.arange(0,51)
column_names=["ArrDelay","Year","Month","Age","AirTime","DepDelay","TaxiOut",
            "Distance","ActualElapsedTime","DayofMonth","DayOfWeek"]
data=[]
for i in range(0,10):
    data.append(go.Scatter(x=iterate_range,
                           y=np.append(0,weight_matrix_airline[1:15,i]*standarderrors_airline[0]/standarderrors_airline[i+1]),
                           mode='lines+markers',name=column_names[i+1]))
py.iplot(data, filename='airline_weightconvergence')

Our algorithm converges rapidly in terms of number of iteration. However, it is quite computationally intensive: one iteration takes between 30 and 40 minutes to give its results. Hence, we had to run our algorithm for several hours.

Because of the sparsity property of the Lasso, many weights as set to 0, which indicates a possible lack of correlation between the delay upon arrival (*our response*) and the feature. This stays true even when we reduce the $\lambda$ proximity parameter, which is 0.001 here instead of 0.01 previously.

It would mean that the 2 key factors explaining the delay upon arrival are:
1. Delay at departure
2. Taxi out time at departure

While the first factor is obvious (direct causation), the second factor is less so. It would seem Taxi out time explains a lot of delay at arrival. It is defined as the "period of time between when an airplane leaves the gate and when it takes off". It means that the primary factors of delay do not come during the flight, but before the plane takes off. It is no so surprising, but still an interesting insight.

#### OLS regression algorithm

As a final test of our algorithm, we would like to see if it could also be used to fit an Ordinary Least Square regression model to our dataset. This model is actually a special case of our algorithm, with the proximity paramater $\lambda=0$. 

For this experiment, we set the step size $\gamma=1$ for the first 20 iterations, then $\gamma=0.5$ in order to ensure the convergence.

In [None]:
i=1
max_iteration=20
previousWeights_name = 'weights.csv'
convergence_airline_ols=[]
weight_matrix_airline_ols=np.array(range(1,11))
while(i <= max_iteration):
    newWeights_name="airline_weights_ols"+str(i)
    conv_name="airline_conv_ols"+str(i) 
    state_job=False
    ssh.pig_submit("iteration.pig", redirection="redirection", 
               params = dict(count=str(nobs_airline),
                            gam=str(1.0),
                             lam=str(0.0),
                             normalizedinput='airline_standard',
                            previousWeights=previousWeights_name, 
                            newWeights=newWeights_name, 
                            conv=conv_name))
    while(state_job==False):
        state_job=iteration_finished(conv_name)                    
        time.sleep(7)
    else:  
        new_weights_airline_ols=getvalue_fromHDFS(newWeights_name)
        new_convergence_airline_ols=getvalue_fromHDFS(conv_name)
        convergence_airline_ols=np.append(convergence_airline_ols,new_convergence_airline_ols)
        weight_matrix_airline_ols=np.vstack((weight_matrix_airline_ols,new_weights_airline_ols))
        print(str(i)+": "+str(new_weights_airline_ols.round(3))+" convergence: "+str(new_convergence_airline_ols.round(6)))
        previousWeights_name = newWeights_name
        i = i+1

In [None]:
i=21
max_iteration=40
while(i <= max_iteration):
    newWeights_name="airline_weights_ols"+str(i)
    conv_name="airline_conv_ols"+str(i) 
    state_job=False
    ssh.pig_submit("iteration.pig", redirection="redirection", 
               params = dict(count=str(nobs_airline),
                            gam=str(0.5),
                             lam=str(0.0),
                             normalizedinput='airline_standard',
                            previousWeights=previousWeights_name, 
                            newWeights=newWeights_name, 
                            conv=conv_name))
    while(state_job==False):
        state_job=iteration_finished(conv_name)                    
        time.sleep(7)
    else:  
        new_weights_airline_ols=getvalue_fromHDFS(newWeights_name)
        new_convergence_airline_ols=getvalue_fromHDFS(conv_name)
        convergence_airline_ols=np.append(convergence_airline_ols,new_convergence_airline_ols)
        weight_matrix_airline_ols=np.vstack((weight_matrix_airline_ols,new_weights_airline_ols))
        print(str(i)+": "+str(new_weights_airline_ols.round(3))+" convergence: "+str(new_convergence_airline_ols.round(6)))
        previousWeights_name = newWeights_name
        i = i+1

In [83]:
weight_matrix_airline_ols=np.array(range(1,11))
convergence_airline_ols=[]
for i in range(1,41):
    newWeights_name="airline_weights_ols"+str(i)
    conv_name="airline_conv_ols"+str(i)
    new_weights_airline_ols=getvalue_fromHDFS(newWeights_name)
    new_convergence_airline_ols=getvalue_fromHDFS(conv_name)
    weight_matrix_airline_ols=np.vstack((weight_matrix_airline_ols,new_weights_airline_ols))
    convergence_airline_ols=np.append(convergence_airline_ols,new_convergence_airline_ols)

In [84]:
iterate_range=np.arange(0,51)
column_names=["ArrDelay","Year","Month","Age","AirTime","DepDelay","TaxiOut",
            "Distance","ActualElapsedTime","DayofMonth","DayOfWeek"]
data=[]
for i in range(0,10):
    data.append(go.Scatter(x=iterate_range,
                        y=np.append(0,weight_matrix_airline_ols[1:42,i]*standarderrors_airline[0]/standarderrors_airline[i+1]),
                        mode='lines+markers',name=column_names[i+1]))
py.iplot(data, filename='airline_weightconvergence_ols')

In [85]:
py.iplot([go.Bar(x=iterate_range+1,y=convergence_airline_ols[0:41])], filename='airline_convergence_ols')

After our correction of the step size parameter at the 21st iteration, we see that our algorithm converges correctly.

## Conclusion

All in all, we managed to successfully overcome several challenges, but some remain:

- **Successes**
    - Distribute a simple Machine Learning algorithm on Hadoop.
    - Learn the programming language Pig Latin.
    - Manage jobs on a Hadoop cluster. For that we were glad to choose a Cloudera setup, which has the very useful `%job_syntax` command, the ability to check the log `redirection.err`, and the great Hue interface
    - Explore an interesting new dataset
    
    
- **To be improved**
    - Make our algorithm more scalable, as we run into problems when trying with large datasets. We believe the problem lies with our data splitting strategy by column, which obliges us to transform the dataset with Pig (costly and inefficient in terms of computation). A possible solution would be to transform the text file beforehand using Python, with a line-by-line processing to limit memory usage.
    - Optimize the choice of our parameters $\lambda,\gamma$. We could possibly do that using cross-validation, or use dynamic updating during iterations.
    



In [62]:
%remote_close

True