# Linear Regression

**Overview**   
We will implement a simple linear regression in BigDL

**Run time**  
15 mins

## Step 1 - Init

In [None]:
from zoo.common.nncontext import init_nncontext
import zoo.version

## TODO : use 'init_nncontext' (imported above) to create the context
## "Linear Regression" is the argument passed to it
sc = ???("Linear Regression")
print("zoo version : ", zoo.version.__version__)

## Spark UI
## sc.uiWebUrl is split on ':' and the piece at index 2 is appended to the
## localhost URL (for a 'http://host:port' URL that piece is the port)
## NOTE(review): the host is hard-coded to 'localhost' -- presumably the notebook
## runs on the same machine as the Spark driver; confirm
print('Spark UI running on http://localhost:' + sc.uiWebUrl.split(':')[2])
## Last expression of the cell, so the notebook displays 'sc'
sc

## Step 2 - Create some sample data

In [None]:
import numpy as np
import pandas as pd

## Each pair is one observation: (bill amount, tip amount)
observations = [
    (50.00, 12.00),
    (30.00, 7.00),
    (60.00, 13.00),
    (40.00, 8.00),
    (65.00, 15.00),
    (20.00, 5.00),
    (10.00, 2.00),
    (15.00, 2.00),
    (25.00, 3.00),
    (35.00, 4.00),
]

## Split the pairs into one numpy array per column
bill, tip = (np.array(column) for column in zip(*observations))

## Pandas dataframe with the two columns 'bill' and 'tip'
tip_data = pd.DataFrame({'bill': bill, 'tip': tip})

## Last expression of the cell, so the notebook displays the dataframe
tip_data

## Step 3 - Basic plot

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt

## Scatter plot: bill on the x axis, tip on the y axis
plt.scatter(x=bill, y=tip)
plt.xlabel('bill')
plt.ylabel('tip')
plt.show()

## Step 4 - Create a Spark dataframe

In [None]:
## TODO : Create a Spark Dataframe
## Hint : pass in 'tip_data' (the pandas dataframe created in Step 2)
## NOTE(review): 'spark' is not defined in the cells above -- presumably a
## SparkSession provided by the notebook environment; confirm
spark_tips = spark.createDataFrame(???)

## Inspect the result: print the schema, then show up to 100 rows
spark_tips.printSchema()
spark_tips.show(100)

## Step 5 - Create a Feature Vector

In [None]:
from pyspark.ml.feature import VectorAssembler

## TODO : Create feature vector for input 'bill'
## Hint : inputCols = ['bill']
## The assembled vector is written to a new column named 'assembled'
assembler1 = VectorAssembler(inputCols=[???], outputCol="assembled")
feature_vector = assembler1.transform(spark_tips)

## TODO : create a feature vector for tip column
## Hint : inputCols = ['tip']
## The assembled vector is written to a new column named 'outcome';
## this assembler runs on the output of the first one, so both new columns are kept
assembler2 = VectorAssembler(inputCols=["???"], outputCol="outcome")
feature_vector = assembler2.transform(feature_vector)

## Inspect the result: schema and rows
feature_vector.printSchema()
feature_vector.show()

## Step 6 - Create Array type
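The nnframes estimators (we use `NNEstimator` below) currently support Array type columns,  
so the assembled vector columns are converted to arrays in this step.  
(Spark ML Vector support coming soon)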

In [None]:
# Make the '../utils' directory importable and ship my_utils.py to Spark

import os
import sys

# '../utils' is resolved against the current working directory
cwd = os.getcwd()
utils_relative = os.path.join(cwd, "../utils")
utils_dir = os.path.abspath(utils_relative)

# Append only once, so re-running the cell does not add duplicates
already_on_path = utils_dir in sys.path
if not already_on_path:
    sys.path.append(utils_dir)
print ("sys.path: " , sys.path)

my_utils_joined = os.path.join(utils_dir, 'my_utils.py')
my_utils_pyfile = os.path.abspath(my_utils_joined)
print ("my_utils file : ", my_utils_pyfile)

# Importable now that utils_dir is on sys.path
from my_utils import dense_to_array_udf, sparse_to_array_udf

# Register the same file with the Spark context
sc.addPyFile(my_utils_pyfile)

In [None]:
## TODO : convert 'assembled' into 'features'
## Replace '???' with the name of the source column; the result of
## dense_to_array_udf is stored in a new column named 'features'
feature_vector = feature_vector.withColumn('features', dense_to_array_udf('???'))

## TODO : convert 'outcome' into 'label'
## Same conversion, stored in a new column named 'label'
feature_vector = feature_vector.withColumn('label', dense_to_array_udf('???'))

## Inspect the result: schema and rows
feature_vector.printSchema()
feature_vector.show()

## Step 7 - Create Linear Layer

### 7.1 - Network parameters

In [None]:
## TODO : set network parameters
## Replace each placeholder below with a value, following the hints

## Hint : set both 'input_size' and 'output_size'  to 1

## batch_size : set to power of 2, but it can not be higher than the total number of inputs
## start with 4

## max_epochs : set to 100

## learning_rate : set to 0.001

input_size = ???
output_size = ???
batch_size = ??
max_epochs = ???

learning_rate = ???

### 7.2 - Setup network

In [None]:
from bigdl.nn.layer import Linear, Sequential
from bigdl.nn.criterion import MSECriterion
from zoo.pipeline.nnframes import  NNEstimator
from zoo.pipeline.nnframes.nn_classifier import SeqToTensor, ArrayToTensor



## TODO : create a simple linear network, just input to output
## Hint : input_size=input_size,   output_size=output_size (they are both 1)
nn = Sequential()\
    .add(Linear(input_size=???, output_size=???))

# MSE : https://en.wikipedia.org/wiki/Mean_squared_error
## TODO : use 'MSECriterion'
criterion = ???()

## TODO : Create NNEstimator with correct parameters
## Hint : 
##     first param : nn (network)
##     second param : criterion
##     third param : input size, wrapped as SeqToTensor([input_size])
##     fourth param : output size, wrapped as ArrayToTensor([output_size])

estimator = ???(???, ???, SeqToTensor([???]), ArrayToTensor([???]))

## TODO : set other params
## Hint : use batch_size, max_epochs and learning_rate from section 7.1, in that order
estimator.setBatchSize(???)\
         .setMaxEpoch(???)\
         .setLearningRate(???)

## Print the network that was built
print("nn:\n", nn)


## Step 8 - Train

In [None]:
%%time

## The '%%time' magic at the top of this cell reports how long the cell took to run
print("training started...")
## TODO : train using 'training' data
## Hint : pass in 'feature_vector'
## The trained model is kept in 'model'
model = estimator.???(???)
print("training done\n")

## TODO : Notice the time taken

## Step 9 - Predict

In [None]:
## TODO : Create predictions by using 'transform' method
##        Pass in 'feature_vector' as parameter
##        (the same dataframe that was used for training)

predictions = model.???(???)
## Show the resulting rows
predictions.show()

## Step 10 - Evaluation

### 10.1 - Calculate residuals

In [None]:
## Residual = label - prediction; element 0 of each column is used
actual = predictions['label'][0]
predicted = predictions['prediction'][0]
residuals = predictions.withColumn('residual', actual - predicted)
residuals.show()

### 10.2 - SSE

In [None]:
from pyspark.sql.functions import sum

## Square each residual (residual * residual) into a new 'residual_squared' column
residual_col = residuals['residual']
residual_squared = residuals.withColumn("residual_squared", residual_col * residual_col)
residual_squared.show()


In [None]:
## SSE (sum of squared errors): add up the 'residual_squared' column.
## 'sum' here is the pyspark.sql.functions aggregate imported in the cell above,
## not the Python builtin.
sse_row = residual_squared.agg(sum('residual_squared')).first()
sse = sse_row[0]
print ("SSE = " , sse)