In [1]:
#Version 5.0
#Decision Tree and Random Forest
from pyspark.sql import SQLContext, Window
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor, DecisionTreeRegressor, LinearRegression
from pyspark.ml.linalg import Vectors
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import abs, sqrt
from pyspark.sql import SparkSession
from pyspark import SparkContext

def sMAPE(df, prediction = "prediction", label = "label"):
    '''
    Compute the symmetric mean absolute percentage error (sMAPE) between two columns.

    Every row contributes |prediction - label| / ((|prediction| + |label|) / 2).
    The contributions are summed, divided by the total number of rows and
    multiplied by 100. Rows whose contribution evaluates to null (for example
    because one of the inputs is null) add nothing to the sum but still count
    in the denominator.

    Args:
        df (dataframe): The dataframe which contains the prediction and label.
        prediction (str): The prediction column's name.
        label (str): The label column's name.

    Returns:
        float: sMAPE in percent. ``nan`` is returned when ``df`` has no rows or
        when no row produces a non-null contribution.
    '''
    # Imported under an alias so the Spark column functions cannot be confused
    # with the Python builtins of the same name (abs, sum, ...).
    from pyspark.sql import functions as F

    scored = df.select(prediction, label)
    abs_diff = F.abs(scored[prediction] - scored[label])
    mean_magnitude = (F.abs(scored[prediction]) + F.abs(scored[label])) / 2
    term = abs_diff / mean_magnitude

    # One aggregation yields both the sum and the row count, so the upstream
    # plan is evaluated once instead of once per action.
    stats = scored.agg(
        F.sum(term).alias("total"),
        F.count(F.lit(1)).alias("n_rows"),
    ).collect()[0]
    total, n_rows = stats["total"], stats["n_rows"]

    if not n_rows or total is None:
        # Nothing to average over: report "undefined" rather than failing
        # with an arithmetic error on None or a zero division.
        return float("nan")
    return 100 * total / n_rows

def add_feature(df, new_df_name, colname, rename):
    '''
    Attach one column of a registered table to ``df``, matching rows on Date.

    The table is read through the module-level ``sqlContext`` (not defined in
    this file; presumably supplied by the notebook runtime). Rows of that table
    containing any null are discarded before the join, and the join uses the
    default join type of ``DataFrame.join``.

    Args:
        df (dataframe): The original dataframe; must have a "Date" column.
        new_df_name (str): Name of the table to read the new feature from.
        colname (str): Name of the feature column inside that table.
        rename (str): Name the feature column gets once added to ``df``.

    Returns:
        df (dataframe): ``df`` with one additional column named ``rename``.
    '''
    from pyspark.sql import SQLContext
    source_col = "{0}".format(colname)
    target_col = "{0}".format(rename)

    lookup = sqlContext.sql('SELECT * FROM {0}'.format(new_df_name)).dropna()
    lookup = (lookup.select("Date", source_col)
                    .withColumnRenamed(source_col, target_col)
                    .withColumnRenamed("Date", "new_Date"))

    # The lookup's date column is renamed first so it can be dropped
    # unambiguously after the join.
    joined = df.join(lookup, df["Date"] == lookup["new_Date"])
    return joined.drop("new_Date")
  
def target_creation(df, h, Date = "Date",Close = "Close"):
    '''
    Add the h-step-ahead forecasting targets to ``df``.

    Rows are ordered by ``Date`` in a single un-partitioned window. Three
    columns are added:
        'Lead{h}'      value of ``Close`` h rows later,
        'target_Date'  value of ``Date`` h rows later,
        'diff{h}'      'Lead{h}' minus the current ``Close``.
    Rows with no row h positions ahead get null in these columns.

    Args:
        df (dataframe): Input dataframe.
        h (int): Forecast horizon, in rows.
        Date (str): Name of the column to order by.
        Close (str): Name of the price column to lead and to difference against.

    Returns:
        df (dataframe): ``df`` with the three columns above appended.
    '''
    w = Window.orderBy(Date)
    lead_name = 'Lead{0}'.format(h)
    df = df.withColumn(lead_name, lead(Close, h).over(w))
    df = df.withColumn('target_Date', lead(Date, h).over(w))
    # Subtract the column named by the ``Close`` argument (previously the
    # literal "Close" was used here, ignoring the parameter).
    df = df.withColumn("diff{0}".format(h), df[lead_name] - df[Close])

    return df
  
def difference_maker(df, feature_name, h = 1):
    '''
    Replace a column by its change relative to the value h rows earlier.

    Rows are ordered by "Date". The original column is dropped and a column of
    the same name holding ``value - value_h_rows_earlier`` is appended in its
    place (so it ends up as the last column). "Lag" and "temp_name" are used
    as scratch column names.

    Args:
        df (dataframe): Input dataframe with a "Date" column.
        feature_name (str): Column to difference.
        h (int): Number of rows to look back.

    Returns:
        df (dataframe): Dataframe with ``feature_name`` replaced by its difference.
    '''
    by_date = Window.orderBy("Date")
    lagged = df.withColumn("Lag", lag(feature_name, h).over(by_date))
    change = lagged[feature_name] - lagged["Lag"]
    return (lagged.withColumn("temp_name", change)
                  .drop("Lag")
                  .drop(feature_name)
                  .withColumnRenamed("temp_name", feature_name))
  
def difference_maker2(df, feature_name, h = 1):
    '''
    Append the change of a column between h and 2*h rows earlier.

    Rows are ordered by "Date". The new column is named ``feature_name + "2"``
    and holds ``value_h_rows_earlier - value_2h_rows_earlier``; the original
    column is kept. "Lag1", "Lag2" and "temp_name" are used as scratch names.

    Args:
        df (dataframe): Input dataframe with a "Date" column.
        feature_name (str): Column to take lagged differences of.
        h (int): Base lag, in rows.

    Returns:
        df (dataframe): ``df`` with the extra difference column appended.
    '''
    by_date = Window.orderBy("Date")
    with_lags = (df.withColumn("Lag1", lag(feature_name, 1*h).over(by_date))
                   .withColumn("Lag2", lag(feature_name, 2*h).over(by_date)))
    change = with_lags["Lag1"] - with_lags["Lag2"]
    return (with_lags.withColumn("temp_name", change)
                     .drop("Lag1")
                     .drop("Lag2")
                     .withColumnRenamed("temp_name", feature_name+"2"))

def feature_assembler(df, feature_list, output_name="features"):
    '''
    Combine several columns into a single vector column.

    Args:
        df (dataframe): Input dataframe.
        feature_list (list): Names of the columns to assemble.
        output_name (str): Name of the resulting vector column.

    Returns:
        df (dataframe): ``df`` with the vector column appended.
    '''
    from pyspark.ml.feature import VectorAssembler
    assembler = VectorAssembler(outputCol = output_name, inputCols = feature_list)
    return assembler.transform(df)

def standard_scaler(df, input_name="features", output_name="s_features"):
    '''
    Scale a vector column with a StandardScaler fitted on ``df`` itself.

    The scaler is configured with ``withStd=True`` and ``withMean=False``.

    Args:
        df (dataframe): Input dataframe; used both to fit and to transform.
        input_name (str): Name of the vector column to scale.
        output_name (str): Name of the scaled vector column.

    Returns:
        df (dataframe): ``df`` with the scaled column appended.
    '''
    from pyspark.ml.feature import StandardScaler
    fitted = StandardScaler(
        withMean=False,
        withStd=True,
        inputCol=input_name,
        outputCol=output_name,
    ).fit(df)
    return fitted.transform(df)

def pca(df, input_name="features", output_name="pca_features", keep = 10):
    '''
    Project a vector column onto its first ``keep`` principal components.

    The PCA model is fitted on ``df`` and then applied to the same ``df``.

    Args:
        df (dataframe): Input dataframe.
        input_name (str): Name of the vector column to reduce.
        output_name (str): Name of the reduced vector column.
        keep (int): Number of principal components to keep.

    Returns:
        df (dataframe): ``df`` with the reduced column appended.
    '''
    from pyspark.ml.feature import PCA
    reducer = PCA(inputCol=input_name, outputCol=output_name, k=keep)
    return reducer.fit(df).transform(df)

def df_col_selecter(df, col_list = ["Date","target_date","Close","pca_features"]):
    '''
    Keep only the listed columns of ``df``.

    Args:
        df (dataframe): Input dataframe.
        col_list (list): Names of the columns to keep, in output order.

    Returns:
        df (dataframe): Result of ``df.select`` on a list copy of ``col_list``.
    '''
    wanted = list(col_list)
    return df.select(wanted)
  
# 'pcaFeatures','Date','target_Date','Close','diff{0}'.format(h),'Lead{0}'.format(h)

def correlation_checker(df, feature_col):
    '''
    Print the correlation matrix of a vector column.

    Uses ``Correlation.corr`` with its default method and prints the first
    element of the first result row.

    Args:
        df (dataframe): Input dataframe.
        feature_col (str): Name of the vector column to correlate.
    '''
    from pyspark.ml.stat import Correlation
    first_row = Correlation.corr(df, feature_col).head()
    matrix_text = str(first_row[0])
    print("Pearson correlation matrix:\n" + matrix_text)


def data_split(df, train_size = 0.6 , val_size = 0.2, test_size = 0.2):
    '''
    Split ``df`` chronologically into training, validation and test parts.

    Rows are given a percent rank by "Date" and assigned as follows:
        training:   rank <= train_size
        validation: train_size < rank <= train_size + val_size
        test:       rank > 1 - test_size
    The test boundary is derived from ``test_size`` alone, independently of
    the validation upper bound.

    Args:
        df (dataframe): Input dataframe with a "Date" column.
        train_size (float): Fraction of the rank range used for training.
        val_size (float): Fraction of the rank range used for validation.
        test_size (float): Fraction of the rank range used for testing.

    Returns:
        tuple: (training, val, test) dataframes, without the helper 'rank' column.
    '''
    ranked = df.withColumn('rank', percent_rank().over(Window.orderBy("Date")))
    conditions = (
        'rank <= {0}'.format(train_size),
        'rank > {0} AND rank <= {1}'.format(train_size, (train_size+val_size)),
        'rank > {0}'.format(1-test_size),
    )
    training, val, test = (ranked.where(cond).drop('rank') for cond in conditions)
    return training, val, test

def random_forest(training, test, feature_list, features = "features", label = "label", h = 1, depth = 2, bins = 100, numtree = 100, print_result = False):
    '''
    Fit a RandomForestRegressor and score its h-step-ahead price forecast.

    The model is fitted on ``training`` and applied to ``test``. Its raw output
    ('prediction') is added to 'Close' to form the price forecast 'pred', which
    is scored against the 'Lead{h}' column.
    NOTE(review): this assumes ``label`` is the h-step change in Close (e.g.
    the 'diff{h}' column) - confirm against callers.

    Args:
        training (dataframe): Data to fit on; needs ``features`` and ``label``.
        test (dataframe): Data to score; needs ``features``, 'Close' and 'Lead{h}'.
        feature_list (list): Feature names, in the order they appear in the
            feature vector; only used to label the printed importances.
        features (str): Name of the feature vector column.
        label (str): Name of the label column.
        h (int): Forecast horizon; selects the 'Lead{h}' target column.
        depth (int): ``maxDepth`` of the trees.
        bins (int): ``maxBins`` of the trees.
        numtree (int): ``numTrees`` of the forest.
        print_result (bool): When True, print RMSE, r2, sMAPE, the baseline
            sMAPE and the sorted feature importances.

    Returns:
        tuple: (smape, prediction) - the sMAPE (%) of 'pred' against 'Lead{h}'
        and the scored dataframe including the 'prediction' and 'pred' columns.
    '''
    target_col = "Lead{0}".format(h)

    rf = RandomForestRegressor(featuresCol = features, labelCol = label, maxDepth = depth, maxBins = bins, numTrees = numtree, seed = 42)
    model = rf.fit(training)
    prediction = model.transform(test)
    prediction = prediction.withColumn('pred', prediction['Close'] + prediction['prediction'])

    smape = sMAPE(prediction, "pred", target_col)

    if print_result:
        rmse_evaluator = RegressionEvaluator(labelCol=target_col, predictionCol="pred", metricName="rmse")
        r2_evaluator = RegressionEvaluator(labelCol=target_col, predictionCol="pred", metricName="r2")
        # Baseline: treat the current Close as the forecast. It is only ever
        # reported, so it is computed here rather than on every call (each
        # sMAPE evaluation runs Spark jobs, which adds up during grid search).
        dsmape = sMAPE(prediction, "Close", target_col)

        print("Random Forest to forecast {0} day into future:".format(h))
        print("RMSE is", rmse_evaluator.evaluate(prediction))
        print("r2 is", r2_evaluator.evaluate(prediction))
        print("sMAPE is: ", smape, "default smape:", dsmape)
        importance_list = model.featureImportances
        feature_importance = dict(zip(feature_list, importance_list))
        # Most important feature first.
        sorted_feature_importance = sorted(feature_importance.items(), key=lambda x:x[1], reverse=True)
        print(sorted_feature_importance)

    return smape, prediction
  
def decision_tree(training, test, feature_list, features = "features", label = "label", h = 1, depth = 2, bins = 100, numtree = 100, print_result = False):
    '''
    Fit a DecisionTreeRegressor and score its h-step-ahead price forecast.

    The model is fitted on ``training`` and applied to ``test``. Its raw output
    ('prediction') is added to 'Close' to form the price forecast 'pred', which
    is scored against the 'Lead{h}' column.
    NOTE(review): this assumes ``label`` is the h-step change in Close (e.g.
    the 'diff{h}' column) - confirm against callers.

    Args:
        training (dataframe): Data to fit on; needs ``features`` and ``label``.
        test (dataframe): Data to score; needs ``features``, 'Close' and 'Lead{h}'.
        feature_list (list): Feature names, in the order they appear in the
            feature vector; only used to label the printed importances.
        features (str): Name of the feature vector column.
        label (str): Name of the label column.
        h (int): Forecast horizon; selects the 'Lead{h}' target column.
        depth (int): ``maxDepth`` of the tree.
        bins (int): ``maxBins`` of the tree.
        numtree (int): Unused; accepted so the signature matches random_forest.
        print_result (bool): When True, print RMSE, r2, sMAPE, the baseline
            sMAPE and the sorted feature importances.

    Returns:
        tuple: (smape, prediction) - the sMAPE (%) of 'pred' against 'Lead{h}'
        and the scored dataframe including the 'prediction' and 'pred' columns.
    '''
    target_col = "Lead{0}".format(h)

    tree = DecisionTreeRegressor(featuresCol= features, labelCol= label, maxDepth = depth, maxBins = bins)
    model = tree.fit(training)
    prediction = model.transform(test)
    prediction = prediction.withColumn('pred', prediction['Close'] + prediction['prediction'])

    smape = sMAPE(prediction, "pred", target_col)

    if print_result:
        rmse_evaluator = RegressionEvaluator(labelCol=target_col, predictionCol="pred", metricName="rmse")
        r2_evaluator = RegressionEvaluator(labelCol=target_col, predictionCol="pred", metricName="r2")
        # Baseline: treat the current Close as the forecast. It is only ever
        # reported, so it is computed here rather than on every call.
        dsmape = sMAPE(prediction, "Close", target_col)

        # The heading names the model actually fitted here (it used to say
        # "Random Forest", copied from random_forest).
        print("Decision Tree to forecast {0} day into future:".format(h))
        print("RMSE is", rmse_evaluator.evaluate(prediction))
        print("r2 is", r2_evaluator.evaluate(prediction))
        print("sMAPE is: ", smape, "default smape:", dsmape)
        importance_list = model.featureImportances
        feature_importance = dict(zip(feature_list, importance_list))
        # Most important feature first.
        sorted_feature_importance = sorted(feature_importance.items(), key=lambda x:x[1], reverse=True)
        print(sorted_feature_importance)

    return smape, prediction

def print_df(df,num = 10):
    '''
    Show the first and last rows of ``df`` in Date order.

    Prints ``num`` rows sorted by "Date" ascending, then ``num`` rows sorted by
    "Date" descending, each under a heading.

    Args:
        df (dataframe): Dataframe with a "Date" column.
        num (int): Number of rows to show in each direction.
    '''
    print("Date ascending:")
    # Sort explicitly so the output matches the heading regardless of the
    # order the incoming dataframe happens to be in.
    df.orderBy("Date").show(num)
    print("Date descending:")
    df.orderBy(desc("Date")).show(num)
    
def plot_graph(sdf, pred = 'pred', target = 'Lead84', target_Date = "target_Date"):
    '''
    Plot actual versus predicted values against the target date.

    The Spark dataframe is collected to pandas, both series are drawn on one
    15x5 figure, and the figure is handed to ``display`` (not imported here;
    presumably provided by the notebook runtime).

    Args:
        sdf (dataframe): Spark dataframe holding the three columns below.
        pred (str): Name of the predicted-value column.
        target (str): Name of the actual-value column; also used in the title.
        target_Date (str): Name of the column used for the x axis.
    '''
    import numpy as np
    import pandas as pd
    import matplotlib.pyplot as plt

    collected = sdf.toPandas()
    dates, actual, forecast = (np.array(collected[name]) for name in (target_Date, target, pred))

    fig, ax = plt.subplots(figsize=(15, 5))
    ax.plot(dates, actual, marker='.', linestyle='-', label="Actual Close Price")
    ax.plot(dates, forecast, marker='.', linestyle='--', label="Predicted Close Price")
    ax.set_title('Close Price vs. Date for {0}'.format(target))
    ax.set_xlabel('Date')
    ax.set_ylabel('Price($)')
    ax.legend()
    display(fig.figure)
    
def hyper_para_tester( h, training, val, feature_list, print_result = False):
    '''
    Grid-search random forest hyper-parameters by validation sMAPE.

    Every combination of depth in [2, 3, 4, 5], number of trees in
    [10, 50, 100, 150, 200] and bins in [90, 100, 110] is fitted with
    random_forest on ``training`` (features "s_features", label "diff{h}")
    and scored on ``val``. The first combination reaching the lowest sMAPE wins.

    Args:
        h (int): Forecast horizon; selects the "diff{h}" label column.
        training (dataframe): Data to fit on.
        val (dataframe): Data to score on.
        feature_list (list): Feature names, forwarded to random_forest.
        print_result (bool): When True, print the best score and parameters.

    Returns:
        tuple: (best_bins, best_depth, best_numtree).

    Raises:
        ValueError: If no combination produced a comparable sMAPE (e.g. every
            score was NaN).
    '''
    from itertools import product

    depths = [2,3,4,5]
    numtrees = [10,50,100,150,200]
    bin_counts = [90,100,110]

    # Start from +inf rather than 100: the sMAPE used here can reach 200, and
    # with a finite threshold the best_* names stayed unassigned whenever no
    # run scored below it.
    best_smape = float("inf")
    best_bins = best_depth = best_numtree = None

    # product() iterates depth outermost and bins innermost, the same order as
    # the nested loops it replaces, so ties resolve identically.
    for depth, numtree, bins in product(depths, numtrees, bin_counts):
        smape , _ = random_forest(training, val, feature_list, "s_features", "diff{0}".format(h), h, depth, bins, numtree, print_result = False)
        if smape < best_smape:
            best_smape = smape
            best_bins = bins
            best_depth = depth
            best_numtree = numtree

    if best_bins is None:
        raise ValueError("no hyper-parameter combination produced a valid sMAPE")

    if print_result:
        print("best_smape:",best_smape,"\n",\
              "\n","best_bins:",best_bins,"\n","best_depth:",best_depth,"\n","best_numtree:",best_numtree)
    return best_bins, best_depth, best_numtree

In [2]:
# Forecast horizon: targets look h rows ahead in Date order (see target_creation).
h = 84

# Load the full feature table and drop every row that contains a null.
# NOTE(review): `sqlContext` is never created in this notebook - presumably it is
# injected by the hosting runtime; confirm before running elsewhere.
df = sqlContext.sql('SELECT * FROM all_feature').dropna()

# Adds 'Lead{h}' (Close h rows ahead), 'target_Date' and 'diff{h}' (Lead{h} - Close).
df = target_creation(df, h, Date = "Date", Close = "Close")
# Duplicate Close so the copy can be differenced below while the raw Close column
# stays available (difference_maker replaces the column it is given).
df = df.withColumn("Close_lag",df["Close"])

feature_list = ["Close_lag","sp500","nasdaq100","DoeJonesIA","USDtoCNY","USDtoEUR","USDtoJPY","AnalogDevice","Jabil","Microsoft","Nidec","Qualcomm","TSMC","CPI","USA MSCI","will5000"]

# Replace every feature by its change relative to the value h rows earlier.
for feature in feature_list:
  df = difference_maker(df, feature_name = feature, h = h)


# Remove rows that contain nulls after the lead/lag steps above.
df = df.dropna()
df = feature_assembler(df, feature_list, output_name="features")

# NOTE(review): the scaler is fitted on the whole frame before data_split below,
# so validation/test rows contribute to the scaling statistics.
df = standard_scaler(df, input_name="features", output_name="s_features")
# df = pca(df, input_name="s_features", output_name="pca_features", keep = 10)


# Columns needed downstream: random_forest / decision_tree read Close and Lead{h},
# plot_graph reads the target date.
# NOTE(review): target_creation names the column 'target_Date'; the lower-case
# "target_date" here presumably relies on case-insensitive column resolution - confirm.
col_list = ["Date","Close","s_features","diff{0}".format(h),"target_date","Lead{0}".format(h)]
df = df_col_selecter(df, col_list)


# correlation_checker(df, "s_features")

# Chronological split by percent rank of Date.
training, val, test = data_split(df, train_size = 0.6 , val_size = 0.2, test_size = 0.2)

# best_bins, best_depth, best_numtree = hyper_para_tester( h, training, val, feature_list, print_result = True)

# Hard-coded hyper-parameters.
# NOTE(review): presumably the result of an earlier run of the commented-out
# hyper_para_tester call above - confirm.
best_bins = 110 
best_depth = 2 
best_numtree = 10

In [3]:
# Same split call as at the end of the setup cell above, repeated here.
training, val, test = data_split(df, train_size = 0.6 , val_size = 0.2, test_size = 0.2)
# Fit on `training`, score on `test`. `numtree` is accepted by decision_tree but not used by it.
smape84, pred84 = decision_tree(training, test, feature_list, features = "s_features", label = "diff{0}".format(h), h = h , depth = best_depth, bins = best_bins, numtree = best_numtree, print_result = True)

In [4]:
# Plot the decision-tree forecast from the previous cell against the actual Lead{h} values.
plot_graph(pred84, pred = 'pred', target = 'Lead{0}'.format(h), target_Date = "target_date")

In [5]:
# Same split call as before, repeated here.
training, val, test = data_split(df, train_size = 0.6 , val_size = 0.2, test_size = 0.2)
# Fit the random forest on `training`, score on `test`; overwrites smape84 / pred84
# from the decision-tree cell.
smape84, pred84 = random_forest(training, test, feature_list, features = "s_features", label = "diff{0}".format(h), h = h , depth = best_depth, bins = best_bins, numtree = best_numtree, print_result = True)


In [6]:
# Plot the random-forest forecast from the previous cell against the actual Lead{h} values.
plot_graph(pred84, pred = 'pred', target = 'Lead{0}'.format(h), target_Date = "target_date")

In [7]:
# Forecast horizon: targets look h rows ahead in Date order (see target_creation).
# This cell repeats the h = 84 setup with a different horizon and rebinds
# h, df, training, val, test and the best_* values.
h = 21

# Load the full feature table and drop every row that contains a null.
# NOTE(review): `sqlContext` is never created in this notebook - presumably it is
# injected by the hosting runtime; confirm before running elsewhere.
df = sqlContext.sql('SELECT * FROM all_feature').dropna()

# Adds 'Lead{h}' (Close h rows ahead), 'target_Date' and 'diff{h}' (Lead{h} - Close).
df = target_creation(df, h, Date = "Date", Close = "Close")
# Duplicate Close so the copy can be differenced below while the raw Close column
# stays available (difference_maker replaces the column it is given).
df = df.withColumn("Close_lag",df["Close"])

feature_list = ["Close_lag","sp500","nasdaq100","DoeJonesIA","USDtoCNY","USDtoEUR","USDtoJPY","AnalogDevice","Jabil","Microsoft","Nidec","Qualcomm","TSMC","CPI","USA MSCI","will5000"]

# Replace every feature by its change relative to the value h rows earlier.
for feature in feature_list:
  df = difference_maker(df, feature_name = feature, h = h)


# Remove rows that contain nulls after the lead/lag steps above.
df = df.dropna()
df = feature_assembler(df, feature_list, output_name="features")

# NOTE(review): the scaler is fitted on the whole frame before data_split below,
# so validation/test rows contribute to the scaling statistics.
df = standard_scaler(df, input_name="features", output_name="s_features")
# df = pca(df, input_name="s_features", output_name="pca_features", keep = 10)


# Columns needed downstream: random_forest / decision_tree read Close and Lead{h},
# plot_graph reads the target date.
# NOTE(review): target_creation names the column 'target_Date'; the lower-case
# "target_date" here presumably relies on case-insensitive column resolution - confirm.
col_list = ["Date","Close","s_features","diff{0}".format(h),"target_date","Lead{0}".format(h)]
df = df_col_selecter(df, col_list)


# correlation_checker(df, "s_features")

# Chronological split by percent rank of Date.
training, val, test = data_split(df, train_size = 0.6 , val_size = 0.2, test_size = 0.2)

# best_bins, best_depth, best_numtree = hyper_para_tester( h, training, val, feature_list, print_result = True)
# Hard-coded hyper-parameters.
# NOTE(review): presumably the result of an earlier run of the commented-out
# hyper_para_tester call above - confirm.
best_bins = 90 
best_depth = 5 
best_numtree = 150


In [8]:
# Fit on `training`, score on `val`. `numtree` is accepted by decision_tree but not used by it.
# NOTE(review): the h = 84 cells score on `test`, this one scores on `val` - confirm
# which split is intended for the reported numbers.
smape21, pred21 = decision_tree(training, val, feature_list, features = "s_features", label = "diff{0}".format(h), h = h , depth = best_depth, bins = best_bins, numtree = best_numtree, print_result = True)

In [9]:
# Plot the decision-tree forecast from the previous cell against the actual Lead{h} values.
plot_graph(pred21, pred = 'pred', target = 'Lead{0}'.format(h), target_Date = "target_date")

In [10]:
# Fit the random forest on `training`, score on `val`; overwrites smape21 / pred21
# from the decision-tree cell.
# NOTE(review): the h = 84 cells score on `test`, this one scores on `val` - confirm
# which split is intended for the reported numbers.
smape21, pred21 = random_forest(training, val, feature_list, features = "s_features", label = "diff{0}".format(h), h = h , depth = best_depth, bins = best_bins, numtree = best_numtree, print_result = True)

In [11]:
# Plot the random-forest forecast from the previous cell against the actual Lead{h} values.
plot_graph(pred21, pred = 'pred', target = 'Lead{0}'.format(h), target_Date = "target_date")

In [12]:
# Forecast horizon: targets look h rows ahead in Date order (see target_creation).
# This cell repeats the earlier setup cells with a different horizon and rebinds
# h, df, training, val, test and the best_* values.
h = 10

# Load the full feature table and drop every row that contains a null.
# NOTE(review): `sqlContext` is never created in this notebook - presumably it is
# injected by the hosting runtime; confirm before running elsewhere.
df = sqlContext.sql('SELECT * FROM all_feature').dropna()

# Adds 'Lead{h}' (Close h rows ahead), 'target_Date' and 'diff{h}' (Lead{h} - Close).
df = target_creation(df, h, Date = "Date", Close = "Close")
# Duplicate Close so the copy can be differenced below while the raw Close column
# stays available (difference_maker replaces the column it is given).
df = df.withColumn("Close_lag",df["Close"])

feature_list = ["Close_lag","sp500","nasdaq100","DoeJonesIA","USDtoCNY","USDtoEUR","USDtoJPY","AnalogDevice","Jabil","Microsoft","Nidec","Qualcomm","TSMC","CPI","USA MSCI","will5000"]

# Replace every feature by its change relative to the value h rows earlier.
for feature in feature_list:
  df = difference_maker(df, feature_name = feature, h = h)


# Remove rows that contain nulls after the lead/lag steps above.
df = df.dropna()
df = feature_assembler(df, feature_list, output_name="features")

# NOTE(review): the scaler is fitted on the whole frame before data_split below,
# so validation/test rows contribute to the scaling statistics.
df = standard_scaler(df, input_name="features", output_name="s_features")
# df = pca(df, input_name="s_features", output_name="pca_features", keep = 10)


# Columns needed downstream: random_forest / decision_tree read Close and Lead{h},
# plot_graph reads the target date.
# NOTE(review): target_creation names the column 'target_Date'; the lower-case
# "target_date" here presumably relies on case-insensitive column resolution - confirm.
col_list = ["Date","Close","s_features","diff{0}".format(h),"target_date","Lead{0}".format(h)]
df = df_col_selecter(df, col_list)


# correlation_checker(df, "s_features")

# Chronological split by percent rank of Date.
training, val, test = data_split(df, train_size = 0.6 , val_size = 0.2, test_size = 0.2)

# best_bins, best_depth, best_numtree = hyper_para_tester( h, training, val, feature_list, print_result = True)
# Hard-coded hyper-parameters.
# NOTE(review): presumably the result of an earlier run of the commented-out
# hyper_para_tester call above - confirm.
best_bins = 90 
best_depth = 3 
best_numtree = 10

In [13]:
# Fit on `training`, score on `val`. `numtree` is accepted by decision_tree but not used by it.
# NOTE(review): the h = 84 cells score on `test`, this one scores on `val` - confirm
# which split is intended for the reported numbers.
smape10, pred10 = decision_tree(training, val, feature_list, features = "s_features", label = "diff{0}".format(h), h = h , depth = best_depth, bins = best_bins, numtree = best_numtree, print_result = True)

In [14]:
# Plot the decision-tree forecast from the previous cell against the actual Lead{h} values.
plot_graph(pred10, pred = 'pred', target = 'Lead{0}'.format(h), target_Date = "target_date")

In [15]:
# Fit the random forest on `training`, score on `val`; overwrites smape10 / pred10
# from the decision-tree cell.
# NOTE(review): the h = 84 cells score on `test`, this one scores on `val` - confirm
# which split is intended for the reported numbers.
smape10, pred10 = random_forest(training, val, feature_list, features = "s_features", label = "diff{0}".format(h), h = h , depth = best_depth, bins = best_bins, numtree = best_numtree, print_result = True)

In [16]:
# Plot the random-forest forecast from the previous cell against the actual Lead{h} values.
plot_graph(pred10, pred = 'pred', target = 'Lead{0}'.format(h), target_Date = "target_date")