# Lab 2 - Visualization and Modeling

In this lab we will do some basic data visualization to see what kind of data we are working with
Then we will start creating some models

Please use <b>https</b> whenever providing your livy endpoint

In [None]:
# Use https please 
%setLivy --url https://mip-bd-vm129.mip.storage.hpecorp.net:10018

**If you set the Livy magic from current notebook then please restart the kernel and ingore setting up Livy next time.**

# Overriding default spark configuration using configure magic.

In [None]:
%%configure -f 
{ 
    "conf":{ 
        "spark.ssl.enabled" : false, 
        "spark.hadoop.fs.dtap.impl" : "com.bluedata.hadoop.bdfs.Bdfs", 
        "spark.hadoop.fs.AbstractFileSystem.dtap.impl" : "com.bluedata.hadoop.bdfs.BdAbstractFS", 
        "spark.hadoop.fs.dtap.impl.disable.cache" : false, 
        "spark.kubernetes.driver.label.hpecp.hpe.com/dtap" : "hadoop2", 
        "spark.kubernetes.executor.label.hpecp.hpe.com/dtap" : "hadoop2", 
        "spark.jars" : "local:///opt/bdfs/bluedata-dtap.jar", 
        "spark.kubernetes.container.image" : "devsds/spark-2.4.7:202104010902C" 
    }
}

## Importing Libraries and Spark Configuration

Don't forget to put in your intitials in the <b>STUDENT</b> variable

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style('white')
from warnings import filterwarnings
filterwarnings('ignore')

# Importing pyspark libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark import SparkContext

#Fill in initials here
STUDENT = "" 

## Reading the Dataset

In [None]:
spark_df = spark.read.csv('dtap://TenantStorage/Scada-Dataset/T1.csv', header=True, inferSchema=True)

In [None]:
# Caching the dataset
spark_df.cache()

# Converting all the column names to lower case
spark_df = spark_df.toDF(*[c.lower() for c in spark_df.columns])

print('Show the first 5 rows')
print(spark_df.show(5))
print()
print('What are the variable data types?')
print(spark_df.printSchema())
print()
print('How many observations do we have?')
print(spark_df.count())

In [None]:
# Extracting a substring from columns to create month and hour variables

from pyspark.sql.functions import substring
spark_df = spark_df.withColumn("month", substring("date/time", 4,2))
spark_df = spark_df.withColumn("hour", substring("date/time", 12,2))

# Converting string month and hour variables to integer
from pyspark.sql.types import IntegerType
spark_df = spark_df.withColumn('month', spark_df.month.cast(IntegerType()))
spark_df = spark_df.withColumn('hour', spark_df.hour.cast(IntegerType()))

print(spark_df.show(5))

## Exploratory Data Analysis

In [None]:
pd.options.display.float_format = '{:.2f}'.format
spark_df.select('wind speed (m/s)', 'theoretical_power_curve (kwh)', 'lv activepower (kw)').toPandas().describe()

### Question: What are the distributions of the variables?

**For creating visualization we need to either use aggregated data or use a sample from the big data.**

**So we will get a random sample from our input.**

In [None]:
# Taking a random sample from the big data
sample_df = spark_df.sample(withReplacement=False, fraction=0.1, seed=42).toPandas()

# Visualizing the distributions with the sample data
columns = ['wind speed (m/s)', 'wind direction (deg)', 'month', 'hour', 'theoretical_power_curve (kwh)', 'lv activepower (kw)']
i=1
plt.figure(figsize=(10,12))
for each in columns:
    plt.subplot(3,2,i)
    sample_df[each].plot.hist(bins=12)
    plt.title(each)
    i += 1
%matplot plt

### Question: Is there any difference between the months for average power production ?

In [None]:
plt.clf()

# Average power production by month
monthly = spark_df.groupby('month').mean('lv activepower (kw)').sort('avg(lv activepower (kw))').toPandas()
sns.barplot(x='month', y='avg(lv activepower (kw))', data=monthly)
plt.title('Months and Average Power Production');

%matplot plt

### Question: Is there any difference between the hours for average power production?

In [None]:
plt.clf()

# Average power production by hour
hourly = spark_df.groupby('hour').mean('lv activepower (kw)').sort('avg(lv activepower (kw))').toPandas()
sns.barplot(x='hour', y='avg(lv activepower (kw))', data=hourly)
plt.title('Hours and Average Power Production');

%matplot plt

### Question: Is there any correlation between the wind speed, wind direction and power production?

In [None]:
pd.set_option('display.max_columns', None)
sample_df[columns].corr()

In [None]:
plt.clf()
sns.pairplot(sample_df[columns], markers='*');
%matplot plt

**Wind speed and power production is highly correlated as one would expect.**

**We can see there are lower level power production for some wind directions.**

### Question: What is the average power production level for different wind speeds?

In [None]:
# Finding average power production for 5 m/s wind speed increments
wind_speed = []
avg_power = []
for i in [0,5,10,15,20]:
    avg_value = spark_df.filter((spark_df['wind speed (m/s)'] > i) 
                                & (spark_df['wind speed (m/s)'] <= i+5))\
                                .agg({'lv activepower (kw)':'mean'}).collect()[0][0] 
    avg_power.append(avg_value)
    wind_speed.append(str(i) + '-' + str(i+5))

plt.clf()
sns.barplot(x=wind_speed, y=avg_power, color='orange')
plt.title('Avg Power Production for 5 m/s Wind Speed Increments')
plt.xlabel('Wind Speed')
plt.ylabel('Average Power Production');

%matplot plt

**From the graph above we can see the power production reaches near a maximum level after the wind speed reaches 15 m/s.**

### Question: What is the power production for different wind directions and speeds? 

**Let's create a polar diagram with wind speed, wind direction and power production from the sample data.**

In [None]:
# Creating the polar diagram
from math import radians

plt.clf()
plt.figure(figsize=(8,8))
ax = plt.subplot(111, polar=True)
# Inside circles are the wind speed and marker color and size represents the amount of power production
sns.scatterplot(x=[radians(x) for x in sample_df['wind direction (deg)']], 
                y=sample_df['wind speed (m/s)'],
                size=sample_df['lv activepower (kw)'],
                hue=sample_df['lv activepower (kw)'],
                alpha=0.7, legend=None)
# Setting the polar diagram's top represents the North 
ax.set_theta_zero_location('N')
# Setting -1 to start the wind direction clockwise
ax.set_theta_direction(-1)
# Setting wind speed labels in a better position to see
ax.set_rlabel_position(110)
plt.title('Wind Speed - Wind Direction - Power Production Diagram')
plt.ylabel(None);

%matplot plt

**We can see that the wind turbine produces more power if the wind blows from the directions between 0-90 and 180-225 degrees.**

### Question: Does the manufacturer's theoritical power production curve fit well with the real production?

In [None]:
plt.clf()
plt.figure(figsize=(10,6))
sns.scatterplot(x='wind speed (m/s)', y='lv activepower (kw)', color='orange', label='Real Production', alpha=0.5, data=sample_df)
sns.lineplot(x='wind speed (m/s)', y='theoretical_power_curve (kwh)', color='blue', label='Theoritical Production', data=sample_df)
plt.title('Wind Speed and Power Production Chart')
plt.ylabel('Power Production (kw)');

%matplot plt

**From the graph above, we can see the theoritical power production curve generally fits well with the real production.**

**We can see the power production reaches a maximum level and continues in a straight line if the wind speed reaches to 15 m/s.**

**Also we can see there are some 0 power production, even the wind speed is higher than 5 m/s. Lets investigate the reason.**

**But before that what is the minimum wind speed for theoritical power production curve?**

### Question: What is the wind speed threshold value for zero theorical power?

In [None]:
# Filter the big data where the real and theoritical power productions are equal to 0
zero_theo_power = spark_df.filter((spark_df['lv activepower (kw)'] == 0)
                                  & (spark_df['theoretical_power_curve (kwh)'] == 0)).toPandas()

print(zero_theo_power[['wind speed (m/s)', 'theoretical_power_curve (kwh)', 'lv activepower (kw)']].sample(5))

In [None]:
plt.clf()
# Let's see the wind speed distribution for 0 power production
zero_theo_power['wind speed (m/s)'].hist()
plt.title('Wind Speed Distribution for 0 Power Production')
plt.xlabel('Wind speed (m/s)')
plt.ylabel('Counts for 0 Power Production');

%matplot plt

**We can see from above, limit for the theoritical power curve is 3 m/s wind speed. If the wind speed is below 3 m/s model doesn't expect any power production.**

**But there are some observations for 0 power production even the wind speed is more than 3 m/s.**

### Question: Why there aren't any power production in some observations while the wind speed is higher than 3 m/s?

In [None]:
# Observations for the wind speed > 3m/s and power production = 0, 
# While theoritically there should be power production
zero_power = spark_df.filter((spark_df['lv activepower (kw)'] == 0)
                            & (spark_df['theoretical_power_curve (kwh)'] != 0)
                            & (spark_df['wind speed (m/s)'] > 3)).toPandas()
print(zero_power.head())
print('No of Observations (while Wind Speed > 3 m/s and Power Production = 0): ', len(zero_power))

**There are 3497 observations where theoritically there should be power production. From the dataset we cannot see the reason, it might be caused by maintenance. But let's see if we can see any information from the wind speed, direction and month?**

In [None]:
plt.clf()
zero_power['wind speed (m/s)'].plot.hist(bins=8)
plt.xlabel('Wind Speed (m/s)')
plt.ylabel('Counts for Zero Production')
plt.title('Wind Speed Counts for Zero Power Production')
plt.xticks(ticks=np.arange(4,18,2));

%matplot plt

**It looks like theoritically wind speed threshold should be 4 m/s. But there are also other observations with zero power production while the wind speed is higher.**

**Let's see the monthly distribution for zero power production.**

In [None]:
plt.clf()
sns.countplot(zero_power['month']);
%matplot plt

**It is usually in December and January when the wind turbine doesn't produce production.**

**Because on the available information we cannot decide if these zero power productions are caused by maintenance periods or something else, Lets assume those 3497 observations as outliers and remove them from the dataset.**

In [None]:
# Excluding the observations meeting the filter criterias 
spark_df = spark_df.filter(~((spark_df['lv activepower (kw)'] == 0)
                            & (spark_df['theoretical_power_curve (kwh)'] != 0)
                            & (spark_df['wind speed (m/s)'] > 3)))

In [None]:
spark_df.show(20)

### Question: Is there any other outliers?

In [None]:
columns = ['wind speed (m/s)', 'wind direction (deg)', 'theoretical_power_curve (kwh)', 'lv activepower (kw)']
i=1
plt.clf()
plt.figure(figsize=(20,3))
for each in columns:
    df = spark_df.select(each).toPandas()
    plt.subplot(1,4,i)
    #plt.boxplot(df)
    sns.boxplot(x=df[each])
    plt.title(each)
    i += 1
    
%matplot plt

**From the graphs above we can see there are some outliers in the wind speed data.**

**We will find the upper and lower threshold values for the wind speed data, and analyze the outliers.**

In [None]:
# Create a pandas df for visualization
wind_speed = spark_df.select('wind speed (m/s)').toPandas()

# Defining the quantiles and interquantile range
Q1 = wind_speed['wind speed (m/s)'].quantile(0.25)
Q3 = wind_speed['wind speed (m/s)'].quantile(0.75)
IQR = Q3-Q1
# Defining the lower and upper threshold values
lower = Q1 - 1.5*IQR
upper = Q3 + 1.5*IQR

print('Quantile (0.25): ', Q1, '  Quantile (0.75): ', Q3)
print('Lower threshold: ', lower, ' Upper threshold: ', upper)

In [None]:
# Fancy indexing for outliers
outlier_tf = (wind_speed['wind speed (m/s)'] < lower) | (wind_speed['wind speed (m/s)'] > upper)

print('Total Number of Outliers: ', len(wind_speed['wind speed (m/s)'][outlier_tf]))
print('--'*15)
print('Some Examples of Outliers:')
print(wind_speed['wind speed (m/s)'][outlier_tf].sample(10))

**It is a rare event for wind speed to be over 19 m/s in our dataset.**

**Out of 47033, there is only 407 observations while the wind speed is over 19 m/s.**

**Now Lets see average power production for these high wind speed.**

In [None]:
spark_df.select('wind speed (m/s)', 'lv activepower (kw)')\
.filter(spark_df['wind speed (m/s)'] >= 19)\
.agg({'lv activepower (kw)':'mean'}).show()

**So instead of erasing the outliers, we are going to set the wind speed as 19 m/s for those observations.**

In [None]:
from pyspark.sql import functions as F
spark_df = spark_df.withColumn('wind speed (m/s)', 
                               F.when(F.col('wind speed (m/s)') > 19.447, 19)
                               .otherwise(F.col('wind speed (m/s)')))
spark_df.count()

### Question: What are the general criterias for power production?

**It is important to understand the pattern in the data. We should learn the data before the machine.**

**1. We saw from the graph that in March, August and November, the average power production is higher.**

**2. The average power production is higher daily between 16:00 and 24:00.**

**3. The power production is higher when the wind blows from the directions between 0-90 and 180-225 degrees.**

**So let's try to predict a high and low level of power production from the criterias above before ML algorithm.**

In [None]:
# High level power production
spark_df.filter(((spark_df['month'] == 3) | (spark_df['month'] == 8) | (spark_df['month'] == 11)) 
                & ((spark_df['hour'] >= 16) | (spark_df['hour'] <= 24)) 
                & ((spark_df['wind direction (deg)'] > 0) | (spark_df['wind direction (deg)'] < 90))
                & ((spark_df['wind direction (deg)'] > 180) | (spark_df['wind direction (deg)'] < 225))
               ).agg({'lv activepower (kw)':'mean'}).show()

In [None]:
# Low level power production
spark_df.filter((spark_df['month'] == 7) 
                & ((spark_df['hour'] >= 9) | (spark_df['hour'] <= 11)) 
                & ((spark_df['wind direction (deg)'] > 90) | (spark_df['wind direction (deg)'] < 160))
               ).agg({'lv activepower (kw)':'mean'}).show()

## Data Preparation for ML Algorithms

**After analysing and understanding the dataset, we can build a ML regression model to predict wind turbine power production by using the wind speed, wind direction, month of the year and hour of the day.**

**Using ML algorithms with Spark is a bit different from well known Sckitlearn library.**

**We need to feed the model with a dataframe made of variables compressed in vectors called as 'features', and target variable as 'label'. For these convertions we are going to use VectorAssembler method from Pyspark.**

In [None]:
# Preparing the independent variables (Features)
from pyspark.ml.feature import VectorAssembler

# Converting lv activepower (kw) variable as label
spark_df = spark_df.withColumn('label', spark_df['lv activepower (kw)'])

# Defining the variables to be used
variables = ['month', 'hour', 'wind speed (m/s)', 'wind direction (deg)']
vectorAssembler = VectorAssembler(inputCols = variables, outputCol = 'features')
va_df = vectorAssembler.transform(spark_df)

# Combining features and label column
final_df = va_df.select('features', 'label')
final_df.show(10)

## Train Test Split

**Now we can split our dataset into train and test datasets.**

In [None]:
splits = final_df.randomSplit([0.8, 0.2])
train_df = splits[0]
test_df = splits[1]

train_df.write.save("dtap://TenantStorage/Scada-Dataset/" + STUDENT + "_train_df.parquet")
test_df.write.save("dtap://TenantStorage/Scada-Dataset/" + STUDENT + "_test_df.parquet")

print('Train dataset: ', train_df.count())
print('Test dataset : ', test_df.count())

In [None]:
spark_df = spark.read.csv('dtap://TenantStorage/Scada-Dataset/T1.csv', header=True, inferSchema=True)

# Caching the dataset
spark_df.cache()

# Converting all the column names to lower case
spark_df = spark_df.toDF(*[c.lower() for c in spark_df.columns])

# Extracting a substring from columns to create month and hour variables

from pyspark.sql.functions import substring
spark_df = spark_df.withColumn("month", substring("date/time", 4,2))
spark_df = spark_df.withColumn("hour", substring("date/time", 12,2))

# Converting string month and hour variables to integer
from pyspark.sql.types import IntegerType
spark_df = spark_df.withColumn('month', spark_df.month.cast(IntegerType()))
spark_df = spark_df.withColumn('hour', spark_df.hour.cast(IntegerType()))

# Taking a random sample from the big data
sample_df = spark_df.sample(withReplacement=False, fraction=0.1, seed=42).toPandas()

In [None]:
train_df = spark.read.load('dtap://TenantStorage/Scada-Dataset/' + STUDENT + '_train_df.parquet')
test_df = spark.read.load('dtap://TenantStorage/Scada-Dataset/' + STUDENT + '_test_df.parquet')

train_df.show()

## Creating the Initial Model

**Lets use GBT regressor for this study.**

In [None]:
from pyspark.ml.regression import GBTRegressor

# Creating the gbm regressor object
gbm = GBTRegressor(featuresCol='features', labelCol='label')

# Training the model with train data
gbm_model = gbm.fit(train_df)

# Predicting using the test data
y_pred = gbm_model.transform(test_df)

# Initial look at the target and predicted values
y_pred.select('label', 'prediction').show(20)

**Store our model in dtap**

In [None]:
gbm_model.save('dtap://TenantStorage/Scada-Model/' + STUDENT + '_GBM.model')

**Load Model - Test**

In [None]:
from pyspark.ml.regression import GBTRegressionModel
gbm_model_dtap = GBTRegressionModel.load("dtap://TenantStorage/Scada-Model/" + STUDENT + "_GBM.model")

**Let's evaluate our model's success.**

In [None]:
# Initial model success
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='label')

print('R2 SCORE : ', evaluator.evaluate(y_pred, {evaluator.metricName: 'r2'}))
print('MAE      : ', evaluator.evaluate(y_pred, {evaluator.metricName: 'mae'}))
print('RMSE     : ', evaluator.evaluate(y_pred, {evaluator.metricName: 'rmse'}))

**R2 score means, real power production's 98% variability can be explained by the ML model.**

**MAE is the mean absolute difference between the real and predicted power production.**

**RMSE is the square root of mean squared difference between the real and predicted values.**

**Even though the R2 is high, we should also check the MAE and RMSE values with the real value's summary statistics.**

**One can tune the hyperparameters to increase the model success.**

## Comparing Real, Theoritical and Predicted Power Productions

**Lets use sample_df for comparing the actual, theoritical and the model power productions.**

In [None]:
from pyspark.ml.feature import VectorAssembler

# Converting sample_df back to Spark dataframe
eva_df = spark.createDataFrame(sample_df)

# Converting lv activepower (kw) variable as label
eva_df = eva_df.withColumn('label', eva_df['lv activepower (kw)'])

# Defining the variables to be used
variables = ['month', 'hour', 'wind speed (m/s)', 'wind direction (deg)']
vectorAssembler = VectorAssembler(inputCols = variables, outputCol = 'features')
vec_df = vectorAssembler.transform(eva_df)

# Combining features and label column
vec_df = vec_df.select('features', 'label')

# Using ML model to predict
preds = gbm_model.transform(vec_df)
preds_df = preds.select('label','prediction').toPandas()

# Compining dataframes to compare
frames = [sample_df[['wind speed (m/s)', 'theoretical_power_curve (kwh)']], preds_df]
sample_data = pd.concat(frames, axis=1)

plt.clf()
# Visualizing real, theoritical and predicted power production
plt.figure(figsize=(10,7))
sns.scatterplot(x='wind speed (m/s)', y='label',alpha=0.5, label= 'Real Power', data=sample_data)
sns.scatterplot(x='wind speed (m/s)', y='prediction', alpha=0.7, label='Predicted Power', marker='o', data=sample_data)
sns.lineplot(x='wind speed (m/s)', y='theoretical_power_curve (kwh)', label='Theoritical Power',color='purple', data=sample_data)
plt.title('Wind Turbine Power Production Prediction')
plt.ylabel('Power Production (kw)')
plt.legend();

%matplot plt

**From the graph above, the model fits better to the real power productions, than the theoritical power production curve.**