# Project 4

## Richard Xiao

In [None]:
from pyspark.sql import SparkSession
# Local Spark session using all available cores; every later cell reads data
# through this `spark` object.
_session_builder = (
    SparkSession.builder
    .master('local[*]')
    .appName('my_app')
)
spark = _session_builder.getOrCreate()
import pandas as pd
import numpy as np
import pyspark.pandas as ps
# NOTE(review): importing `max` here shadows Python's builtin max() for the
# rest of the notebook.
from pyspark.sql.functions import avg, stddev, max, percentile_approx

In [None]:
from pyspark.ml.stat import Correlation

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

In [None]:
import pyspark.pandas as ps

The code below reads in the power CSV file (power_ml_data.csv).

In [None]:
# Load the modelling data set: comma-separated, first row holds the column
# names, and Spark infers each column's type from the values.
power = spark.read.csv(
    "power_ml_data.csv",
    sep=",",
    header="true",
    inferSchema="true",
)

power

# Aggregations 

The code below gives the average of the temperature, humidity, wind speed and the power zones. Looking at it, power zone 1 has the highest average out of the three power zones.

In [None]:
# Overall means of the weather and power columns. DataFrame.show() only prints
# and returns None, so keep the aggregated DataFrame first and display it
# separately.
power_mean = power.agg(
    avg('Temperature'),
    avg('Humidity'),
    avg('Wind_Speed'),
    avg('Power_Zone_1'),
    avg('Power_Zone_2'),
    avg('Power_Zone_3'),
)
power_mean.show()


The code below gives the standard deviations. Power zone 1 has the highest standard deviation.

In [None]:
# Overall (sample) standard deviations of the weather and power columns.
# show() returns None, so bind the DataFrame before displaying it.
stddev_power = power.agg(
    stddev('Temperature'),
    stddev('Humidity'),
    stddev('Wind_Speed'),
    stddev('Power_Zone_1'),
    stddev('Power_Zone_2'),
    stddev('Power_Zone_3'),
)
stddev_power.show()

## Max, Median, and Min

For this section, I wanted to get the maximum and minimum values for temperature, humidity and wind speed for each month. It seems that July had the highest max temperature and February the lowest; December had the highest max humidity and January the lowest; and October had the highest max wind speed, while February, March and April had the lowest max wind speed values.

For the minimum observations, December had the lowest min temperature while June had the highest. For minimum humidity, July had the highest min humidity while February had the lowest. For wind speed, July had the lowest min value and May had the highest.

In [None]:
# Highest temperature recorded in each month, listed in month order
# (groupBy returns rows in no particular order). show() returns None, so keep
# the DataFrame first and display it separately.
max_power_temp = power.groupby("Month").max("Temperature").orderBy("Month")
max_power_temp.show()

In [None]:
# Highest humidity recorded in each month, sorted by month for readability.
# show() returns None, so bind the DataFrame before displaying it.
max_power_humidity = power.groupby("Month").max("Humidity").orderBy("Month")
max_power_humidity.show()

In [None]:
# Highest wind speed recorded in each month, sorted by month for readability.
# show() returns None, so bind the DataFrame before displaying it.
max_power_wind = power.groupby("Month").max("Wind_Speed").orderBy("Month")
max_power_wind.show()

In [None]:
# Lowest temperature recorded in each month, sorted by month for readability.
# show() returns None, so bind the DataFrame before displaying it.
min_power_temp = power.groupby("Month").min("Temperature").orderBy("Month")
min_power_temp.show()

In [None]:
# Lowest humidity recorded in each month, sorted by month for readability.
# show() returns None, so bind the DataFrame before displaying it.
min_power_humidity = power.groupby("Month").min("Humidity").orderBy("Month")
min_power_humidity.show()

In [None]:
# Lowest wind speed recorded in each month, sorted by month for readability.
# show() returns None, so bind the DataFrame before displaying it.
min_power_wind = power.groupby("Month").min("Wind_Speed").orderBy("Month")
min_power_wind.show()

For the median observations, Power Zone 1 had the highest median value.

In [None]:
# Approximate medians (percentile_approx at 0.5) of the weather and power
# columns. show() returns None, so keep the aggregated DataFrame in med_power
# and display it separately.
med_power = power.agg(
    percentile_approx("Temperature", 0.5).alias("median_temp"),
    percentile_approx("Humidity", 0.5).alias("median_humidity"),
    percentile_approx("Wind_Speed", 0.5).alias("median_Wind_Speed"),
    percentile_approx("Power_Zone_1", 0.5).alias("median_power_zone_1"),
    percentile_approx("Power_Zone_2", 0.5).alias("median_power_zone_2"),
    percentile_approx("Power_Zone_3", 0.5).alias("median_power_zone_3"),
)
med_power.show()

## Correlation and Contingency Tables

The code below converts the Spark DataFrame into a pandas-on-Spark DataFrame so that pandas-style methods (such as `corr`) can be used.

In [None]:
# Wrap the Spark DataFrame in the pandas API on Spark (pandas-style methods,
# still executed by Spark).
df_power = ps.DataFrame(data=power)

Next, I selected every column except for hour and month to get the correlation of the numeric variables. Looking at some of the notable results, temperature has the highest correlation with power zone 3, and power zone 2 has the highest correlation with power zone 1. Power zone 3 also seems to be highly correlated with power zone 1.

In [None]:
# Pearson correlation matrix of the numeric measurement columns. Month and Hour
# are dropped by name rather than by position, so the selection does not
# depend on the order of columns in the CSV.
df_new = df_power.drop(columns=["Month", "Hour"])
df_new.corr('pearson')

The next chunks of code deal with contingency tables. The first chunk is the two-way contingency table for month and hour. The next two are one-way contingency tables for month and hour.

In [None]:
# Two-way frequency table: one row per month, one column per hour value.
contingency_temp = power.stat.crosstab("month", "Hour")
contingency_temp.show()

This table shows that March had the highest count of observations.

In [None]:
# One-way frequency table: number of observations per month.
contingency_temp = power.groupby("month").count()
contingency_temp.show()

The hour table shows that hour 6 had the most observations.

In [None]:
# One-way frequency table: number of observations per hour.
contingency_temp = power.groupby("Hour").count()
contingency_temp.show()

The code chunks below find the average and standard deviation of the numeric variables for each month.

According to the averages table, July had the highest average temperature and February the lowest. April had the highest average humidity and July the lowest. For wind speed, April had the lowest avg wind speed and September the highest. August had the highest average value for power zone 1 and December the lowest average. For power zone 2, April had the lowest average and August the highest. And for power zone 3, July had the highest average value and December the lowest value.

In [None]:
# Per-month means of the weather and power columns, sorted by month so months
# can be compared directly. show() returns None, so bind the DataFrame first.
powermonthmean = (
    power.groupby('month')
    .agg(
        avg('Temperature'),
        avg('Humidity'),
        avg('Wind_Speed'),
        avg('Power_Zone_1'),
        avg('Power_Zone_2'),
        avg('Power_Zone_3'),
    )
    .orderBy('month')
)
powermonthmean.show()

Based on observations from this table, July had the highest standard deviation for temperature and February the lowest. For humidity, January had the lowest while July had the highest. May had the highest standard deviation for wind speed and April the lowest. For power zone 1, November had the lowest and January the highest. Power zone 2 had December as the highest standard deviation and April the lowest. And for power zone 3, July had the highest and December the lowest.

In [None]:
# Per-month standard deviations of the weather and power columns, sorted by
# month. show() returns None, so bind the DataFrame first.
powermonthstd = (
    power.groupby('month')
    .agg(
        stddev('Temperature'),
        stddev('Humidity'),
        stddev('Wind_Speed'),
        stddev('Power_Zone_1'),
        stddev('Power_Zone_2'),
        stddev('Power_Zone_3'),
    )
    .orderBy('month')
)
powermonthstd.show()

The code below converts the hour variable to a double type. This data will be used for the rest of the project.

In [None]:
# Replace Hour with a double-typed copy; test_power is the data set used for
# model fitting from here on.
test_power = power.withColumn("Hour", col("Hour").cast(DoubleType()))
test_power.printSchema()

In [None]:
# Display the DataFrame object as this cell's output.
test_power

## Fitting Model

These next code chunks will construct the first pipeline where the model is fitted.

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

In [None]:
from pyspark.ml.feature import SQLTransformer, StringIndexer,Binarizer,VectorAssembler, PolynomialExpansion

Binarizer transformer that converts hour into a binary variable: hours above the 6.5 threshold become 1.0 and the rest become 0.0.

In [None]:
# Hour -> Binary_Hour: 1.0 when Hour exceeds 6.5, otherwise 0.0.
binaryhourTrans = (
    Binarizer()
    .setThreshold(6.5)
    .setInputCols(["Hour"])
    .setOutputCols(["Binary_Hour"])
)

This code chunk will make an encoder transformer for the month variable.

In [None]:
# One-hot encode the numeric Month column into the month_hot vector.
oneHotEncoder = OneHotEncoder().setInputCol("Month").setOutputCol("month_hot")

In [None]:
from pyspark.ml.feature import PCA, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

Build the vector assembler that feeds the PCA.

In [None]:
# Pack the five weather measurements into a single "features" vector for PCA;
# handleInvalid='keep' keeps rows with null/NaN values instead of failing.
# NOTE(review): the inputs are not standardized before PCA, so columns on a
# larger numeric scale may dominate the components; confirm this is intended.
_pca_inputs = [
    "Temperature",
    "Humidity",
    "Wind_Speed",
    "General_Diffuse_Flows",
    "Diffuse_Flows",
]
assembler_pca = (
    VectorAssembler()
    .setInputCols(_pca_inputs)
    .setOutputCol("features")
    .setHandleInvalid('keep')
)

Construct PCA component

In [None]:
# Reduce the assembled weather vector to its first 3 principal components.
pca = PCA().setK(3).setInputCol("features").setOutputCol("features_pca")

Construct the second assembler to serve as the predictors for use.

In [None]:
# Final predictor vector: PCA components, the binarized hour, the other two
# power zones and the one-hot month encoding.
_model_inputs = [
    "features_pca",
    "Binary_Hour",
    "Power_Zone_1",
    "Power_Zone_2",
    "month_hot",
]
assembler_new = (
    VectorAssembler()
    .setInputCols(_model_inputs)
    .setOutputCol("features_new")
    .setHandleInvalid('keep')
)

This sql transformer will select only the new features and the response variable, which is power zone 3.

In [None]:
# Keep only the assembled predictors and expose Power_Zone_3 under the name
# "label", which the regression stage reads as its target.
_select_features_sql = """
                SELECT features_new, Power_Zone_3 as label FROM __THIS__
                """
sqlTrans = SQLTransformer().setStatement(_select_features_sql)

Elastic net object is declared below

In [None]:
# Linear regression with an elastic-net mixing weight of 0.5 (the grid search
# below overrides regParam and elasticNetParam).
elastic_net = (
    LinearRegression()
    .setFeaturesCol("features_new")
    .setLabelCol("label")
    .setElasticNetParam(0.5)
)

Pipeline code

In [None]:
# Feature engineering followed by the regression model, applied in this order.
_model_stages = [
    binaryhourTrans,
    oneHotEncoder,
    assembler_pca,
    pca,
    assembler_new,
    sqlTrans,
    elastic_net,
]
pipeline = Pipeline().setStages(_model_stages)

In [None]:
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

These next three code chunks set up the grid for regParam and elasticNetParam, set up the cross validator, and fit the model on the new power data.

In [None]:
# Full cross product of the same 11 candidate values for the penalty strength
# (regParam) and the L1/L2 mix (elasticNetParam): 121 combinations.
# NOTE(review): with regParam = 0 the penalty vanishes, so the 11
# elasticNetParam values in that slice all fit the same model.
_grid_values = [0, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.98, 0.99, 1]
paramGrid = (
    ParamGridBuilder()
    .addGrid(elastic_net.regParam, _grid_values)
    .addGrid(elastic_net.elasticNetParam, _grid_values)
    .build()
)

In [None]:
# 5-fold cross-validation of the whole pipeline over every grid combination,
# scored with RegressionEvaluator's default metric.
cv = (
    CrossValidator()
    .setEstimator(pipeline)
    .setEstimatorParamMaps(paramGrid)
    .setEvaluator(RegressionEvaluator())
    .setNumFolds(5)
)

In [None]:
# Run the grid search; cvModel wraps the best pipeline refit on all of test_power.
cvModel = cv.fit(dataset=test_power)

Generates predictions with the tuned model.

In [None]:
# Score the data with the best pipeline chosen by cross-validation.
# NOTE(review): this is the same data the model was fit on, so the metrics
# below are in-sample.
predictions = cvModel.bestModel.transform(test_power)
predictions.show()

Computes the RMSE value.

In [None]:
# Root-mean-squared error between "label" and "prediction".
rmse = RegressionEvaluator().setMetricName("rmse").evaluate(predictions)

In [None]:
# Report the RMSE rounded to two decimals.
print(f"RMSE: {rmse:.2f}")

Create a new column which is the residuals.

In [None]:
# Residual = observed label minus model prediction.
residuals = predictions.withColumn(
    "residual", predictions["label"] - predictions["prediction"]
)

In [None]:
# Print the first 20 rows of predictions with their residuals.
residuals.show(n=20)

# Streaming Part

In [None]:
from pyspark.sql.types import StructType, StructField

In [None]:
# Batch read of the streaming sample file (header row, inferred types).
# NOTE(review): newpower_stream is not referenced by the later cells shown here.
newpower_stream = spark.read.csv(
    "power_streaming_data.csv",
    sep=",",
    header="true",
    inferSchema="true",
)


In [None]:
from pyspark.sql import SparkSession
##spark = SparkSession.builder.master("local(*)").appName("power_stream").getOrCreate()

In [None]:
# Reuse the running Spark session (or create one if none exists).
_active_builder = SparkSession.builder
spark = _active_builder.getOrCreate()

Sets up the schema 

In [None]:
# Explicit schema for the streaming CSV source: every column is a nullable
# double, listed in the order the CSV columns are expected to appear.
_stream_columns = [
    "Temperature",
    "Humidity",
    "Wind_Speed",
    "General_Diffuse_Flows",
    "Diffuse_Flows",
    "Power_Zone_1",
    "Power_Zone_2",
    "Power_Zone_3",
    "Month",
    "Hour",
]
myschema = StructType(
    [StructField(name, DoubleType(), True) for name in _stream_columns]
)

In [None]:
# Display the schema object as this cell's output.
myschema

The code below sets up a streaming reader on the csv_streaming folder, where the generated CSV files are written.

In [None]:
# Streaming DataFrame over the CSV files that appear in csv_streaming/; each
# file has a header row and is parsed with myschema.
streamread = (
    spark.readStream
    .schema(myschema)
    .option("header", "True")
    .csv("csv_streaming")
)

In [None]:
# Display the streaming DataFrame object as this cell's output.
streamread

## Transform/Aggregation

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.functions import explode, split

Sets up the second pipeline, whose output is joined with the first stream. For this part, I reused most of my first pipeline's transformers. The only difference is a second SQL transformer, which selects all of the data instead of only the new features and label.

In [None]:
# Keep every column and add Power_Zone_3 again under the name "label" so this
# stream can be joined with the scored stream on that column.
_select_all_sql = """
                SELECT *,Power_Zone_3 as label FROM __THIS__
                """
sqlTrans2 = SQLTransformer().setStatement(_select_all_sql)

In [None]:
# Same feature stages as the model pipeline, minus the regression, ending with
# the select-everything SQL stage.
_feature_stages = [
    oneHotEncoder,
    binaryhourTrans,
    assembler_pca,
    pca,
    assembler_new,
    sqlTrans2,
]
pipeline2 = Pipeline().setStages(_feature_stages)

These next code chunks will transform the stream data using the first pipeline. This is the first stream of data to use.

In [None]:
# Apply the tuned pipeline to the stream (lazily defines a streaming DataFrame).
# NOTE(review): cvModel_stream is not referenced by the later cells shown here.
cvModel_stream = cvModel.bestModel.transform(streamread)

In [None]:
# Left side of the stream-stream join: scored stream rows plus their residual
# (label - prediction).
_scored_stream = cvModel.transform(streamread)
newstream = _scored_stream.withColumn("residuals", col("label") - col("prediction"))

In [None]:
# Display the streaming DataFrame object as this cell's output.
newstream

The second pipeline is used for the second stream.

In [None]:
# Right side of the join: fit the feature pipeline on the batch data, then
# apply it to the stream.
_fitted_pipeline2 = pipeline2.fit(test_power)
newstream2 = _fitted_pipeline2.transform(streamread)

In [None]:
# Display the streaming DataFrame object as this cell's output.
newstream2

In [None]:
from pyspark.sql.functions import window

Code chunk below will join the two data streams together on the label.

In [None]:
# Inner stream-stream join on label, printed to the console in append mode.
# NOTE(review): no watermark is defined on either stream, so the join state
# can grow without bound while the query runs.
_joined_streams = newstream.join(newstream2, on="label", how="inner")
joinquery = (
    _joined_streams.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

In [None]:
# Display the StreamingQuery handle as this cell's output.
joinquery

In [None]:
# Stop the console streaming query started by the join cell.
joinquery.stop()

## Produce Data

The code chunk below is meant to be run from the console; it writes small random samples of the streaming data into the csv_streaming folder as a series of CSV files.

In [None]:
# Source rows (loaded with pandas) that the producer loop samples from.
_producer_source = "csv_streaming/power_streaming_data.csv"
power_stream = pd.read_csv(_producer_source, sep=",")

In [None]:
import numpy as np
import time

In [None]:
 for i in range(0,50):
    temp = power_stream.loc[np.random.randint(power_stream.shape[0], size = 3)]
    temp.to_csv("csv_streaming/power_stream" + str(i) + ".csv", index = False, header = True)
    time.sleep(10)