# Modeling with Data Streams
Brian Higginbotham

For this project I will explore reading, summarizing, and modeling with data streams with the goal of writing the results of these operations to the console.

The data is from the [UCI Machine Learning Repository](https://archive.ics.uci.edu/dataset/849/power+consumption+of+tetouan+city). It is power usage data from the Moroccan city of Tetouan, measuring climate conditions as well as the power consumption in three zones. Measurements were taken every 10 minutes from January 1 to December 30, 2017.

In the Modeling section, we will use the third power zone as our target, and all other columns will be used as our predictors, or features.

We'll first import some common libraries to get us started on reading and summarizing the data. We'll add additional libraries as needed.

In [2]:
import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Summaries

Import the data as a ```pandas``` dataframe and then save as a *pandas-on-spark* dataframe. We'll use the pandas functionality to get a few summaries such as mean, median, standard deviation and even some correlations.

In [3]:
power_pd = pd.read_csv('power_ml_data.csv')

In [5]:
power_ps = ps.from_pandas(power_pd)
power_ps.head()

| | Temperature | Humidity | Wind_Speed | General_Diffuse_Flows | Diffuse_Flows | Power_Zone_1 | Power_Zone_2 | Power_Zone_3 | Month | Hour |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.559 | 73.8 | 0.083 | 0.051 | 0.119 | 34055.6962 | 16128.87538 | 20240.96386 | 1 | 0 |
| 1 | 6.414 | 74.5 | 0.083 | 0.07 | 0.085 | 29814.68354 | 19375.07599 | 20131.08434 | 1 | 0 |
| 2 | 6.313 | 74.5 | 0.08 | 0.062 | 0.1 | 29128.10127 | 19006.68693 | 19668.43373 | 1 | 0 |
| 3 | 6.121 | 75.0 | 0.083 | 0.091 | 0.096 | 28228.86076 | 18361.09422 | 18899.27711 | 1 | 0 |
| 4 | 5.921 | 75.7 | 0.081 | 0.048 | 0.085 | 27335.6962 | 17872.34043 | 18442.40964 | 1 | 0 |


Use the ```.describe()``` method to get basic summaries for each column. We'll exclude the first row since it returns the count for each column, which is just the length of our dataset - about 47,000.

In [7]:
power_ps[['Temperature', 'Humidity', 'Wind_Speed', 'General_Diffuse_Flows', 'Diffuse_Flows', \
          'Power_Zone_1', 'Power_Zone_2', 'Power_Zone_3']].describe()[1:]

                                                                                

| | Temperature | Humidity | Wind_Speed | General_Diffuse_Flows | Diffuse_Flows | Power_Zone_1 | Power_Zone_2 | Power_Zone_3 |
|---|---|---|---|---|---|---|---|---|
| mean | 18.81322 | 68.288398 | 1.961621 | 182.53118 | 74.987211 | 32335.16869 | 21027.204976 | 17831.197608 |
| std | 5.813341 | 15.56033 | 2.349351 | 264.431856 | 124.256146 | 7130.013305 | 5199.787153 | 6622.59047 |
| min | 3.247 | 11.34 | 0.05 | 0.004 | 0.011 | 13895.6962 | 8560.081466 | 5935.17407 |
| 25% | 14.42 | 58.32 | 0.078 | 0.062 | 0.122 | 26290.63291 | 16957.31707 | 13121.92771 |
| 50% | 18.78 | 69.89 | 0.086 | 4.78 | 4.284 | 32261.59696 | 20804.86322 | 16405.28211 |
| 75% | 22.91 | 81.5 | 4.915 | 319.0 | 101.0 | 37317.44681 | 24698.73418 | 21628.91566 |
| max | 40.01 | 94.8 | 6.483 | 1163.0 | 936.0 | 52146.85905 | 37408.86076 | 47598.32636 |


'Power_Zone_3', our target, has the lowest average of the three zones but is in the middle for standard deviations.

Next, let's see if there is any correlation between the power zones and the other features. We'll use the ```.iloc[]``` indexer to return only the portion of the correlation matrix that we're interested in.

In [8]:
power_ps[['Temperature', 'Humidity', 'Wind_Speed', 'Month', 'Hour', 'General_Diffuse_Flows', 'Diffuse_Flows', \
          'Power_Zone_1', 'Power_Zone_2', 'Power_Zone_3']].corr().iloc[:7,7:10]

                                                                                

| | Power_Zone_1 | Power_Zone_2 | Power_Zone_3 |
|---|---|---|---|
| Temperature | 0.441446 | 0.384301 | 0.490752 |
| Humidity | -0.28909 | -0.297019 | -0.234228 |
| Wind_Speed | 0.166322 | 0.146338 | 0.279112 |
| Month | -0.006429 | 0.318368 | -0.232978 |
| Hour | 0.728118 | 0.663755 | 0.454806 |
| General_Diffuse_Flows | 0.189994 | 0.158798 | 0.064942 |
| Diffuse_Flows | 0.082885 | 0.047379 | -0.036761 |


'Hour' has the strongest correlation with 'Power_Zone_1' while 'Power_Zone_3' has the strongest correlation with 'Temperature.'
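For reference, ```.corr()``` reports the Pearson correlation by default. The sketch below computes the same statistic in plain Python on a few made-up values, just to show what each number in the table measures. The function name ```pearson``` and the sample lists are illustrative and not part of the notebook.

```python
import math

def pearson(x, y):
    """Pearson correlation of two equal-length numeric sequences."""
    n = len(x)
    mean_x = sum(x) / n
    mean_y = sum(y) / n
    # Cross-deviation sum and the two sums of squared deviations.
    cov = sum((a - mean_x) * (b - mean_y) for a, b in zip(x, y))
    var_x = sum((a - mean_x) ** 2 for a in x)
    var_y = sum((b - mean_y) ** 2 for b in y)
    return cov / math.sqrt(var_x * var_y)

# Toy values (invented for illustration): usage rises with temperature.
temps = [10.0, 15.0, 20.0, 25.0]
usage = [100.0, 130.0, 170.0, 200.0]
r = pearson(temps, usage)
```

A value near 1 or -1 indicates a strong linear relationship, while a value near 0, such as 'Month' against 'Power_Zone_1', indicates little linear relationship.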

For the remainder of the summaries we'll convert the 'pandas-on-spark' dataframe to a 'spark-sql' dataframe. This way, we can utilize **SQL** language along with spark methods to produce more computationally intensive summaries.

We'll also need to use 'spark-sql' for the model fitting and streaming.

In [10]:
spark = SparkSession.builder.getOrCreate()
power_spk = power_ps.to_spark()
power_spk.show(5)

+-----------+--------+----------+---------------------+-------------+------------+------------+------------+-----+----+
|Temperature|Humidity|Wind_Speed|General_Diffuse_Flows|Diffuse_Flows|Power_Zone_1|Power_Zone_2|Power_Zone_3|Month|Hour|
+-----------+--------+----------+---------------------+-------------+------------+------------+------------+-----+----+
|      6.559|    73.8|     0.083|                0.051|        0.119|  34055.6962| 16128.87538| 20240.96386|    1|   0|
|      6.414|    74.5|     0.083|                 0.07|        0.085| 29814.68354| 19375.07599| 20131.08434|    1|   0|
|      6.313|    74.5|      0.08|                0.062|          0.1| 29128.10127| 19006.68693| 19668.43373|    1|   0|
|      6.121|    75.0|     0.083|                0.091|        0.096| 28228.86076| 18361.09422| 18899.27711|    1|   0|
|      5.921|    75.7|     0.081|                0.048|        0.085|  27335.6962| 17872.34043| 18442.40964|    1|   0|
+-----------+--------+----------+-------

Here, we'll pull up a one-way contingency table, which is basically a count, of the Month variable. In order to get the output to be more readable, let's replace the month number with the month name. Also, for a one-way contingency table we'll need to create a dummy column with a constant value. We can do this at the end of our code using the ```.withColumn()``` method. To create the dummy variable, we'll just multiply each value in the 'Hour' column by zero and add one, creating a column of ones.

In [12]:
power_spk.withColumn('Month_cat', when(power_spk.Month == 1, 'Jan').when(power_spk.Month == 2, 'Feb')\
.when(power_spk.Month == 3, 'March').when(power_spk.Month == 4, 'April').when(power_spk.Month == 5, 'May')\
.when(power_spk.Month == 6, 'June').when(power_spk.Month == 7, 'July').when(power_spk.Month == 8, 'Aug')\
.when(power_spk.Month == 9, 'Sep').when(power_spk.Month == 10, 'Oct').when(power_spk.Month == 11, 'Nov')\
.when(power_spk.Month == 12, 'Dec')).withColumn('dummy', col('Hour')*0 +1).crosstab(col1='Month_cat', col2='dummy')\
.show(12)

+---------------+----+
|Month_cat_dummy|   1|
+---------------+----+
|           July|4029|
|            Oct|4026|
|            Sep|3913|
|            Dec|3868|
|          March|4057|
|            Aug|3999|
|            May|3997|
|          April|3893|
|           June|3913|
|            Feb|3588|
|            Nov|3877|
|            Jan|4014|
+---------------+----+



The 'Month' category looks pretty evenly distributed with about 4,000 entries for each month.
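A one-way contingency table is simply a frequency count per category, so the dummy column is only a device to make ```crosstab``` produce one. The plain-Python sketch below shows the same idea on a handful of made-up month numbers, using the standard library's ```calendar.month_abbr``` for the labels. The ```months``` list is invented for illustration.

```python
from collections import Counter
import calendar

# Made-up month numbers standing in for the 'Month' column.
months = [1, 1, 2, 3, 3, 3, 12]

# Map each month number to its abbreviated name, then count occurrences.
counts = Counter(calendar.month_abbr[m] for m in months)

for name, n in counts.items():
    print(name, n)
```

In Spark itself, ```power_spk.groupBy('Month').count()``` should give the same counts without needing the dummy column.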

Now let's do the same for the 'Hour' column.

In [14]:
power_spk.withColumn('dummy', col('Hour')*0 +1).crosstab(col1='Hour', col2='dummy').show(12)

+----------+----+
|Hour_dummy|   1|
+----------+----+
|         7|1964|
|        15|1947|
|        11|1972|
|         3|1966|
|         8|1957|
|        22|1966|
|        16|1950|
|         0|1950|
|         5|1968|
|        18|1955|
|        17|1979|
|         6|1992|
+----------+----+
only showing top 12 rows



We've limited the output here to 12, but we get a good sense that the 'Hour' category is also pretty evenly distributed with about 1950 entries for each hour.

Now let's produce a two-way contingency table to look at the combination of 'Hour' and 'Month' columns. Again, we'll rename the 'Month' entries for easier readability.

In [16]:
power_spk.withColumn('Month_cat', when(power_spk.Month == 1, 'Jan').when(power_spk.Month == 2, 'Feb')\
.when(power_spk.Month == 3, 'March').when(power_spk.Month == 4, 'April').when(power_spk.Month == 5, 'May')\
.when(power_spk.Month == 6, 'June').when(power_spk.Month == 7, 'July').when(power_spk.Month == 8, 'Aug')\
.when(power_spk.Month == 9, 'Sep').when(power_spk.Month == 10, 'Oct').when(power_spk.Month == 11, 'Nov')\
.when(power_spk.Month == 12, 'Dec')).withColumn('dummy', col('Hour')*0 +1).crosstab(col1='Month_cat', col2='Hour')\
.show()

+--------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|Month_cat_Hour|  0|  1| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19|  2| 20| 21| 22| 23|  3|  4|  5|  6|  7|  8|  9|
+--------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|          July|172|169|167|168|166|173|169|162|164|172|176|172|161|159|163|173|167|170|167|174|168|164|165|168|
|           Oct|171|168|163|171|166|161|175|165|159|176|165|162|175|170|173|169|167|172|162|163|170|164|168|171|
|           Sep|164|161|159|161|174|159|162|168|166|169|162|163|160|158|163|163|167|164|167|158|162|159|162|162|
|           Dec|154|159|165|167|164|156|168|157|158|153|159|160|167|158|159|168|156|167|164|164|165|165|156|159|
|         March|174|171|175|168|166|173|165|171|166|163|165|170|167|168|175|165|172|168|166|170|173|170|166|170|
|           Aug|168|169|162|171|167|167|164|166|168|165|169|168|171|165|169|164|173|165|168|171|

It looks like our data is pretty evenly distributed by 'Month' and 'Hour'.

Now let's take a look at the monthly averages for all of our numeric variables. We can do this pretty easily with the ```.groupby()``` and ```.avg()``` methods. For easier readability, we'll run the resultant columns of the data frame through the ```round()``` function.

In [18]:
mnth_avg = power_spk.select(['Temperature', 'Humidity', 'Wind_Speed', 'General_Diffuse_Flows', 'Diffuse_Flows', \
          'Power_Zone_1', 'Power_Zone_2', 'Power_Zone_3', 'Month']).groupby('Month').avg()
mnth_avg.select(*[round(c,3).alias(c) for c in mnth_avg.columns]).show()

+-----+----------------+-------------+---------------+--------------------------+------------------+-----------------+-----------------+-----------------+----------+
|Month|avg(Temperature)|avg(Humidity)|avg(Wind_Speed)|avg(General_Diffuse_Flows)|avg(Diffuse_Flows)|avg(Power_Zone_1)|avg(Power_Zone_2)|avg(Power_Zone_3)|avg(Month)|
+-----+----------------+-------------+---------------+--------------------------+------------------+-----------------+-----------------+-----------------+----------+
|    1|          12.735|       68.259|          0.702|                    103.96|            69.799|        31052.984|        19407.916|        17736.352|       1.0|
|    2|          12.657|       66.491|          1.114|                   125.471|            92.331|        30973.863|        18774.586|        17309.708|       2.0|
|    3|          14.584|       71.116|          1.006|                   181.402|            93.156|        31162.869|        18459.612|        16945.463|       3.0|
|   

Next, we'll take a look at the standard deviation for each of the numeric variables. However, grouped data has no convenient ```.std()``` method the way it has ```.avg()```. So we will use the **SQL**-style ```std()``` aggregate function along with the ```.agg()``` method. Again, for readability, we'll use ```round()``` as well as ```.alias()``` to rename the resultant columns.

In [19]:
power_spk.groupby('Month')\
.agg(round(std('Temperature'),3).alias('temp_std'), round(std('Humidity'),3).alias('humidity_std'),
round(std('Wind_Speed'),3).alias('wind_std'), round(std('General_Diffuse_Flows'),3).alias('gen_dif_flow_std'),
round(std('Diffuse_Flows'),3).alias('diff_flows_std'), round(std('Power_Zone_1'),3).alias('zone_1_std'),
round(std('Power_Zone_2'),3).alias('zone_2_std'),round(std('Power_Zone_3'),3).alias('zone_3_std')).show()

+-----+--------+------------+--------+----------------+--------------+----------+----------+----------+
|Month|temp_std|humidity_std|wind_std|gen_dif_flow_std|diff_flows_std|zone_1_std|zone_2_std|zone_3_std|
+-----+--------+------------+--------+----------------+--------------+----------+----------+----------+
|    1|   3.241|      12.156|   1.612|         166.165|       131.459|  7402.323|  4515.296|  4436.997|
|    2|    2.62|      12.412|   1.981|          206.73|       169.156|  6874.585|  4390.391|  4353.976|
|    3|   3.759|      13.918|   1.901|         260.149|       151.168|  6782.137|  4185.118|  4256.766|
|    4|   2.806|      14.313|    0.82|         246.174|       123.912|    6496.7|  3835.629|  4556.263|
|    5|     3.3|      16.436|   2.408|         331.999|       171.586|  6809.333|  4182.544|  4353.394|
|    6|    2.69|      14.973|   2.235|         328.277|       143.498|  7317.808|  4465.664|  5596.703|
|    7|   3.857|       18.85|   1.111|         331.734|        9

'Power_Zone_1' has a much higher standard deviation than the other two power zones.

# Modeling

Now we'll need to prep the data for modeling. What we're going to do here is set up some transformations that will prepare the data set to be fitted to a model and then we will use that model to make some predictions. We can then compare those predictions to the actual results and measure how well our model performs.

First off, let's look at our data types to make sure the data can be read correctly.

In [20]:
power_spk.printSchema()

root
 |-- Temperature: double (nullable = false)
 |-- Humidity: double (nullable = false)
 |-- Wind_Speed: double (nullable = false)
 |-- General_Diffuse_Flows: double (nullable = false)
 |-- Diffuse_Flows: double (nullable = false)
 |-- Power_Zone_1: double (nullable = false)
 |-- Power_Zone_2: double (nullable = false)
 |-- Power_Zone_3: double (nullable = false)
 |-- Month: long (nullable = false)
 |-- Hour: long (nullable = false)



In order to run the appropriate transformations on the 'Hour' column, we'll need to change it from the *long* type to the *double* type. We can use the ```.withColumn()``` method for this - it creates a new column by performing some action on an existing column. If we use the same column name for the new column as the old column, we essentially overwrite the old column with the new one.

In [21]:
power_spk = power_spk.withColumn("Hour",  
                                  power_spk["Hour"] 
                                  .cast('double')) 
power_spk.printSchema()

root
 |-- Temperature: double (nullable = false)
 |-- Humidity: double (nullable = false)
 |-- Wind_Speed: double (nullable = false)
 |-- General_Diffuse_Flows: double (nullable = false)
 |-- Diffuse_Flows: double (nullable = false)
 |-- Power_Zone_1: double (nullable = false)
 |-- Power_Zone_2: double (nullable = false)
 |-- Power_Zone_3: double (nullable = false)
 |-- Month: long (nullable = false)
 |-- Hour: double (nullable = false)



Now we'll start our pipeline of transformations. Keep in mind that the goal for these transformations is to get the data into a format that PySpark can fit a model to. To that end, we will need a **'label'** column that contains our target data ('Power_Zone_3') and a **'features'** column that will contain a vector of all the features to be used in fitting the model.

Let's go ahead and import some modules to help us with the transformations.

In [22]:
from pyspark.ml.feature import SQLTransformer, StringIndexer, Binarizer, VectorAssembler, \
VectorIndexer, OneHotEncoder, PCA

We'll start by creating a binary column distinguishing Day and Night. This will be based on the 'Hour' column. Using ```Binarizer()```, we can set a threshold at 6.5, which will assign the value **'0.0'** to all hours at or below 6.5 and the value **'1.0'** to all hours above it.

In [23]:
binary_HTrans = Binarizer(threshold = 6.5, inputCol='Hour', outputCol='Night_Day')
binary_HTrans.transform(power_spk).show(5)

+-----------+--------+----------+---------------------+-------------+------------+------------+------------+-----+----+---------+
|Temperature|Humidity|Wind_Speed|General_Diffuse_Flows|Diffuse_Flows|Power_Zone_1|Power_Zone_2|Power_Zone_3|Month|Hour|Night_Day|
+-----------+--------+----------+---------------------+-------------+------------+------------+------------+-----+----+---------+
|      6.559|    73.8|     0.083|                0.051|        0.119|  34055.6962| 16128.87538| 20240.96386|    1| 0.0|      0.0|
|      6.414|    74.5|     0.083|                 0.07|        0.085| 29814.68354| 19375.07599| 20131.08434|    1| 0.0|      0.0|
|      6.313|    74.5|      0.08|                0.062|          0.1| 29128.10127| 19006.68693| 19668.43373|    1| 0.0|      0.0|
|      6.121|    75.0|     0.083|                0.091|        0.096| 28228.86076| 18361.09422| 18899.27711|    1| 0.0|      0.0|
|      5.921|    75.7|     0.081|                0.048|        0.085|  27335.6962| 17872.3
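The thresholding rule is easy to check by hand. Below is a plain-Python stand-in for what ```Binarizer``` does with ```threshold = 6.5```: values greater than the threshold map to 1.0 and everything else maps to 0.0. The function ```binarize``` is a hypothetical helper, not a PySpark call.

```python
def binarize(value, threshold=6.5):
    """Mimic Binarizer: 1.0 if value > threshold, else 0.0."""
    return 1.0 if value > threshold else 0.0

# Hours 0 through 23, as in the 'Hour' column.
night_day = {hour: binarize(float(hour)) for hour in range(24)}

night_hours = [h for h, flag in night_day.items() if flag == 0.0]
day_hours = [h for h, flag in night_day.items() if flag == 1.0]
```

With this threshold, hours 0 through 6 are flagged as night and hours 7 through 23 as day, so late-evening hours are counted as 'day'.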

We'd also like to use the 'Month' column as a categorical variable for our model. We could create dummy variables for each month, but since there are 12, our resultant data set would almost double in size. In order to keep the size and the resultant computational expense down, we use ```OneHotEncoder()``` on the 'Month' column. The result will be a vector that is interpreted the same way a set of dummy variables would be, but contained within one column.

In [24]:
encoder = OneHotEncoder(inputCol="Month", outputCol="Month_OHE")
encoder_Trans = encoder.fit(power_spk)
encoder_Trans.transform(binary_HTrans.transform(power_spk)).show(5)

                                                                                

+-----------+--------+----------+---------------------+-------------+------------+------------+------------+-----+----+---------+--------------+
|Temperature|Humidity|Wind_Speed|General_Diffuse_Flows|Diffuse_Flows|Power_Zone_1|Power_Zone_2|Power_Zone_3|Month|Hour|Night_Day|     Month_OHE|
+-----------+--------+----------+---------------------+-------------+------------+------------+------------+-----+----+---------+--------------+
|      6.559|    73.8|     0.083|                0.051|        0.119|  34055.6962| 16128.87538| 20240.96386|    1| 0.0|      0.0|(12,[1],[1.0])|
|      6.414|    74.5|     0.083|                 0.07|        0.085| 29814.68354| 19375.07599| 20131.08434|    1| 0.0|      0.0|(12,[1],[1.0])|
|      6.313|    74.5|      0.08|                0.062|          0.1| 29128.10127| 19006.68693| 19668.43373|    1| 0.0|      0.0|(12,[1],[1.0])|
|      6.121|    75.0|     0.083|                0.091|        0.096| 28228.86076| 18361.09422| 18899.27711|    1| 0.0|      0.0|(
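To read the 'Month_OHE' output: ```(12,[1],[1.0])``` is a sparse vector of length 12 with a 1.0 at index 1. The sketch below is a plain-Python imitation of that encoding, assuming the encoder's default behavior of dropping the last category (```dropLast=True```). The helper ```one_hot_drop_last``` is hypothetical.

```python
def one_hot_drop_last(index, num_categories):
    """Return a dense one-hot list of length num_categories - 1.

    The last category gets the all-zeros vector, mirroring dropLast=True.
    """
    vec = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vec[index] = 1.0
    return vec

# Month values run 1..12 and are treated as category indices, so the
# encoder sees 13 possible indices (0..12); index 0 is simply never used.
NUM_CATEGORIES = 13

january = one_hot_drop_last(1, NUM_CATEGORIES)
december = one_hot_drop_last(12, NUM_CATEGORIES)
```

Under that assumption, January has its 1.0 at index 1, matching the output above, and December is represented by the all-zeros vector.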

Next, we'll perform a **Principal Component Analysis** on the remaining columns, excepting the 'Power_Zone' columns. **PCA** takes the input columns and finds a small number of new columns, each a linear combination of the originals, that capture as much of the variation in the data as possible - these new columns are what is referred to as the 'principal components.' **PCA** is helpful in understanding the relationship between features when a data set has a large number of features. It also helps computationally by reducing the number of features while retaining most of the information about the relationship between those features.

To perform **PCA** on our selected columns, we'll need to assemble the data in those columns into a vector. For this, we'll use ```VectorAssembler()``` and then create a new data object with the transformations made thus far.

In [25]:
assembler_ = VectorAssembler(inputCols=['Temperature', 'Humidity', 'Wind_Speed', 'General_Diffuse_Flows', 'Diffuse_Flows'], 
                             outputCol='features', handleInvalid='keep')
power_featcol = assembler_.transform(encoder_Trans.transform(binary_HTrans.transform(power_spk)))
power_featcol.show(5)

+-----------+--------+----------+---------------------+-------------+------------+------------+------------+-----+----+---------+--------------+--------------------+
|Temperature|Humidity|Wind_Speed|General_Diffuse_Flows|Diffuse_Flows|Power_Zone_1|Power_Zone_2|Power_Zone_3|Month|Hour|Night_Day|     Month_OHE|            features|
+-----------+--------+----------+---------------------+-------------+------------+------------+------------+-----+----+---------+--------------+--------------------+
|      6.559|    73.8|     0.083|                0.051|        0.119|  34055.6962| 16128.87538| 20240.96386|    1| 0.0|      0.0|(12,[1],[1.0])|[6.559,73.8,0.083...|
|      6.414|    74.5|     0.083|                 0.07|        0.085| 29814.68354| 19375.07599| 20131.08434|    1| 0.0|      0.0|(12,[1],[1.0])|[6.414,74.5,0.083...|
|      6.313|    74.5|      0.08|                0.062|          0.1| 29128.10127| 19006.68693| 19668.43373|    1| 0.0|      0.0|(12,[1],[1.0])|[6.313,74.5,0.08,...|
|   

We'll set up a PCA object that we can fit the transformation on. Then we can fit the PCA Transformation to the new data object (since it has the appropriate 'features' column). But once the 'pca_Trans' is fit, we'll no longer need the new data object - we can then use 'pca_Trans' in our original pipeline.

In [26]:
pca = PCA(k=3, inputCol='features', outputCol="pca")
pca_Trans = pca.fit(power_featcol)
pca_Trans.transform(assembler_.transform(encoder_Trans.transform(binary_HTrans.transform(power_spk)))).show(5)

                                                                                

+-----------+--------+----------+---------------------+-------------+------------+------------+------------+-----+----+---------+--------------+--------------------+--------------------+
|Temperature|Humidity|Wind_Speed|General_Diffuse_Flows|Diffuse_Flows|Power_Zone_1|Power_Zone_2|Power_Zone_3|Month|Hour|Night_Day|     Month_OHE|            features|                 pca|
+-----------+--------+----------+---------------------+-------------+------------+------------+------------+-----+----+---------+--------------+--------------------+--------------------+
|      6.559|    73.8|     0.083|                0.051|        0.119|  34055.6962| 16128.87538| 20240.96386|    1| 0.0|      0.0|(12,[1],[1.0])|[6.559,73.8,0.083...|[1.79440486365695...|
|      6.414|    74.5|     0.083|                 0.07|        0.085| 29814.68354| 19375.07599| 20131.08434|    1| 0.0|      0.0|(12,[1],[1.0])|[6.414,74.5,0.083...|[1.80604083009823...|
|      6.313|    74.5|      0.08|                0.062|          
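As a rough illustration of what a 'principal component' is, the sketch below finds the first principal direction of a tiny made-up data set in plain Python, using power iteration on the covariance matrix. It is not meant to reproduce Spark's implementation, and the function names are hypothetical. It only shows the idea of replacing correlated columns with a projection onto the direction of greatest variance.

```python
import math

def first_principal_component(rows, iterations=200):
    """Return the leading principal direction of rows (list of equal-length lists)."""
    n = len(rows)
    d = len(rows[0])
    # Center each column on its mean.
    means = [sum(r[j] for r in rows) / n for j in range(d)]
    centered = [[r[j] - means[j] for j in range(d)] for r in rows]
    # Sample covariance matrix (d x d).
    cov = [[sum(c[i] * c[j] for c in centered) / (n - 1) for j in range(d)]
           for i in range(d)]
    # Power iteration: repeatedly apply cov and renormalize.
    v = [1.0] * d
    for _ in range(iterations):
        w = [sum(cov[i][j] * v[j] for j in range(d)) for i in range(d)]
        norm = math.sqrt(sum(x * x for x in w))
        v = [x / norm for x in w]
    return v

def project(rows, direction):
    """Dot each row with direction, giving one score per row."""
    return [sum(a * b for a, b in zip(r, direction)) for r in rows]

# Toy data (invented): the second feature is exactly twice the first,
# so a single component captures all of the variation.
toy = [[1.0, 2.0], [2.0, 4.0], [3.0, 6.0], [4.0, 8.0]]
pc1 = first_principal_component(toy)
scores = project(toy, pc1)
```

With ```k=3```, the 'pca' column above holds three such scores per row in place of the five original climate columns.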

Now we can use ```SQLTransformer()``` to select only the columns we'll need for the model fit - 'Power_Zone_1', 'Power_Zone_2', the binary 'Night_Day' column, the one-hot-encoded 'Month' column, the 'pca' column, and the target column 'Power_Zone_3', which is renamed to 'label.'

In [27]:
sqlTrans_ = SQLTransformer(statement = '''SELECT Power_Zone_1, Power_Zone_2, Power_Zone_3 as label, Night_Day, \
Month_OHE, pca FROM __THIS__''')
sqlTrans_.transform(pca_Trans.transform(assembler_.transform(encoder_Trans.transform\
                (binary_HTrans.transform(power_spk))))).show(10)

+------------+------------+-----------+---------+--------------+--------------------+
|Power_Zone_1|Power_Zone_2|      label|Night_Day|     Month_OHE|                 pca|
+------------+------------+-----------+---------+--------------+--------------------+
|  34055.6962| 16128.87538|20240.96386|      0.0|(12,[1],[1.0])|[1.79440486365695...|
| 29814.68354| 19375.07599|20131.08434|      0.0|(12,[1],[1.0])|[1.80604083009823...|
| 29128.10127| 19006.68693|19668.43373|      0.0|(12,[1],[1.0])|[1.81022976305639...|
| 28228.86076| 18361.09422|18899.27711|      0.0|(12,[1],[1.0])|[1.79866765174088...|
|  27335.6962| 17872.34043|18442.40964|      0.0|(12,[1],[1.0])|[1.86328720163797...|
| 26624.81013| 17416.41337|18130.12048|      0.0|(12,[1],[1.0])|[1.87820674500461...|
| 25998.98734| 16993.31307|17945.06024|      0.0|(12,[1],[1.0])|[1.91529298717955...|
| 25446.07595| 16661.39818|17459.27711|      0.0|(12,[1],[1.0])|[1.92400540807029...|
| 24777.72152| 16227.35562|17025.54217|      0.0|(12,[

Remember, the two columns PySpark needs to run a model fit are 'label' and 'features'. So we'll need to use ```VectorAssembler()``` again to combine the feature columns into one column as a vector.

In [28]:
assembler_2 = VectorAssembler(inputCols=['Power_Zone_1', 'Power_Zone_2', 'Night_Day', 'Month_OHE', 'pca'], 
                             outputCol='features', handleInvalid='keep')
assembler_2.transform(sqlTrans_.transform(pca_Trans.transform(assembler_.transform(encoder_Trans.\
                                transform(binary_HTrans.transform(power_spk)))))).show(10)

+------------+------------+-----------+---------+--------------+--------------------+--------------------+
|Power_Zone_1|Power_Zone_2|      label|Night_Day|     Month_OHE|                 pca|            features|
+------------+------------+-----------+---------+--------------+--------------------+--------------------+
|  34055.6962| 16128.87538|20240.96386|      0.0|(12,[1],[1.0])|[1.79440486365695...|(18,[0,1,4,15,16,...|
| 29814.68354| 19375.07599|20131.08434|      0.0|(12,[1],[1.0])|[1.80604083009823...|(18,[0,1,4,15,16,...|
| 29128.10127| 19006.68693|19668.43373|      0.0|(12,[1],[1.0])|[1.81022976305639...|(18,[0,1,4,15,16,...|
| 28228.86076| 18361.09422|18899.27711|      0.0|(12,[1],[1.0])|[1.79866765174088...|(18,[0,1,4,15,16,...|
|  27335.6962| 17872.34043|18442.40964|      0.0|(12,[1],[1.0])|[1.86328720163797...|(18,[0,1,4,15,16,...|
| 26624.81013| 17416.41337|18130.12048|      0.0|(12,[1],[1.0])|[1.87820674500461...|(18,[0,1,4,15,16,...|
| 25998.98734| 16993.31307|17945.0602

And now we're all set to fit the model!

In [29]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

Set up the model instance - 'lr'. Set up a grid of parameters in ```ParamGridBuilder()``` that ```CrossValidator()``` will iterate through to determine the best fit. Since we are fitting an **ElasticNet** model, we'll set up a list of 'regParam' values that determine the weight of the penalty and a list of 'elasticNetParam' values that determine the mix of the LASSO and Ridge penalties used in the ElasticNet model (0 is pure Ridge, 1 is pure LASSO). We'll also create our pipeline instance that will run all the transformations that we set up above. We'll use the default **'rmse'** as the measurement of model performance in ```RegressionEvaluator()```.

In [30]:
lr = LinearRegression()
paramGrid_lr = ParamGridBuilder().addGrid(lr.regParam,[0, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.98, 0.99, 1])\
.addGrid(lr.elasticNetParam,[0, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.98, 0.99, 1])\
.build()
pipeline_lr=Pipeline(stages=[binary_HTrans, encoder_Trans, assembler_, pca_Trans, sqlTrans_, assembler_2, lr])
crossval_lr=CrossValidator(estimator=pipeline_lr,
                        estimatorParamMaps=paramGrid_lr,
                        evaluator=RegressionEvaluator(),
                        numFolds=3)
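For intuition about the grid, the sketch below writes out the elastic net penalty in the form given in the Spark ML documentation, ```regParam * (elasticNetParam * L1 + (1 - elasticNetParam) / 2 * L2^2)```, and counts how many model fits the grid implies. The helper name and the toy coefficients are made up for illustration.

```python
from itertools import product

def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Penalty = reg_param * (alpha * L1 + (1 - alpha) / 2 * L2 squared)."""
    l1 = sum(abs(w) for w in weights)
    l2_sq = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1
                        + (1.0 - elastic_net_param) / 2.0 * l2_sq)

grid_values = [0, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.98, 0.99, 1]

# Every (regParam, elasticNetParam) pair is tried in each fold.
combos = list(product(grid_values, grid_values))
num_folds = 3
total_fits = len(combos) * num_folds

toy_weights = [2.0, -1.0]  # invented coefficients
ridge_only = elastic_net_penalty(toy_weights, 1.0, 0.0)  # pure L2
lasso_only = elastic_net_penalty(toy_weights, 1.0, 1.0)  # pure L1
```

So the grid above asks for 121 parameter combinations across 3 folds, or 363 fits before the final refit, which is why the fit cell can take a while. Note that when 'regParam' is 0 the penalty vanishes regardless of 'elasticNetParam', so 11 of those combinations describe the same unpenalized model.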

Now we are ready to fit our model!

In [None]:
cv_lr_Model=crossval_lr.fit(power_spk)

If you're interested in viewing the cross validation results for the parameters, you can use the code below.

In [1]:
#my_list = []
#for i in range(len(paramGrid_lr)):
#    my_list.append([cv_lr_Model.avgMetrics[i], paramGrid_lr[i].values()])
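A variation on the commented loop: the loop above pairs ```avgMetrics[i]``` with ```paramGrid_lr[i]```, so the two lists can be zipped and sorted to find the best setting. The sketch uses plain lists with made-up numbers in place of ```cv_lr_Model.avgMetrics``` and ```paramGrid_lr```, so it runs without a fitted model.

```python
# Stand-ins (invented values) for cv_lr_Model.avgMetrics and paramGrid_lr.
avg_metrics = [2150.3, 2141.7, 2160.9]
param_grid = [
    {"regParam": 0.0, "elasticNetParam": 0.0},
    {"regParam": 0.5, "elasticNetParam": 0.5},
    {"regParam": 1.0, "elasticNetParam": 1.0},
]

# Pair each averaged RMSE with its parameters; lower RMSE is better.
results = sorted(zip(avg_metrics, param_grid), key=lambda pair: pair[0])
best_rmse, best_params = results[0]
```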

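Now we can use our fitted model to transform the same data set to calculate predictions. We can see how well the model did by comparing its predictions to the actual values (in the 'label' column). Keep in mind that this is the data the model was trained on, so the resulting RMSE is likely to be optimistic compared with performance on unseen data.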

In [32]:
RegressionEvaluator(metricName='rmse').evaluate(cv_lr_Model.transform(power_spk))

2137.40482555687
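For reference, here is a minimal plain-Python sketch of what the evaluator computes: the root mean squared error between the 'label' and 'prediction' columns. The toy numbers are invented and are not taken from the model output.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between two equal-length sequences."""
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(squared))

# Toy values (invented), not taken from the model output.
labels = [20000.0, 18000.0, 16000.0]
predictions = [21000.0, 17000.0, 16000.0]

# Residual = actual minus predicted, one value per row.
residuals = [y - p for y, p in zip(labels, predictions)]
error = rmse(labels, predictions)
```

The RMSE is in the same units as the target, so the reported value of about 2,137 can be read against the 'Power_Zone_3' mean of about 17,831 from the summary table, roughly 12%.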

We may want to keep a record of the residual between the predictions and actual values. We can do that by creating a new column, 'residual', using the ```.withColumn()``` method.

In [33]:
cv_lr_Model.transform(power_spk).withColumn('residual', col('label')-col('prediction')).show(5)

+------------+------------+-----------+---------+--------------+--------------------+--------------------+------------------+------------------+
|Power_Zone_1|Power_Zone_2|      label|Night_Day|     Month_OHE|                 pca|            features|        prediction|          residual|
+------------+------------+-----------+---------+--------------+--------------------+--------------------+------------------+------------------+
|  34055.6962| 16128.87538|20240.96386|      0.0|(12,[1],[1.0])|[1.79440486365695...|(18,[0,1,4,15,16,...| 20801.86609984268|-560.9022398426787|
| 29814.68354| 19375.07599|20131.08434|      0.0|(12,[1],[1.0])|[1.80604083009823...|(18,[0,1,4,15,16,...|18584.434109315043| 1546.650230684958|
| 29128.10127| 19006.68693|19668.43373|      0.0|(12,[1],[1.0])|[1.81022976305639...|(18,[0,1,4,15,16,...|18134.277000510257|1534.1567294897432|
| 28228.86076| 18361.09422|18899.27711|      0.0|(12,[1],[1.0])|[1.79866765174088...|(18,[0,1,4,15,16,...|17519.863312803336| 1379

# Streaming

Now that we have a pipeline set up that can transform our data into a shape that we can fit to a model and run predictions, let's apply this model to streaming data. Here, we'll have data in the form of our original data set come in over a time interval (stream). As it comes in, we'll want our model to make predictions, keep a log of the 'label', 'prediction', and the calculated 'residual' and append that log to the transformed data set.

To get started, let's import some modules.

In [29]:
from pyspark.sql.types import StructType
from pyspark.sql.types import DoubleType, LongType

Before we set up a ```.readStream``` instance, we'll need to define the schema of the stream so that the reader can assign the data types as the data comes in. We can review the schema of the data set using ```.printSchema()```.

In [30]:
power_spk.printSchema()

root
 |-- Temperature: double (nullable = false)
 |-- Humidity: double (nullable = false)
 |-- Wind_Speed: double (nullable = false)
 |-- General_Diffuse_Flows: double (nullable = false)
 |-- Diffuse_Flows: double (nullable = false)
 |-- Power_Zone_1: double (nullable = false)
 |-- Power_Zone_2: double (nullable = false)
 |-- Power_Zone_3: double (nullable = false)
 |-- Month: long (nullable = false)
 |-- Hour: double (nullable = false)



What's nice here is that we will be able to define the data types as they come in, so we won't need to do any data type transformations like we did earlier with the 'Hour' column.

Once the schema is defined, we can create the stream data frame ('power_df') using ```.readStream```. Note how we attach the schema with the ```.schema()``` method and indicate that the stream will be in **csv** format (```.csv()```) in the 'csv_power' folder.

In [31]:
power_schema = StructType().add('Temperature', 'double').add('Humidity', 'double').\
add('Wind_Speed', 'double').add('General_Diffuse_Flows', 'double').add('Diffuse_Flows', 'double').\
add('Power_Zone_1', 'double').add('Power_Zone_2', 'double').add('Power_Zone_3', 'double').\
add('Month', 'long').add('Hour', 'double')
power_df = spark.readStream.option("header", True).schema(power_schema).csv("csv_power")
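The ```.schema()``` method also accepts a DDL-formatted string in place of a ```StructType```. As a sketch, that string can be built from a list of column names and types. The list below just restates the schema above (```bigint``` is Spark SQL's name for the long type), and the variable names are illustrative.

```python
# Column names and Spark SQL types, restating the schema defined above.
columns = [
    ("Temperature", "double"), ("Humidity", "double"),
    ("Wind_Speed", "double"), ("General_Diffuse_Flows", "double"),
    ("Diffuse_Flows", "double"), ("Power_Zone_1", "double"),
    ("Power_Zone_2", "double"), ("Power_Zone_3", "double"),
    ("Month", "bigint"), ("Hour", "double"),
]

# DDL form: "name TYPE, name TYPE, ..."
power_schema_ddl = ", ".join(f"{name} {dtype.upper()}" for name, dtype in columns)
```

Passing ```power_schema_ddl``` to ```.schema()``` should then be equivalent to passing ```power_schema```.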

Since we currently don't have access to a real data stream, we'll have to mimic one. The code below will randomly select three rows from a different power data set. This data was split off from the original data set that we used in the first two parts, so it is in the same format as the original data set. It will then write the three random rows to a *csv* file located in the 'csv_power' folder. It will repeat this process ten times with a ten-second interval between csv files. When this code is executed, it will mimic a live data stream coming in to the 'csv_power' folder.

We'll display the code here, but it will actually be executed in a different console.

In [32]:
#import time
#import numpy as np
#import pandas as pd

#demo = pd.read_csv("power_streaming_data.csv")
#demo_columns = list(demo)

#for i in range(0,10):
#    #randomly sample three rows (with replacement)
#    temp = demo.loc[np.random.randint(demo.shape[0], size = 3)]
#    temp.columns=demo_columns
#    temp.to_csv("csv_power/demo" + str(i) + ".csv", index = False, header = True)
#    time.sleep(10)
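One detail worth keeping in mind when mimicking a stream: Spark's file source expects files to appear in the watched folder atomically, which is usually achieved by writing each file somewhere else and then moving it into place. The sketch below is a standard-library variant of the simulator that does this. The function name ```emit_batches``` and its parameters are hypothetical, and it takes plain lists of rows rather than a pandas data frame.

```python
import csv
import os
import random
import tempfile
import time

def emit_batches(rows, header, out_dir, n_files=10, rows_per_file=3, delay=10.0):
    """Write n_files small CSV files into out_dir, pausing between files."""
    os.makedirs(out_dir, exist_ok=True)
    # Stage files in the parent folder so the final move stays on one file system.
    staging_dir = os.path.dirname(os.path.abspath(out_dir))
    written = []
    for i in range(n_files):
        sample = random.choices(rows, k=rows_per_file)  # sample with replacement
        final_path = os.path.join(out_dir, f"demo{i}.csv")
        fd, tmp_path = tempfile.mkstemp(suffix=".csv", dir=staging_dir)
        with os.fdopen(fd, "w", newline="") as handle:
            writer = csv.writer(handle)
            writer.writerow(header)
            writer.writerows(sample)
        # Move the finished file into the watched folder in one step.
        os.replace(tmp_path, final_path)
        written.append(final_path)
        time.sleep(delay)
    return written
```

Calling ```emit_batches(rows, header, "csv_power")``` with the defaults would produce ten files of three rows each, ten seconds apart, like the commented code above.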

Now we need to write out the transform to make the prediction and return the 'label', 'prediction', and 'residual' columns. Notice that we are simply re-using code from the transformations in the previous section, but substituting our stream data frame ('power_df') for the static data frame ('power_spk'). 'power_pred' runs the predictions and creates the residual column. 'pwr_pred' then selects the appropriate columns.

In [33]:
power_pred = cv_lr_Model.transform(power_df).withColumn('residual', col('label')-col('prediction'))
pwr_pred = power_pred.select(col('label'), col('prediction'), col('residual'))

#predQuery = pwr_pred.writeStream.outputMode("append").format("console").start()

'predQuery' was used for testing. We'll execute our entire query when we combine our two transformations.

In [1]:
#predQuery.stop()
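
To make the sign convention of the 'residual' column concrete, the sketch below recomputes label minus prediction in plain Python for two rows that appear in the console output later in this section. The helper name `residual` is illustrative. A positive residual means the model predicted less power than was actually measured; a negative one means it predicted more.

```python
import math

def residual(label, prediction):
    """Same arithmetic as col('label') - col('prediction')."""
    return label - prediction

# (label, prediction, residual) copied from the console output of batches 0 and 1
rows = [
    (14438.4, 11873.968255425854, 2564.4317445741453),
    (19106.62614, 21764.21054636958, -2657.5844063695804),
]

checks = []
for label, prediction, reported in rows:
    r = residual(label, prediction)
    direction = "under-predicted" if r > 0 else "over-predicted"
    # Compare with a tolerance because these are floating-point values
    matches = math.isclose(r, reported, abs_tol=1e-6)
    checks.append((direction, matches))
    print(direction, matches)
```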

We also want to keep a record of the original stream data as well as all the transformations that have been made. We can do this by simply running the stream through the pipeline but with one important update - we'll need to edit the SQL transformation ('sqlTrans_') to include ALL columns in the SELECT statement. We'll do that below and call it 'sqlTrans_2'.

In [36]:
sqlTrans_2 = SQLTransformer(statement = '''SELECT Temperature, Humidity, Wind_Speed, General_Diffuse_Flows, \
Diffuse_Flows, Power_Zone_1, Power_Zone_2, Power_Zone_3 as label, Hour, Night_Day, \
Month, Month_OHE, pca FROM __THIS__''')

Now we can update the original pipeline with 'sqlTrans_2' in place of 'sqlTrans_' to create a transformation that returns ALL columns and transformations.

In [37]:
power_trans = assembler_2.transform(sqlTrans_2.transform(pca_Trans.transform(assembler_.transform(encoder_Trans.\
                                transform(binary_HTrans.transform(power_df))))))

#tran_query = power_trans.writeStream.outputMode('append').format('console').start()

'tran_query' was for testing.

In [2]:
#tran_query.stop()
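
The nested call that builds 'power_trans' reads from the inside out: 'binary_HTrans' runs first and 'assembler_2' runs last. As a sketch of the same ordering written left to right, the snippet below folds a list of stages over a starting value with `functools.reduce`. The `RecordName` class is a hypothetical stand-in for a fitted Spark transformer so that the example runs without Spark; with the real objects, the list would presumably hold the six transformers in the order shown.

```python
from functools import reduce

class RecordName:
    """Hypothetical stand-in for a transformer: appends its name to a list."""
    def __init__(self, name):
        self.name = name

    def transform(self, data):
        return data + [self.name]

def apply_stages(stages, data):
    """Apply each stage's .transform() in list order (first stage runs first)."""
    return reduce(lambda acc, stage: stage.transform(acc), stages, data)

# Order taken from the nested call, innermost first
stage_names = ["binary_HTrans", "encoder_Trans", "assembler_",
               "pca_Trans", "sqlTrans_2", "assembler_2"]
stages = [RecordName(n) for n in stage_names]

result = apply_stages(stages, ["power_df"])
print(" -> ".join(result))
```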

Now we can join the two stream data frames on the key 'label' so that when we start our stream the output will be a single data frame with all the columns and column transformations as well as the prediction and calculated residual.

We'll use ```.writeStream``` to start the stream, indicating that we want the output to be appended to the console.

Once we are done, we can stop the streaming query with the ```.stop()``` method.

In [47]:
join_power = power_trans.join(pwr_pred, 'label', 'inner')\
.writeStream.outputMode('append').format('console').start()

24/04/27 11:13:25 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-1a222d6e-2730-4597-a8e0-a204cdd7a6b5. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/27 11:13:25 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-----------+-----------+--------+----------+---------------------+-------------+------------+------------+----+---------+-----+---------------+--------------------+--------------------+------------------+-------------------+
|      label|Temperature|Humidity|Wind_Speed|General_Diffuse_Flows|Diffuse_Flows|Power_Zone_1|Power_Zone_2|Hour|Night_Day|Month|      Month_OHE|                 pca|            features|        prediction|           residual|
+-----------+-----------+--------+----------+---------------------+-------------+------------+------------+----+---------+-----+---------------+--------------------+--------------------+------------------+-------------------+
|    14438.4|      22.03|    74.1|      0.07|                674.9|        566.7| 26625.69536| 12955.50936|10.0|      1.0|    6| (12,[6],[1.0])|[-809.77544782833...|(18,[0,1,2,9,15,1...|11873.968255425854| 2564.4317445741453|

                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------+-----------+--------+----------+---------------------+-------------+------------+------------+----+---------+-----+---------------+--------------------+--------------------+------------------+-------------------+
|      label|Temperature|Humidity|Wind_Speed|General_Diffuse_Flows|Diffuse_Flows|Power_Zone_1|Power_Zone_2|Hour|Night_Day|Month|      Month_OHE|                 pca|            features|        prediction|           residual|
+-----------+-----------+--------+----------+---------------------+-------------+------------+------------+----+---------+-----+---------------+--------------------+--------------------+------------------+-------------------+
|19106.62614|      21.17|    65.7|     4.924|                36.66|        26.44| 45903.54486| 29124.89627|18.0|      1.0|   10|(12,[10],[1.0])|[-41.281178786024...|(18,[0,1,2,13,15,...| 21764.21054636958|-2657.5844063695804|

                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+-----------+-----------+--------+----------+---------------------+-------------+------------+------------+----+---------+-----+--------------+--------------------+--------------------+------------------+-------------------+
|      label|Temperature|Humidity|Wind_Speed|General_Diffuse_Flows|Diffuse_Flows|Power_Zone_1|Power_Zone_2|Hour|Night_Day|Month|     Month_OHE|                 pca|            features|        prediction|           residual|
+-----------+-----------+--------+----------+---------------------+-------------+------------+------------+----+---------+-----+--------------+--------------------+--------------------+------------------+-------------------+
|16372.36364|      15.36|   67.42|     0.075|                0.026|        0.193| 23511.21636| 12940.93686| 2.0|      0.0|    4|(12,[4],[1.0])|[1.54529564709417...|(18,[0,1,7,15,16,...|15513.622925345942|  858.7407146540572|
|12

                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+-----------+-----------+--------+----------+---------------------+-------------+------------+------------+----+---------+-----+--------------+--------------------+--------------------+------------------+-------------------+
|      label|Temperature|Humidity|Wind_Speed|General_Diffuse_Flows|Diffuse_Flows|Power_Zone_1|Power_Zone_2|Hour|Night_Day|Month|     Month_OHE|                 pca|            features|        prediction|           residual|
+-----------+-----------+--------+----------+---------------------+-------------+------------+------------+----+---------+-----+--------------+--------------------+--------------------+------------------+-------------------+
|38500.41841|       25.5|    79.7|     4.914|                0.084|        0.152| 41302.32558| 29001.26582|22.0|      1.0|    7|(12,[7],[1.0])|[1.72363695419787...|(18,[0,1,2,10,15,...|32106.238660672734|  6394.179749327264|
|12

                                                                                

-------------------------------------------
Batch: 4
-------------------------------------------
+-----------+-----------+--------+----------+---------------------+-------------+------------+------------+----+---------+-----+---------------+--------------------+--------------------+------------------+------------------+
|      label|Temperature|Humidity|Wind_Speed|General_Diffuse_Flows|Diffuse_Flows|Power_Zone_1|Power_Zone_2|Hour|Night_Day|Month|      Month_OHE|                 pca|            features|        prediction|          residual|
+-----------+-----------+--------+----------+---------------------+-------------+------------+------------+----+---------+-----+---------------+--------------------+--------------------+------------------+------------------+
|14781.68675|      10.19|    75.8|     0.087|                0.084|        0.137| 23471.39241| 14392.70517| 2.0|      0.0|    1| (12,[1],[1.0])|[1.77572739499498...|(18,[0,1,4,15,16,...| 14251.34925744154| 530.3374925584612|
|26

                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+-----------+-----------+--------+----------+---------------------+-------------+------------+------------+----+---------+-----+--------------+--------------------+--------------------+------------------+------------------+
|      label|Temperature|Humidity|Wind_Speed|General_Diffuse_Flows|Diffuse_Flows|Power_Zone_1|Power_Zone_2|Hour|Night_Day|Month|     Month_OHE|                 pca|            features|        prediction|          residual|
+-----------+-----------+--------+----------+---------------------+-------------+------------+------------+----+---------+-----+--------------+--------------------+--------------------+------------------+------------------+
|9865.066026|      13.39|    70.8|      0.08|                0.037|        0.134| 23726.23574| 19301.62627| 1.0|      0.0|   12|    (12,[],[])|[1.65970102172591...|(18,[0,1,15,16,17...| 8990.273810711387| 874.7922152886131|
|22799.

                                                                                

-------------------------------------------
Batch: 6
-------------------------------------------
+-----------+-----------+--------+----------+---------------------+-------------+------------+------------+----+---------+-----+---------------+--------------------+--------------------+------------------+-------------------+
|      label|Temperature|Humidity|Wind_Speed|General_Diffuse_Flows|Diffuse_Flows|Power_Zone_1|Power_Zone_2|Hour|Night_Day|Month|      Month_OHE|                 pca|            features|        prediction|           residual|
+-----------+-----------+--------+----------+---------------------+-------------+------------+------------+----+---------+-----+---------------+--------------------+--------------------+------------------+-------------------+
| 12017.3494|       15.6|    85.3|      0.07|                0.037|        0.163| 24923.07692|  19033.8843| 1.0|      0.0|   11|(12,[11],[1.0])|[2.01146835606787...|(18,[0,1,14,15,16...|11471.745773230632|  545.6036267693671|

                                                                                

-------------------------------------------
Batch: 7
-------------------------------------------
+-----------+-----------+--------+----------+---------------------+-------------+------------+------------+----+---------+-----+---------------+--------------------+--------------------+------------------+-------------------+
|      label|Temperature|Humidity|Wind_Speed|General_Diffuse_Flows|Diffuse_Flows|Power_Zone_1|Power_Zone_2|Hour|Night_Day|Month|      Month_OHE|                 pca|            features|        prediction|           residual|
+-----------+-----------+--------+----------+---------------------+-------------+------------+------------+----+---------+-----+---------------+--------------------+--------------------+------------------+-------------------+
|20145.23077|      28.59|   29.65|     4.919|                748.0|        43.24| 37929.53642| 23272.76507|15.0|      1.0|    6| (12,[6],[1.0])|[-726.78910829306...|(18,[0,1,2,9,15,1...| 21761.95956295512|-1616.7287929551203|

In [48]:
join_power.stop()
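
One caveat about the join on 'label' used in this section: 'label' is a measured power value rather than a unique row identifier. If two rows in the stream share the same label, an inner join pairs every matching row on one side with every matching row on the other. The pure-Python sketch below illustrates that behaviour with toy values that are not taken from the data set; `inner_join` is an illustrative helper, not a Spark function.

```python
from collections import defaultdict

def inner_join(left, right, key):
    """Inner join of two lists of dicts on `key` (hash-join style)."""
    index = defaultdict(list)
    for row in right:
        index[row[key]].append(row)
    joined = []
    for lrow in left:
        for rrow in index.get(lrow[key], []):
            joined.append({**lrow, **rrow})
    return joined

# Toy rows for illustration only; two rows share label 100.0
features = [
    {"label": 100.0, "Hour": 1.0},
    {"label": 100.0, "Hour": 2.0},
    {"label": 250.0, "Hour": 3.0},
]
predictions = [
    {"label": 100.0, "prediction": 90.0},
    {"label": 100.0, "prediction": 95.0},
    {"label": 250.0, "prediction": 240.0},
]

joined = inner_join(features, predictions, "label")
print(len(joined))  # 2 x 2 matches for label 100.0, plus 1 for 250.0
```

Three input rows on each side produce five joined rows here. If the source files carried a unique key such as a timestamp or row id, joining on that key instead would avoid the extra pairings.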

That's it! You can view the execution of the code in the attached screen video.