## Feature Engineering

#### Baseline model features:
    - each row represents a day of data, where each category of feature has 24 columns corresponding to 24 hours in a day
    
#### RNN/LSTM model features:
    - Engineered 3d data corresponding to (batch_size, time_steps, input_dim)

In [5]:
try:
    sc.stop()
except:
    pass
from pyspark import SparkContext
from bigdl.util.common import *


# create sparkcontext with bigdl configuration
sc = SparkContext.getOrCreate(conf=create_spark_conf().setMaster("local[*]"))
from pyspark.sql import SparkSession
spark = SparkSession(sparkContext = sc)
spark.conf.set("spark.sql.session.timeZone", "UTC")

import seaborn as sns
import scipy
import matplotlib.pyplot as plt
import pandas as pd
%matplotlib inline

df = spark.read.csv(f"hdfs://localhost:9000/solar_data/interim/Arkansas_interim.csv", inferSchema=True, header=True)

Prepending /home/dyllanjr/anaconda3/envs/solarenv/lib/python3.6/site-packages/bigdl/share/conf/spark-bigdl.conf to sys.path


In [2]:
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- Minute: integer (nullable = true)
 |-- Cloud Type: integer (nullable = true)
 |-- Dew Point: integer (nullable = true)
 |-- Fill Flag: integer (nullable = true)
 |-- Wind Speed: double (nullable = true)
 |-- Surface Albedo: double (nullable = true)
 |-- Temperature: integer (nullable = true)
 |-- Solar Zenith Angle: double (nullable = true)
 |-- Wind Direction: double (nullable = true)
 |-- GHI: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [3]:
df.select('minute').distinct().show()

+------+
|minute|
+------+
|    30|
|     0|
+------+



In [9]:
from pyspark.sql import functions as F

In [61]:
#The predicted column will be the sum of GHI for every half hour in the day
#the average daily Wh/M^2 is around 4000, so these summed GHI values make sense.
ghi_df = (df.groupby(F.year(F.col('timestamp')).alias('year'),
            F.month(F.col('timestamp')).alias('month'), 
            F.dayofmonth(F.col('timestamp')).alias('day'))
     .sum('GHI')
)

In [51]:
###We want each row to contain information about every hour of the day
#Note that the data is every half hour, but I'll be simplifying to every hour for implementation's sake.
#What I have to do is pivot where my index is the unique year/month/day combination
#the columns to pivot will be hour and minute
#the values will be 
cols = ['Cloud Type', 'Dew Point', 'Wind Speed', 'Surface Albedo', 'Temperature', 'Solar Zenith Angle', 'Wind Direction']

In [52]:
def rename_cols(feature, df):
    for hour in range(0, 24):
        df = df.withColumnRenamed(str(hour), feature.replace(' ', '_')+'_'+str(hour))
    return df

def generate_feature_pivot(feature, df):
    pivoted_df = (df.groupby(F.year(F.col('timestamp')).alias('year'),
                        F.month(F.col('timestamp')).alias('month'), 
                        F.dayofmonth(F.col('timestamp')).alias('day'))
                 .pivot('Hour')
                 .agg(F.first(feature)) #There will  be two values in this group, the first is right on the hour
            )
    pivoted_df = rename_cols(feature=feature, df=pivoted_df)
    return pivoted_df

In [53]:
#take a count of each pivot and final join to see if inner join lost any rows.
counts = []
for ix, col in enumerate(cols):
    if ix == 0:
        joined = generate_feature_pivot(feature=col, df=df)
        counts.append(joined.count())
        #to keep track of progress
        print(ix)
    else:
        joined2 = generate_feature_pivot(feature=col, df=df)
        counts.append(joined2.count())
        joined = joined.join(joined2, on=['year', 'month', 'day'])
        print(ix)

0
1
2
3
4
5
6


In [59]:
joined.show(1) #That's alot of columns

+----+-----+---+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+-------------+------------------+-------------+------------------+------------------+-------------+-------------+------------------+------------------+-------------+------------------+------------------+-------------+---------

In [62]:
#One last join to get our sum(GHI) target column
joined = joined.join(ghi_df, on=['year', 'month', 'day'])

In [63]:
joined.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- Cloud_Type_0: integer (nullable = true)
 |-- Cloud_Type_1: integer (nullable = true)
 |-- Cloud_Type_2: integer (nullable = true)
 |-- Cloud_Type_3: integer (nullable = true)
 |-- Cloud_Type_4: integer (nullable = true)
 |-- Cloud_Type_5: integer (nullable = true)
 |-- Cloud_Type_6: integer (nullable = true)
 |-- Cloud_Type_7: integer (nullable = true)
 |-- Cloud_Type_8: integer (nullable = true)
 |-- Cloud_Type_9: integer (nullable = true)
 |-- Cloud_Type_10: integer (nullable = true)
 |-- Cloud_Type_11: integer (nullable = true)
 |-- Cloud_Type_12: integer (nullable = true)
 |-- Cloud_Type_13: integer (nullable = true)
 |-- Cloud_Type_14: integer (nullable = true)
 |-- Cloud_Type_15: integer (nullable = true)
 |-- Cloud_Type_16: integer (nullable = true)
 |-- Cloud_Type_17: integer (nullable = true)
 |-- Cloud_Type_18: integer (nullable = true)
 |-- Cloud_Type_19: 

In [64]:
len(joined.columns)

172

In [2]:
24*7+4 #got all the columns

172

In [67]:
counts #didn't lose any from the first join, but I'm not sure why only one element.

[7300]

In [70]:
#copied from above for ease of copying in future
def rename_cols(feature, df):
    for hour in range(0, 24):
        df = df.withColumnRenamed(str(hour), feature.replace(' ', '_')+'_'+str(hour))
    return df

def generate_feature_pivot(feature, df):
    pivoted_df = (df.groupby(F.year(F.col('timestamp')).alias('year'),
                        F.month(F.col('timestamp')).alias('month'), 
                        F.dayofmonth(F.col('timestamp')).alias('day'))
                 .pivot('Hour')
                 .agg(F.first(feature)) #There will  be two values in this group, the first is right on the hour
            )
    pivoted_df = rename_cols(feature=feature, df=pivoted_df)
    return pivoted_df

#Finally I'll make one big convenience function to do the same with future datasets.
def process_data(df):
    ghi_df = (df.groupby(F.year(F.col('timestamp')).alias('year'),
                F.month(F.col('timestamp')).alias('month'), 
                F.dayofmonth(F.col('timestamp')).alias('day'))
         .sum('GHI')
    )
    
    cols = ['Cloud Type', 'Dew Point', 'Wind Speed', 'Surface Albedo', 'Temperature', 'Solar Zenith Angle', 'Wind Direction']
    #take a count of each pivot and final join to see if inner join lost any rows.
    counts = []
    for ix, col in enumerate(cols):
        if ix == 0:
            joined = generate_feature_pivot(feature=col, df=df)
            counts.append(joined.count())
            #to keep track of progress
            print(ix)
        else:
            joined2 = generate_feature_pivot(feature=col, df=df)
            counts.append(joined2.count())
            joined = joined.join(joined2, on=['year', 'month', 'day'])
            print(ix)
    joined = joined.join(ghi_df, on=['year', 'month', 'day'])
    return joined

In [72]:
350400names = ['Arkansas', 'Arizona', 'Georgia']
for name in names:
    df = spark.read.csv(f"hdfs://localhost:9000/solar_data/interim/{name}_interim.csv", inferSchema=True, header=True)
    joined = process_data(df)
    joined.write.csv(f"/home/dyllanjr/Solar_Irradiance_Prediction/data/processed/{name}_1.csv", header=True)

/home/dyllanjr/Solar_Irradiance_Prediction/data/processed
0
1
2
3
4
5
6
0
1
2
3
4
5
6
0
1
2
3
4
5
6


In [76]:
joined.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- Cloud_Type_0: integer (nullable = true)
 |-- Cloud_Type_1: integer (nullable = true)
 |-- Cloud_Type_2: integer (nullable = true)
 |-- Cloud_Type_3: integer (nullable = true)
 |-- Cloud_Type_4: integer (nullable = true)
 |-- Cloud_Type_5: integer (nullable = true)
 |-- Cloud_Type_6: integer (nullable = true)
 |-- Cloud_Type_7: integer (nullable = true)
 |-- Cloud_Type_8: integer (nullable = true)
 |-- Cloud_Type_9: integer (nullable = true)
 |-- Cloud_Type_10: integer (nullable = true)
 |-- Cloud_Type_11: integer (nullable = true)
 |-- Cloud_Type_12: integer (nullable = true)
 |-- Cloud_Type_13: integer (nullable = true)
 |-- Cloud_Type_14: integer (nullable = true)
 |-- Cloud_Type_15: integer (nullable = true)
 |-- Cloud_Type_16: integer (nullable = true)
 |-- Cloud_Type_17: integer (nullable = true)
 |-- Cloud_Type_18: integer (nullable = true)
 |-- Cloud_Type_19: 

In [3]:
### RNN/LSTM data preperation
#https://stackoverflow.com/questions/57549011/lstm-keras-input-shape-confusion
def create_windows(data, window_shape, step = 1, start_id = None, end_id = None):
    
    data = np.asarray(data)
    data = data.reshape(-1,1) if np.prod(data.shape) == max(data.shape) else data
        
    start_id = 0 if start_id is None else start_id
    end_id = data.shape[0] if end_id is None else end_id
    
    data = data[int(start_id):int(end_id),:]
    window_shape = (int(window_shape), data.shape[-1])
    step = (int(step),) * data.ndim
    slices = tuple(slice(None, None, st) for st in step)
    indexing_strides = data[slices].strides
    win_indices_shape = ((np.array(data.shape) - window_shape) // step) + 1
    
    new_shape = tuple(list(win_indices_shape) + list(window_shape))
    strides = tuple(list(indexing_strides) + list(data.strides))
    
    window_data = np.lib.stride_tricks.as_strided(data, shape=new_shape, strides=strides)
    
    return np.squeeze(window_data, 1)

In [6]:
df = spark.read.csv(f"hdfs://localhost:9000/solar_data/interim/Arkansas_interim.csv", inferSchema=True, header=True)

In [14]:
#Make sure that the dataframe is sorted by timestamp
df = df.sort(F.col('timestamp').asc())

In [21]:
cols = df.columns
cols.remove('Year')
cols.remove('Month')
cols.remove('Day')
cols.remove('Hour')
cols.remove('Minute')
cols.remove('timestamp')
cols.remove('GHI')

Xs = df.select(cols).toPandas().as_matrix()
ys = df.select('GHI').toPandas().as_matrix()

In [43]:
### We want a 48-step ahead forecast (corresponding to one day) with a 48,48*2,48*3-step look-back (corresponding to one/two/three days)
look_back = 48
look_ahead = 48

X_seq = create_windows(Xs, window_shape = look_back, end_id= -look_ahead, step = 48)
y_seq = create_windows(ys, window_shape = look_ahead, start_id= look_back, step = 48)


In [39]:
X_seq[0][-1]

array([7.000e+00, 1.100e+01, 0.000e+00, 4.200e+00, 1.140e-01, 1.200e+01,
       9.565e+01, 1.787e+02])

In [40]:
X_seq[1][0]

array([4.0000e+00, 1.0000e+00, 0.0000e+00, 2.7000e+00, 1.1400e-01,
       5.0000e+00, 1.0146e+02, 1.7980e+02])

In [41]:
len(X_seq)

7298

In [42]:
df.groupby(['Day', 'Month', 'Year']).count().count()
#This makes sense since we lose one day by looking ahead.

7300

In [47]:
%cd /home/dyllanjr/Solar_Irradiance_Prediction/data/processed/

#I'll save each of them as a .npy file, and paralellize upon loading
names = ['Arkansas', 'Arizona', 'Georgia']
look_backs = [48, 48*2, 48*3]
look_ahead = 48

for name in names:
    for look_back in look_backs:
        df = spark.read.csv(f"hdfs://localhost:9000/solar_data/interim/{name}_interim.csv", inferSchema=True, header=True)
        #Make sure that the dataframe is sorted by timestamp
        df = df.sort(F.col('timestamp').asc())
        cols = df.columns
        cols.remove('Year')
        cols.remove('Month')
        cols.remove('Day')
        cols.remove('Hour')
        cols.remove('Minute')
        cols.remove('timestamp')
        cols.remove('GHI')

        Xs = df.select(cols).toPandas().as_matrix()
        ys = df.select('GHI').toPandas().as_matrix()
        
        X_seq = create_windows(Xs, window_shape = look_back, end_id= -look_ahead, step = 48)
        y_seq = create_windows(ys, window_shape = look_ahead, start_id= look_back, step = 48)
        
        np.savez(f'{name}_look_back_{look_back}', x=X_seq, y=y_seq)

/home/dyllanjr/Solar_Irradiance_Prediction/data/processed


In [48]:
import os
os.system('hadoop fs -copyFromLocal /home/dyllanjr/Solar_Irradiance_Prediction/data/ /solar_data')

0