In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import HashingTF, Tokenizer, VectorAssembler
import pyspark.sql.functions as F
from pyspark.sql.window import Window
import matplotlib.pyplot as plt

In [None]:
import os

# Build the path of the exploration notebook from the kernel's current
# working directory and echo it as the cell output.
pwd = os.getcwd()
exploration_abs_path = pwd + "/exploration.ipynb"
exploration_abs_path

In [None]:
%run {pwd + "/exploration.ipynb"}

In [None]:
spark = create_spark_session("Spark_Application_Name")

# CSV files (one per company) read from the stocks_data/ directory
STOCK_FILES = (
    'AMAZON.csv',
    'APPLE.csv',
    'FACEBOOK.csv',
    'GOOGLE.csv',
    'MICROSOFT.csv',
    'TESLA.csv',
    'ZOOM.csv',
)

# load data: print each file name, then load it and keep the resulting
# DataFrame, in the same order as STOCK_FILES
all_datasets = []
for f in STOCK_FILES:
    print(f"{f}:")
    df = load_data(spark, f"stocks_data/{f}")
    all_datasets.append(df)

Input:  
    - 4xHigh  
    - 4xOpen  
    - 4xClose  
    - 4xLow  
    - RSI (not implemented)  
    - Volatility  
    - Momentum  
    - Upside/Downside (not implemented)  
    - IMI  
    - MFI  
  
  
polynomial regression  
quantile regression  
recurrent neural network  

In [None]:
# all the features given to the model:
# each price column at the current row ("" suffix) and 1, 2 and 3 rows back,
# followed by the derived indicators.
# NOTE(review): 'money_flow_index' is computed in generate_train_test_data
# but is not listed here, so it never reaches the model -- confirm intended.
_PRICE_COLUMNS = ("High", "Low", "Open", "Close")
_LAG_SUFFIXES = ("", "1", "2", "3")
_INDICATORS = ["Momentum", "imi", "Volatility"]

features = [
    price + suffix
    for price in _PRICE_COLUMNS
    for suffix in _LAG_SUFFIXES
] + _INDICATORS

In [None]:
def generate_train_test_data(df):
    """
    Build the feature/label table for one stock and split it into train/test.

    Columns added to `df`:
      * `Year` (from `Date`), used to partition the lag window;
      * `High1..3`, `Low1..3`, `Open1..3`, `Close1..3`: the value 1, 2 and 3
        rows earlier (ordered by `Date`, within the same year);
      * `Volatility`: population standard deviation of `Close`, `Close1`,
        `Close2`, `Close3`;
      * `Momentum`, `imi`, `money_flow_index`: produced by the notebook's
        `compute_momentum`, `compute_intraday_momentum_index` and
        `compute_money_flow_index` helpers (defined outside this cell);
      * `label`: the `Close` of the next row in the same year.

    Rows whose label or lagged prices are undefined (the first three and the
    last row of every year partition) are dropped: filling them with 0 would
    feed fabricated zero prices to the regression. Remaining nulls in the
    other feature columns are still replaced by 0, as before.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Needs at least `Date`, `High`, `Low`, `Open` and `Close`; the
        `compute_*` helpers may need further columns (not visible here).

    Returns
    -------
    tuple of (DataFrame, DataFrame)
        `(trainDF, testDF)`: a random 80/20 split (seed 42) of the rows,
        restricted to the columns in `features` plus `Date` and `label`.
        NOTE(review): the split is random, not chronological.
    """
    price_cols = ["High", "Low", "Open", "Close"]
    lags = range(1, 4)  # 1, 2 and 3 rows back

    # Window used for every lag/lead: rows ordered by Date inside each
    # calendar year, so offsets never reach across a year boundary.
    win = Window.partitionBy('Year').orderBy('Date')

    # create lagged columns (High, Low, Open and Close offset by 1 to 3 rows)
    df_with_lag = df.withColumn('Year', F.year('Date'))
    for i in lags:
        for s in price_cols:
            df_with_lag = df_with_lag.withColumn(s + str(i), F.lag(F.col(s), i).over(win))

    # Date of the row three steps back. It is not part of the final selection;
    # kept because the compute_* helpers receive it -- TODO confirm it is used.
    df_with_lag = df_with_lag.withColumn('Date3', F.lag(F.col('Date'), 3).over(win))

    # Volatility over the current row and the three previous ones:
    # sqrt(mean((close_k - mean(close))**2)).
    closes = [F.col('Close')] + [F.col('Close' + str(i)) for i in lags]
    mean_close = sum(closes[1:], closes[0]) / len(closes)
    squared_dev = [(c - mean_close) ** 2 for c in closes]
    variance = sum(squared_dev[1:], squared_dev[0]) / len(squared_dev)
    df_with_volatility = df_with_lag.withColumn('Volatility', F.sqrt(variance))

    # add momentum to the dataframe
    df_with_momentum = compute_momentum(df_with_volatility).withColumnRenamed('daily_average', 'Momentum')

    # add imi to the dataframe
    df_with_imi = compute_intraday_momentum_index(df_with_momentum).select(df_with_momentum.columns + ['imi'])

    # add mfi to the dataframe
    df_with_mfi = compute_money_flow_index(df_with_imi).select(df_with_imi.columns + ['money_flow_index'])

    # add label (the next Close value)
    df_labeled = df_with_mfi.withColumn('label', F.lead(F.col('Close'), 1).over(win))

    selected = df_labeled.select(features + ['Date', 'label'])

    # Drop rows with no label or incomplete lagged prices; only columns that
    # are actually selected are checked, so editing `features` cannot make
    # dropna fail on a missing column.
    lag_cols = {s + str(i) for i in lags for s in price_cols}
    must_be_known = [c for c in selected.columns if c == 'label' or c in lag_cols]
    final_df = selected.dropna(subset=must_be_known).fillna(0)

    # split train and test data
    trainDF, testDF = final_df.randomSplit([.8, .2], seed=42)

    return trainDF, testDF

In [None]:
def test_model(model_type, trainDF, testDF):
    """
    Fit an estimator on the training split and predict on the test split.

    The columns named in the module-level `features` list are assembled into
    a single vector column called "features", which is fed to `model_type`
    as the second stage of a Pipeline.

    Parameters
    ----------
    model_type : estimator instance used as the final pipeline stage
        (the notebook passes `LinearRegression()`).
    trainDF : DataFrame the pipeline is fitted on.
    testDF : DataFrame the fitted pipeline is applied to.

    Returns
    -------
    DataFrame
        The result of `transform(testDF)` on the fitted pipeline.
    """
    stages = [
        # vectorise features
        VectorAssembler(inputCols=features, outputCol="features"),
        # estimator supplied by the caller
        model_type,
    ]

    # fit on the training rows, then score the held-out rows
    fitted_pipeline = Pipeline(stages=stages).fit(trainDF)
    return fitted_pipeline.transform(testDF)

In [None]:
def create_chart(predDF):
    """
    Create a plot comparing the label and the prediction over time.

    Parameters
    ----------
    predDF : DataFrame with `Date`, `label` and `prediction` columns.

    The rows are sorted by `Date` and collected to the driver once; the
    expected (red) and predicted (blue) series are both read from that single
    result, so they stay aligned and the upstream computation is not re-run
    for each series. The figure is displayed with `plt.show()`; nothing is
    returned.
    """
    rows = (
        predDF
        .select('Date', 'label', 'prediction')
        .orderBy('Date')
        .collect()
    )

    x = [row['Date'] for row in rows]
    expected = [row['label'] for row in rows]
    output = [row['prediction'] for row in rows]

    plt.plot(x, expected, color='red')
    plt.plot(x, output, color='blue')

    plt.ylabel('Price')
    plt.xlabel('timestamp')
    plt.title('ASN values for time')
    plt.legend(['expected', 'output'], loc='upper left')

    plt.show()

In [None]:
# Estimator classes to evaluate; a fresh instance is created for every dataset.
model_classes = [LinearRegression]

# For each stock: build the train/test split, fit every model, chart the result.
for dataframe in all_datasets:
    trainDF, testDF = generate_train_test_data(dataframe)
    for model_class in model_classes:
        model_type = model_class()
        predDF = test_model(model_type, trainDF, testDF)
        create_chart(predDF)