# Tutorial
A. This tutorial will show you how to 
    1. Read data into a Spark context
    2. Perform some transformations
    3. Calculate some statistics
    4. Plot and view results
    5. Build a model to estimate values
    6. Export and save generated data
B. Requirements
    1. Load the tables into a spark context
        - From Data Lake Store
            - D3S_Training_9021_MaxTemp
            - D3S_Training_9106_MaxTemp
            - D3S_Training_9215_MaxTemp
            - D3S_Training_9225_MaxTemp
            - D3S_Training_9265_MaxTemp
        - From SQL Store
            - D3S_Training_WeatherStations
    2. View the top of each of the tables
    3. Find the overlapping date range for the tables in the Data Lake where
        - MaxTemp is not null
        - Quality is equal to 'Y'
    4. Filter each of the tables in the Data Lake where
        - DateTime is between the date range calculated above
        - MaxTemp is not null
        - Quality is equal to 'Y'
    5. Join the filtered tables on date creating a table with the following columns
        - Year
        - Month
        - Day
        - Airport_MaxTemp
        - Gosnells_MaxTemp
        - Swanbourne_MaxTemp
        - Perth_MaxTemp
        - Hillarys_MaxTemp
    6. View the stats on the joined table using the describe method
    7. Calculate a correlation matrix of the MaxTemp columns in the joined table
    8. View the correlation matrix as a heat map
    9. Calculate a series of linear regressions where Perth_MaxTemp is the dependent variable/label
        - Features: Airport
        - Features: Airport,Gosnells
        - Features: Airport,Gosnells,Swanbourne
        - Features: Airport,Swanbourne
        - Features: Airport,Gosnells,Swanbourne,Hillarys
    10. Export the joined table into a new delta table in the data lake

## Load required libraries

In [None]:
from neuro_python.neuro_compute import spark_manager as spm
from neuro_python.neuro_data import schema_manager as sm
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
import plotly.graph_objs as go
# Enable inline plotly rendering in this notebook; required for the iplot()
# heat map drawn in step 8.
init_notebook_mode(connected=True)

## Start the default cluster and ensure it's running

In [None]:
# Ask neuro_python to start the default Spark cluster. Use the next cell to
# check its state; the context below should only be created once it is running.
spm.start_cluster()

In [None]:
# List the available clusters so you can confirm the cluster started above
# has reached the running state.
spm.list_clusters()

## Create context once cluster is in running state

In [None]:
# Create a Spark context named 'TrainingContext'. The %%spark / %%spark_sql
# cells below presumably execute in this context (TODO confirm with the
# neuro_python spark_manager docs).
spm.create_context('TrainingContext')

## 1. Import data
Change the data lake name ('DataLakeName') and SQL store name ('SqlStoreName') below to match yours.

In [None]:
%%spark_import_table
import_table('df_9021','DataLakeName','D3S_Training_9021_MaxTemp')
import_table('df_9106','DataLakeName','D3S_Training_9106_MaxTemp')
import_table('df_9215','DataLakeName','D3S_Training_9215_MaxTemp')
import_table('df_9225','DataLakeName','D3S_Training_9225_MaxTemp')
import_table('df_9265','DataLakeName','D3S_Training_9265_MaxTemp')
import_table('df_Stations','SqlStoreName','D3S_Training_WeatherStations')

## 2. View the top of each of the tables

In [None]:
%%spark_sql
-- Show 10 rows of the Airport station table (9021) to inspect its columns.
SELECT *
FROM df_9021
LIMIT 10

In [None]:
%%spark_sql
-- Show 10 rows of the Gosnells station table (9106) to inspect its columns.
SELECT *
FROM df_9106
LIMIT 10

In [None]:
%%spark_sql
-- Show 10 rows of the Swanbourne station table (9215) to inspect its columns.
SELECT *
FROM df_9215
LIMIT 10

In [None]:
%%spark_sql
-- Show 10 rows of the Perth station table (9225) to inspect its columns.
SELECT *
FROM df_9225
LIMIT 10

In [None]:
%%spark_sql
-- Show 10 rows of the Hillarys station table (9265) to inspect its columns.
SELECT *
FROM df_9265
LIMIT 10

In [None]:
%%spark_sql
-- Show 10 rows of the weather-station table imported from the SQL store.
SELECT *
FROM df_Stations
LIMIT 10

## 3. Find the overlapping date range for the tables

In [None]:
%%spark
import datetime
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import udf
from pyspark.sql.functions import min, max, first
spark.udf.register('udf_myFunction1',datetime.datetime, TimestampType())
udf_myFunction1 = udf(datetime.datetime, TimestampType())
df2=df_9021.filter("MaxTemp is not null and Quality='Y'").withColumn('Date',udf_myFunction1('Year','Month','Day')).select(min('Date'),max('Date')).limit(1)
df2=df2.union(df_9106.filter("MaxTemp is not null and Quality='Y'").withColumn('Date',udf_myFunction1('Year','Month','Day')).select(min('Date'),max('Date')).limit(1))
df2=df2.union(df_9215.filter("MaxTemp is not null and Quality='Y'").withColumn('Date',udf_myFunction1('Year','Month','Day')).select(min('Date'),max('Date')).limit(1))
df2=df2.union(df_9225.filter("MaxTemp is not null and Quality='Y'").withColumn('Date',udf_myFunction1('Year','Month','Day')).select(min('Date'),max('Date')).limit(1))
df2=df2.union(df_9265.filter("MaxTemp is not null and Quality='Y'").withColumn('Date',udf_myFunction1('Year','Month','Day')).select(min('Date'),max('Date')).limit(1))
df2=df2.select(max('min(Date)'),min('max(Date)'))

In [None]:
%spark_pandas -df df2

## 4. Filter each of the tables

In [None]:
%%spark_sql -df df_9021_filtered
-- Keep good-quality, non-null readings inside the overlapping date range from
-- step 3 (2013-05-16 to 2016-02-29 inclusive; update if that result changes).
-- Encoding the date as the integer YYYYMMDD preserves chronological order and
-- avoids calling a Python UDF on every row.
select Year,Month,Day,MaxTemp
from df_9021
where Quality='Y'
and (Year*10000 + Month*100 + Day) between 20130516 and 20160229
and MaxTemp is not null

In [None]:
%%spark_sql -df df_9106_filtered
-- Keep good-quality, non-null readings inside the overlapping date range from
-- step 3 (2013-05-16 to 2016-02-29 inclusive; update if that result changes).
-- Encoding the date as the integer YYYYMMDD preserves chronological order and
-- avoids calling a Python UDF on every row.
select Year,Month,Day,MaxTemp
from df_9106
where Quality='Y'
and (Year*10000 + Month*100 + Day) between 20130516 and 20160229
and MaxTemp is not null

In [None]:
%%spark_sql -df df_9215_filtered
-- Keep good-quality, non-null readings inside the overlapping date range from
-- step 3 (2013-05-16 to 2016-02-29 inclusive; update if that result changes).
-- Encoding the date as the integer YYYYMMDD preserves chronological order and
-- avoids calling a Python UDF on every row.
select Year,Month,Day,MaxTemp
from df_9215
where Quality='Y'
and (Year*10000 + Month*100 + Day) between 20130516 and 20160229
and MaxTemp is not null

In [None]:
%%spark_sql -df df_9225_filtered
-- Keep good-quality, non-null readings inside the overlapping date range from
-- step 3 (2013-05-16 to 2016-02-29 inclusive; update if that result changes).
-- Encoding the date as the integer YYYYMMDD preserves chronological order and
-- avoids calling a Python UDF on every row.
select Year,Month,Day,MaxTemp
from df_9225
where Quality='Y'
and (Year*10000 + Month*100 + Day) between 20130516 and 20160229
and MaxTemp is not null

In [None]:
%%spark_sql -df df_9265_filtered
-- Keep good-quality, non-null readings inside the overlapping date range from
-- step 3 (2013-05-16 to 2016-02-29 inclusive; update if that result changes).
-- Encoding the date as the integer YYYYMMDD preserves chronological order and
-- avoids calling a Python UDF on every row.
select Year,Month,Day,MaxTemp
from df_9265
where Quality='Y'
and (Year*10000 + Month*100 + Day) between 20130516 and 20160229
and MaxTemp is not null

## 5. Join the filtered tables

In [None]:
%%spark_sql -df df_joined
-- Inner-join the five filtered station tables on calendar date, giving one row
-- per day with every station's MaxTemp; only dates present in all five survive.
-- Column order follows the requirements list and the Delta table schema in
-- step 10: Year, Month, Day, Airport, Gosnells, Swanbourne, Perth, Hillarys.
select air.Year, air.Month, air.Day,
    air.MaxTemp as Airport_MaxTemp,
    gos.MaxTemp as Gosnells_MaxTemp,
    swan.MaxTemp as Swanbourne_MaxTemp,
    per.MaxTemp as Perth_MaxTemp,
    hill.MaxTemp as Hillarys_MaxTemp
-- 9021: Airport
from df_9021_filtered air
-- 9106: Gosnells
join df_9106_filtered gos on
    gos.Year = air.Year
    and gos.Month = air.Month
    and gos.Day = air.Day
-- 9215: Swanbourne
join df_9215_filtered swan on
    swan.Year = air.Year
    and swan.Month = air.Month
    and swan.Day = air.Day
-- 9225: Perth
join df_9225_filtered per on
    per.Year = air.Year
    and per.Month = air.Month
    and per.Day = air.Day
-- 9265: Hillarys
join df_9265_filtered hill on
    hill.Year = air.Year
    and hill.Month = air.Month
    and hill.Day = air.Day

## 6. View the stats on the joined table

In [None]:
%spark_pandas -df df_joined.describe()

## 7. Calculate a correlation matrix

In [None]:
%%spark
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=["Airport_MaxTemp","Gosnells_MaxTemp","Swanbourne_MaxTemp","Perth_MaxTemp","Hillarys_MaxTemp"],
    outputCol="features")
output = assembler.transform(df_joined)

In [None]:
%%spark
from pyspark.ml.stat import Correlation
r1 = Correlation.corr(output, "features").head()
rows = r1[0].toArray().tolist()
df_corr = spark.createDataFrame(rows,["Airport_MaxTemp","Gosnells_MaxTemp","Swanbourne_MaxTemp","Perth_MaxTemp","Hillarys_MaxTemp"])

### Return the result to the notebook so you can plot it

In [None]:
%spark_pandas -df df_corr -o df_corr

## 8. View the correlation matrix as a heat map

In [None]:
# Render the correlation matrix (returned to the notebook as df_corr) as an
# interactive heat map; both axes use the station labels in matrix order.
station_labels = ["Airport_MaxTemp", "Gosnells_MaxTemp", "Swanbourne_MaxTemp",
                  "Perth_MaxTemp", "Hillarys_MaxTemp"]
heatmap = go.Heatmap(z=df_corr.values.tolist(), x=station_labels, y=station_labels)
iplot([heatmap])

## 9. Calculate a series of linear regressions

In [None]:
%%spark
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(labelCol='Perth_MaxTemp')
assembler = VectorAssembler(
    inputCols=["Airport_MaxTemp"],
    outputCol="features")
output = assembler.transform(df_joined)
# Fit the model
lrModel = lr.fit(output)

# Print the coefficients and intercept for linear regression
"Coefficients: %s" % str(lrModel.coefficients)+" Intercept: %s" % str(lrModel.intercept)+" R-Squared: %s"%lrModel.summary.r2+" RMSE: %s"%lrModel.summary.rootMeanSquaredError

#### The line below is not runnable code; it illustrates how to apply the fitted coefficient and intercept of the Airport-only model to estimate Perth's maximum temperature
perth_est=airport*0.957775304263+0.8193198067865914

In [None]:
%%spark
lr = LinearRegression(labelCol='Perth_MaxTemp')
assembler = VectorAssembler(
    inputCols=["Airport_MaxTemp","Gosnells_MaxTemp"],
    outputCol="features")
output = assembler.transform(df_joined)
# Fit the model
lrModel = lr.fit(output)

# Print the coefficients and intercept for linear regression
"Coefficients: %s" % str(lrModel.coefficients)+" Intercept: %s" % str(lrModel.intercept)+" R-Squared: %s"%lrModel.summary.r2+" RMSE: %s"%lrModel.summary.rootMeanSquaredError

In [None]:
%%spark
lr = LinearRegression(labelCol='Perth_MaxTemp')
assembler = VectorAssembler(
    inputCols=["Airport_MaxTemp","Gosnells_MaxTemp","Swanbourne_MaxTemp"],
    outputCol="features")
output = assembler.transform(df_joined)
# Fit the model
lrModel = lr.fit(output)

# Print the coefficients and intercept for linear regression
"Coefficients: %s" % str(lrModel.coefficients)+" Intercept: %s" % str(lrModel.intercept)+" R-Squared: %s"%lrModel.summary.r2+" RMSE: %s"%lrModel.summary.rootMeanSquaredError

In [None]:
%%spark
lr = LinearRegression(labelCol='Perth_MaxTemp')
assembler = VectorAssembler(
    inputCols=["Airport_MaxTemp","Swanbourne_MaxTemp"],
    outputCol="features")
output = assembler.transform(df_joined)
# Fit the model
lrModel = lr.fit(output)

# Print the coefficients and intercept for linear regression
"Coefficients: %s" % str(lrModel.coefficients)+" Intercept: %s" % str(lrModel.intercept)+" R-Squared: %s"%lrModel.summary.r2+" RMSE: %s"%lrModel.summary.rootMeanSquaredError

In [None]:
%%spark
lr = LinearRegression(labelCol='Perth_MaxTemp')
assembler = VectorAssembler(
    inputCols=["Airport_MaxTemp","Gosnells_MaxTemp","Swanbourne_MaxTemp","Hillarys_MaxTemp"],
    outputCol="features")
output = assembler.transform(df_joined)
# Fit the model
lrModel = lr.fit(output)

# Print the coefficients and intercept for linear regression
"Coefficients: %s" % str(lrModel.coefficients)+" Intercept: %s" % str(lrModel.intercept)+" R-Squared: %s"%lrModel.summary.r2+" RMSE: %s"%lrModel.summary.rootMeanSquaredError

## 10. Export the joined table into a new delta table in the data lake
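Change the data lake name ('DataLakeName') and table name ('D3S_Training_Lee_PerthMaxTemps') below to match yours. The same names are used in the export and test cells that follow.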

In [None]:
# Column names and types for the exported Delta table, in the same order as
# the columns of the joined table described in the requirements.
column_specs = [
    ('Year', 'Int'),
    ('Month', 'Int'),
    ('Day', 'Int'),
    ('Airport_MaxTemp', 'Double'),
    ('Gosnells_MaxTemp', 'Double'),
    ('Swanbourne_MaxTemp', 'Double'),
    ('Perth_MaxTemp', 'Double'),
    ('Hillarys_MaxTemp', 'Double'),
]
cols = [sm.column_definition(col_name, col_type) for col_name, col_type in column_specs]

# Define a delta-format table ('Processed' is passed through to
# sm.table_definition; presumably a data tier/zone — confirm in schema_manager
# docs) and create it in the data lake.
table_def = sm.table_definition(cols, 'Processed', file_type='delta')
sm.create_table('DataLakeName', 'D3S_Training_Lee_PerthMaxTemps', table_def)

In [None]:
%%spark_export_table
export_table('df_joined','DataLakeName','D3S_Training_Lee_PerthMaxTemps')

### Test that the data exported correctly

In [None]:
%%spark_import_table
import_table('testdata','DataLakeName','D3S_Training_Lee_PerthMaxTemps')

In [None]:
%%spark_sql
-- Read back 10 rows of the re-imported export to confirm the data was written.
SELECT *
FROM testdata
LIMIT 10