# Feature Engineering

In [None]:
from pyspark.sql import functions as F
from sklearn.preprocessing import RobustScaler
# Shared module-level scalers used by get_arrays_for_lstm:
# sc_f for the feature columns, sc_y for the target column.
sc_f, sc_y = RobustScaler(), RobustScaler()

def add_price_returns(df, x):
    '''
    Add an x-step price-return column: Close minus the Close x rows earlier
    (rows ordered by Timestamp).

    Parameters: Pyspark df with "Timestamp" and "Close" columns, x (number of
                rows to look back; the column is named in minutes, so this
                presumably assumes one row per minute -- TODO confirm)
    Returns: Pyspark df with an added "price_<x>min_return" column. Rows with a
             null in ANY column are dropped (na.drop()), which always removes
             the first x rows, whose lagged Close does not exist.
    '''
    col_name = 'price_' + str(x) + 'min_return'
    # NOTE: Window.orderBy without partitionBy makes Spark evaluate the window
    # on a single partition.
    # The offset is passed positionally on purpose: the keyword was `count` in
    # PySpark 2.x and is `offset` in PySpark 3.x, so `count=x` raises TypeError there.
    prev_close = F.lag(F.col('Close'), x).over(Window.orderBy('Timestamp'))
    # A null prev_close yields a null return, which na.drop() then removes.
    return df.withColumn(col_name, F.col('Close') - prev_close).na.drop()

def get_dates_times_subset(df, startYear, endYear, st, et):
    '''
    Keep only the rows inside a year range AND a time-of-day range.

    BE SURE TO INCLUDE ALL NEEDED DATA IN TIMESPAN!
    Building forecast_window and prior time_window from this data should be considered.
    Parameters: Pyspark df with Timestamp column, startYear(YYYY), endYear(YYYY),
                start time (st) and end time (et) -- objects exposing .hour/.minute
    Returns: Pyspark df with years in startYear..endYear and times of day in
             st..et (both ranges inclusive; a range wrapping midnight yields no rows)
    '''
    ts = 'Timestamp'
    years_ok = year(ts).between(lit(startYear), lit(endYear))
    # Time of day expressed as minutes since midnight, compared as one range.
    first_minute = st.hour * 60 + st.minute
    last_minute = et.hour * 60 + et.minute
    minutes_ok = (hour(ts) * 60 + minute(ts)).between(lit(first_minute), lit(last_minute))
    return df.filter(years_ok & minutes_ok)

def add_dataset_check_col(df, startTime, endTime):
    '''
    Flag rows whose time of day lies inside [startTime, endTime] (inclusive).

    Parameters: Pyspark df with Timestamp column, startTime(HH:MM), endTime(HH:MM)
    Returns: Pyspark df with boolean "is_in_dataset" column, identifying rows
             that stay in dataset after final setup. The window must not wrap
             past midnight: if endTime is earlier than startTime, no row is
             flagged True.
    '''
    st = datetime.datetime.strptime(startTime, '%H:%M')
    et = datetime.datetime.strptime(endTime, "%H:%M")
    # Compare minutes since midnight. The former per-hour clauses flagged the
    # WHOLE hour when startTime and endTime shared an hour
    # (e.g. 09:15-09:45 also kept 09:00-09:14 and 09:46-09:59).
    minute_of_day = F.hour('Timestamp') * 60 + F.minute('Timestamp')
    in_window = minute_of_day.between(st.hour * 60 + st.minute,
                                      et.hour * 60 + et.minute)
    return df.withColumn('is_in_dataset', in_window)

def get_arrays_for_lstm(df, time_window, forecast_window, batch_size, *,
                        feature_scaler=None, y_scaler=None, fit=True):
    '''
    Build scaled, windowed X and y numpy arrays for the LSTM network model.

    Columns are read by POSITION. Assumed layout (inferred from the columns
    get_mlr_df drops -- confirm against df.columns):
        col 5 = Volume, col 6 = price_1min_return   (features)
        col 7 = price_5min_return                    (y var)
        last col = is_in_dataset                     (row filter)

    For each row i flagged is_in_dataset:
        X sample = the time_window rows before row i (i - time_window .. i - 1),
                   one [price, volume] pair per step
        y sample = price_5min_return at row i + forecast_window, i.e. the
                   future return (same alignment as "label" in get_mlr_df)
    Flagged rows lacking a full look-back window or a row forecast_window
    ahead are skipped; the result is then truncated to a multiple of batch_size.

    Parameters: Pyspark df, time_window, forecast_window, batch_size
        feature_scaler, y_scaler: sklearn-style scalers (fit_transform /
            transform); default to the module-level sc_f / sc_y
        fit: if True (default) fit the scalers on this df's data; pass False to
            only transform with scalers already fitted (e.g. on the train split)
    Returns: X with shape (n, time_window, 2) and y with shape (n, 1)
    '''
    if feature_scaler is None:
        feature_scaler = sc_f
    if y_scaler is None:
        y_scaler = sc_y

    pdf = df.toPandas()

    # only need volumes and price returns, plus the target and dataset check vals
    features = pdf.iloc[:, 5:7].to_numpy(dtype=np.float64)   # [Volume, price_1min_return]
    target = pdf.iloc[:, 7:8].to_numpy(dtype=np.float64)     # [price_5min_return]
    # nulls (if any) count as "not in dataset" so the boolean mask stays valid
    is_in_dataset = pdf.iloc[:, -1].to_numpy(dtype=bool, na_value=False)

    # feature scaling
    if fit:
        features_scaled = feature_scaler.fit_transform(features)
        target_scaled = y_scaler.fit_transform(target)
    else:
        features_scaled = feature_scaler.transform(features)
        target_scaled = y_scaler.transform(target)
    features_scaled = np.asarray(features_scaled)
    target_scaled = np.asarray(target_scaled)

    # rows to emit: flagged, with a complete look-back window and a future target
    # (short/ragged early windows would otherwise break np.array / reshape)
    n_rows = len(features_scaled)
    rows = np.flatnonzero(is_in_dataset)
    rows = rows[(rows >= time_window) & (rows + forecast_window < n_rows)]

    # reduce size of dataset to be divisible by batch size
    rows = rows[:len(rows) - len(rows) % batch_size]

    # window_idx[k] holds row indices rows[k] - time_window .. rows[k] - 1
    window_idx = rows[:, np.newaxis] + np.arange(-time_window, 0)
    windows = features_scaled[window_idx]        # (n, time_window, [volume, price])
    X = windows[:, :, [1, 0]]                    # reorder steps to [price, volume]
    # y uses the FUTURE return; the former `i - forecast_window` picked a past
    # return lying inside the feature window (target leakage).
    y = target_scaled[rows + forecast_window, 0].reshape(-1, 1)

    print(f"Feature set shape (standardized price & volume w/{time_window}min window): ")
    print(X.shape)
    if len(X):
        print(X[0])
    print('\n')
    print("y var shape (standardized 5min future price return): ")
    print(y.shape)
    if len(y):
        print(y[0])
    print('\n')
    return X, y

def get_mlr_df(df, time_window, forecast_window):
    '''
    Build the feature/label PySpark df for the multiple linear regression model.
    DOES NOT STANDARDIZE VARIABLES!

    Parameters: Pyspark df (with Timestamp, price_1min_return, Volume, and price_5min_return, is_in_dataset cols),
                time_window, forecast_window
    Returns: Pyspark df restricted to rows where is_in_dataset is true, with cols
             "sma_price"  - mean price_1min_return over the frame ending at the current row
             "sma_volume" - mean Volume over the same frame
             "label"      - price_5min_return forecast_window rows ahead (the y var)
             plus any input cols not dropped below (e.g. Timestamp)
    '''
    by_time = Window.orderBy('Timestamp')
    # NOTE(review): rowsBetween(-time_window, 0) spans time_window + 1 rows
    # (the current row plus the time_window rows before it) -- confirm intended.
    sma_frame = by_time.rowsBetween(-time_window, 0)

    # add sma cols for price returns and volume (features)
    df_return = df.withColumn('sma_price', F.avg('price_1min_return').over(sma_frame))
    df_return = df_return.withColumn('sma_volume', F.avg('Volume').over(sma_frame))

    # shift 5 min return col to line up y var. lead(c, n) equals the former
    # lag(c, count=-n), but avoids the `count` keyword that PySpark 3.x renamed
    # to `offset` (the old call raises TypeError there).
    df_return = df_return.withColumn(
        'label', F.lead('price_5min_return', forecast_window).over(by_time))

    # now that the windowed values exist, remove unwanted entries based on is_in_dataset
    df_return = df_return.filter(F.col('is_in_dataset'))

    return df_return.drop('Open', 'High', 'Low', 'Close', 'Volume', 'price_1min_return', 'price_5min_return', 'is_in_dataset')

In [None]:
# define important data parameters (time/forecast windows in minutes)
forecast_window, time_window, batch_size = 5, 10, 64
start_time, end_time = '09:15', '11:15'

# add 1 min and 5 min price returns columns
df = add_price_returns(add_price_returns(df, 1), 5)

# widen the session so the data also covers the time_window before start_time
# and the forecast_window after end_time
st = datetime.datetime.strptime(start_time, '%H:%M') - datetime.timedelta(minutes=time_window)
et = datetime.datetime.strptime(end_time, '%H:%M') + datetime.timedelta(minutes=forecast_window)

# year splits: train 2013-2015, val 2016, test 2017. Each split then gets an
# is_in_dataset column marking rows inside the un-widened start/end session,
# used later to filter out the padding rows from the final datasets.
df_train = add_dataset_check_col(get_dates_times_subset(df, 2013, 2015, st, et), start_time, end_time)
df_val   = add_dataset_check_col(get_dates_times_subset(df, 2016, 2016, st, et), start_time, end_time)
df_test  = add_dataset_check_col(get_dates_times_subset(df, 2017, 2017, st, et), start_time, end_time)

df_train.show(10, False)

In [None]:
# get arrays for lstm network model
# NOTE(review): each get_arrays_for_lstm call below fit_transforms the shared
# module-level scalers sc_f / sc_y on that split's own data. Test and val are
# therefore scaled with their own statistics (not train's), and after this
# cell the scalers hold the statistics of the LAST call (val). Keep this order
# in mind if predictions are later inverse-transformed with sc_y.
X_train, y_train = get_arrays_for_lstm(df_train, time_window, forecast_window, batch_size)
X_test, y_test   = get_arrays_for_lstm(df_test, time_window, forecast_window, batch_size)
X_val, y_val     = get_arrays_for_lstm(df_val, time_window, forecast_window, batch_size)

In [None]:
# Kept as PySpark DataFrames (not numpy arrays) so a Spark Pipeline assembly
# can be shown on them later. Only the train and test splits are built here.
mlr_train, mlr_test = (get_mlr_df(split_df, time_window, forecast_window)
                       for split_df in (df_train, df_test))

mlr_train.show(10, False)