# Project BIG_DATA

Student names: Guillaume Carrière 23458, Youri Colera 23460

In [8]:
from pyspark.sql import SparkSession
import sys
from pyspark.sql.window import Window
from pyspark.sql.types import *
import pyspark.sql.functions as F
import numbers

spark_application_name = "Spark_Application_Name"
spark = (SparkSession.builder.appName(spark_application_name).getOrCreate())

## First step : exploration

This cell defines the functions that extract information from the raw data.

read_file : reads a CSV file and returns a dataframe

read_files : helper that reads all available files and concatenates them into a single dataframe

show_stock : shows the first 40 and last 40 rows of a company's stock

get_size : returns the number of rows of a dataframe

describe_stock : shows summary statistics for every column of a company's stock

nb_missing_values_stock : counts the missing values (null or NaN) in every column of a company's stock, except Date and company_name

In [9]:
devColumns = [
    StructField("Date", DateType()),
    StructField("High", FloatType()),
    StructField("Low", FloatType()),
    StructField("Open", FloatType()),
    StructField("Close", FloatType()),
    StructField("Volume", LongType()),
    StructField("Adj Close", FloatType()),
    StructField("company_name", StringType()),
]
devSchema = StructType(devColumns)

def read_file(file_path, header = True, delimiter = ',') :
    return spark.read.schema(devSchema).option("delimiter", delimiter).csv(file_path, header=header)

def read_files() :
    companies = ["AMAZON", "APPLE", "FACEBOOK", "GOOGLE", "MICROSOFT", "TESLA", "ZOOM"]
    # Read one CSV file per company, in the order listed above
    list_stock = [read_file("stocks_data/" + name + ".csv") for name in companies]
    
    df = list_stock[0]
    
    for i in range (1, len(list_stock)) :
        df = df.union(list_stock[i])
    return df.sort("company_name")

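def show_stock(df, company_name) :
    # Sort by date so that the "first" and "last" rows are well defined
    stock = df.filter(F.col("company_name") == company_name).orderBy("Date")
    stock.show(40)
    s = stock.count()
    # Partition the window by company so Spark does not move all rows to a single partition
    w = Window.partitionBy("company_name").orderBy("Date")
    stock.withColumn("rn",F.row_number().over(w)).where(F.col("rn")>s-40).drop("rn").show(40)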

    
def get_size(df) :
    return df.count()

def describe_stock(df, company_name) :
    df.filter(F.col("company_name") == company_name).describe().show()

def nb_missing_values_stock(df, company_name) :
    df.filter(F.col("company_name") == company_name).select([F.count(F.when(F.isnan(c) | F.isnull(c), c)).alias(c) for c in df.columns[1:-1]]).show()
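
The counting rule used by nb_missing_values_stock (an entry is missing when it is null or NaN) can be illustrated without Spark. This is a minimal plain-Python sketch on hypothetical rows, not the project's CSV data; `count_missing` is an illustrative helper that does not exist in the notebook.

```python
import math

def count_missing(rows, columns):
    """Count, per column, the entries that are None or NaN."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            value = row.get(c)
            # None plays the role of a Spark null; NaN only exists for floats
            if value is None or (isinstance(value, float) and math.isnan(value)):
                counts[c] += 1
    return counts

# Hypothetical rows, invented for this example
sample_rows = [
    {"Open": 10.0, "Close": 10.5, "Volume": 1200},
    {"Open": float("nan"), "Close": 10.7, "Volume": None},
    {"Open": 10.6, "Close": None, "Volume": 900},
]
missing = count_missing(sample_rows, ["Open", "Close", "Volume"])
print(missing)
```

Each of the three columns has exactly one missing entry here, whether it is stored as None or as NaN.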
    

This cell defines the functions that derive additional columns useful for the analysis.

moving_average : computes the moving average of a column over the last nb_points calendar days for every company. nb_points refers to the Date column, not to a number of rows: with nb_points = 7 the average covers the last week. If some dates in that window have no data point, the average is computed on the data points that remain, so missing dates do not distort the result.

compute_daily_return : computes the daily return, (Close - Open) / Open as a percentage, for every data point of the dataframe

compute_differences : computes the daily and monthly difference of "Adj Close" for every data point of every company. If there is no data point exactly 1 day or 30 days earlier, the difference is null.

compute_return : computes the return rate over the last week (7 days), month (30 days) and year (365 days). If there is no data point at exactly that offset, the return rate is null.
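
To make the calendar-based window of moving_average concrete, here is a plain-Python sketch of the same rule: average every value whose date falls within the last nb_points days, current day included. The prices are hypothetical and `calendar_moving_average` is an illustrative helper, not part of the notebook.

```python
from datetime import date, timedelta

def calendar_moving_average(points, nb_points):
    """points: list of (date, value) pairs.
    For each date d, average the values dated in [d - (nb_points - 1) days, d]."""
    result = []
    for current_date, _ in points:
        start = current_date - timedelta(days=nb_points - 1)
        window = [v for d, v in points if start <= d <= current_date]
        result.append((current_date, sum(window) / len(window)))
    return result

# Hypothetical prices; the weekend of 10-11 October has no data point
prices = [
    (date(2020, 10, 7), 10.0),
    (date(2020, 10, 8), 12.0),
    (date(2020, 10, 9), 14.0),
    (date(2020, 10, 12), 20.0),
]
averages = calendar_moving_average(prices, 3)
for day, avg in averages:
    print(day, avg)
```

With nb_points = 3, the window for Monday 12 October covers 10 to 12 October, so only the Monday value is averaged. A row-based window of three rows would instead have mixed in the two previous trading days.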

In [10]:
days = lambda x : x * 86400 

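def moving_average(df, columnName, new_columnName, nb_points) :
    # Fail loudly: returning None here would break callers that reassign df
    if columnName not in df.columns :
        raise ValueError("Column not found: " + columnName)
    # Range window over the last nb_points calendar days (current day included), per company
    w = Window.partitionBy(F.col("company_name")).orderBy(F.col("Date").cast("timestamp").cast("long")).rangeBetween(-days(nb_points - 1), 0)
    return df.withColumn(new_columnName,F.avg(columnName).over(w))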

def compute_daily_return(df) :
    return df.withColumn("Daily Return", ((df.Close - df.Open)/df.Open) * 100)

def compute_differences(df) :
    w = Window.partitionBy(F.col("company_name")).orderBy(F.col("Date").cast("timestamp").cast("long")).rangeBetween(-days(1), -days(1))
    df = df.withColumn("day_before",F.avg("Adj Close").over(w))
    df = df.withColumn("Daily Difference", df["Adj Close"] - df["day_before"]).drop("day_before")
    
    w = Window.partitionBy(F.col("company_name")).orderBy(F.col("Date").cast("timestamp").cast("long")).rangeBetween(-days(30), -days(30))
    df = df.withColumn("month_before",F.avg("Adj Close").over(w))
    return df.withColumn("Monthly Difference", df["Adj Close"] - df["month_before"]).drop("month_before")


def compute_return(df) :
    w = Window.partitionBy(F.col("company_name")).orderBy(F.col("Date").cast("timestamp").cast("long")).rangeBetween(-days(7), -days(7))
    df = df.withColumn("week_before",F.avg("Adj Close").over(w))
    df = df.withColumn("Weekly Return", ((df["Adj Close"] - df["week_before"])/df["week_before"]) * 100).drop("week_before")
    
    w = Window.partitionBy(F.col("company_name")).orderBy(F.col("Date").cast("timestamp").cast("long")).rangeBetween(-days(30), -days(30))
    df = df.withColumn("month_before",F.avg("Adj Close").over(w))
    df = df.withColumn("Monthly Return", ((df["Adj Close"] - df["month_before"])/df["month_before"]) * 100).drop("month_before")

    w = Window.partitionBy(F.col("company_name")).orderBy(F.col("Date").cast("timestamp").cast("long")).rangeBetween(-days(365), -days(365))
    df = df.withColumn("year_before",F.avg("Adj Close").over(w))
    return df.withColumn("Yearly Return", ((df["Adj Close"] - df["year_before"])/df["year_before"]) * 100).drop("year_before")
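
The return rates above compare each "Adj Close" with the value found exactly 7, 30 or 365 days earlier, and are null when that date has no data point. A plain-Python sketch of this rule, using hypothetical prices and an illustrative helper `lookback_return` that is not part of the notebook:

```python
from datetime import date, timedelta

def lookback_return(prices, current_date, lookback_days):
    """prices: dict mapping date -> adjusted close.
    Return the percentage change relative to the price exactly
    lookback_days earlier, or None if either data point is missing."""
    previous = prices.get(current_date - timedelta(days=lookback_days))
    current = prices.get(current_date)
    if previous is None or current is None:
        return None
    return (current - previous) / previous * 100

# Hypothetical adjusted closes; 6 October has no data point
adj_close = {
    date(2020, 10, 5): 100.0,
    date(2020, 10, 12): 125.0,
    date(2020, 10, 13): 99.0,
}
weekly_found = lookback_return(adj_close, date(2020, 10, 12), 7)
weekly_missing = lookback_return(adj_close, date(2020, 10, 13), 7)
print(weekly_found, weekly_missing)
```

The second call returns None because there is no price for 6 October. This mirrors the null produced by a range window whose lower and upper bounds are both set to the same offset.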


This cell defines the functions that help us analyze the data.

date_difference : computes the time interval between two data points

correlation_df : computes the Pearson correlation between two columns of a dataframe

correlate_stocks : computes the correlation between the same columns of two companies' stocks, matched on the dates they share

best_return_rate : prints and returns the company with the best return rate over a given period (in days) from a given start date
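
As an illustration of what correlating two stocks involves, the sketch below keeps only the dates present in both series (the equivalent of an inner join on Date) and then computes the Pearson coefficient. It is plain Python on hypothetical values; `pearson` and `correlate_on_shared_dates` are illustrative names, not functions of the notebook.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

def correlate_on_shared_dates(series_a, series_b):
    """series_*: dict mapping a date string to a price.
    Only dates present in both series are compared."""
    shared = sorted(set(series_a) & set(series_b))
    return pearson([series_a[d] for d in shared], [series_b[d] for d in shared])

# Hypothetical prices; each series has one date the other lacks
stock_a = {"2020-10-07": 1.0, "2020-10-08": 2.0, "2020-10-09": 3.0, "2020-10-12": 4.0}
stock_b = {"2020-10-07": 10.0, "2020-10-08": 20.0, "2020-10-09": 30.0, "2020-10-13": 5.0}
corr = correlate_on_shared_dates(stock_a, stock_b)
print(corr)
```

The unmatched dates (12 and 13 October) are dropped before the coefficient is computed, so the three shared points, which move in perfect proportion, give a correlation of 1.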

In [11]:
def date_difference(dp1, dp2) :
    return abs(dp1["Date"] - dp2["Date"])

def correlation_df(df, col1, col2) :
    if col1 not in df.columns or col2 not in df.columns :
        print("Columns not found")
    else :
        return df.stat.corr(col1, col2)


def correlate_stocks(df, company_name1, company_name2) :
    df1 = df.filter(F.col("company_name") == company_name1)
    df2 = df.filter(F.col("company_name") == company_name2)
    
    print("Correlation between " + df1.first()["company_name"] + " and " + df2.first()["company_name"])
    corr_columns = []
    for column in df1.columns :
        columnType = dict(df1.dtypes)[column]
        if columnType == 'float' or columnType == 'long' :
            corr_columns.append(column)
    
    df2 = df2.select(*(F.col(x).alias(x + '_2') for x in df2.columns))
    jdf = df1.join(df2, df1.Date == df2.Date_2,how='inner')
    
    correlations = [corr_columns, []]
    for col in corr_columns :
        correlations[1].append(correlation_df(jdf, col, col + "_2"))
    return correlations


def best_return_rate(df, start_date, period) :
    w = Window.partitionBy(F.col("company_name")).orderBy(F.col("Date").cast("timestamp").cast("long")).rangeBetween(days(period), days(period))
    
    max_ret = float("-inf")
    max_stock = None
    
    df_m = df.withColumn("period_after",F.avg("Adj Close").over(w))
    rows = df_m.withColumn("return", ((df_m["period_after"] - df_m["Adj Close"])/df_m["Adj Close"]) * 100).filter(F.col("Date") == start_date)
    # rows is a DataFrame and is never None, so collect it directly
    for row in rows.collect() :
        ret = row["return"]
        # Skip companies with no data point exactly `period` days after start_date
        if ret is not None and ret > max_ret :
            max_ret = ret
            max_stock = row["company_name"]
            
    if max_stock is None :
        print("Data insufficient")
    else :
        print("Best return rate is " + str(max_ret) + " for the stock of " + max_stock)

    return max_stock
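
The selection step of best_return_rate boils down to taking the maximum over the companies that have a return available. A plain-Python sketch of that step, with hypothetical company names and values, and `pick_best` as an illustrative helper that is not part of the notebook:

```python
def pick_best(returns):
    """returns: dict mapping company name -> return in percent, or None
    when no data point was available. Returns (company, value) or None."""
    available = {name: r for name, r in returns.items() if r is not None}
    if not available:
        return None  # corresponds to the "Data insufficient" case
    best = max(available, key=available.get)
    return best, available[best]

# Hypothetical return rates, invented for this example
best = pick_best({"STOCK_A": 12.5, "STOCK_B": None, "STOCK_C": 40.0})
nothing = pick_best({"STOCK_A": None})
print(best, nothing)
```

Companies without a value are ignored rather than treated as zero, so a missing data point can never win or lose the comparison.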

This cell defines the main function, which chains the previously defined functions to carry out the exploration.

It consists of these steps:

- read all the .csv files into a single PySpark dataframe
- for every stock, print the time range of the data, the company name with its number of rows, the number of missing values and descriptive statistics for every variable, then show the first 40 and last 40 rows
- for every stock :
    - compute the weekly, monthly and yearly moving average of the "Open" column
    - compute the weekly, monthly and yearly moving average of the "Close" column
    - compute the daily and monthly difference for the "Adj Close" column
    - compute the daily return rate
    - compute the weekly, monthly and yearly moving average of the "Daily Return" column
    - compute the weekly, monthly and yearly return rate
    - show the dataframe to see all the new columns
- for every pair of stocks, show the correlation between each price column (High, Low, Open, Close, Adj Close) of one stock and the same column of the other
- find the stock with the best return rate over a period of one year (365 days) starting on 2019-12-02 (beginning of the COVID-19 crisis)
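
The "every pair of stocks" step can be sketched with `itertools.combinations`, which yields each unordered pair exactly once and never pairs a stock with itself. This is an illustration in plain Python, not the loop used in the notebook.

```python
from itertools import combinations

companies = ["AMAZON", "APPLE", "FACEBOOK", "GOOGLE", "MICROSOFT", "TESLA", "ZOOM"]

# Each unordered pair appears once, in the order of the list above
pairs = list(combinations(companies, 2))
print(len(pairs))
print(pairs[0], pairs[-1])
```

With 7 companies this gives 7 × 6 / 2 = 21 pairs, starting with AMAZON and APPLE.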

In [12]:
def main() : 
    df = read_files()
    companies = ["AMAZON", "APPLE", "FACEBOOK", "GOOGLE", "MICROSOFT", "TESLA", "ZOOM"]
    
    for company_name in companies :
        # Order by date so that first() and tail(1) are the earliest and latest rows
        stock = df.filter(F.col("company_name") == company_name).orderBy("Date")
        print("Time range of the stock : " + str(date_difference(stock.first(), stock.tail(1)[0])))
        print(company_name + " size : " + str(get_size(stock)))
        
        print("nb_missing_values :")
        nb_missing_values_stock(df, company_name)
        describe_stock(df, company_name)
        show_stock(df, company_name)
    
    
    df = moving_average(df, "Open", "Weekly Avg Open", 7)
    df = moving_average(df, "Open", "Monthly Avg Open", 30)
    df = moving_average(df, "Open", "Yearly Avg Open", 365)

    df = moving_average(df, "Close", "Weekly Avg Close", 7)
    df = moving_average(df, "Close", "Monthly Avg Close", 30)
    df = moving_average(df, "Close", "Yearly Avg Close", 365)

    df = compute_differences(df)

    df = compute_daily_return(df)

    df = moving_average(df, "Daily Return", "Weekly Avg Daily Return", 7)
    df = moving_average(df, "Daily Return", "Monthly Avg Daily Return", 30)
    df = moving_average(df, "Daily Return", "Yearly Avg Daily Return", 365)
        
    df = compute_return(df)
    
    for company_name in companies :
        df.filter(F.col("company_name") == company_name).show()

    for i in range (0, len(companies)) :
        # Start at i + 1 so each pair is visited once and a stock is never paired with itself
        for j in range (i + 1, len(companies)) :
            print(correlate_stocks(df, companies[i], companies[j]))
                
    best_return_rate(df, "2019-12-02", 365)
    return df


In [13]:
df = main()

                                                                                

Time range of the stock : 1429 days, 0:00:00
AMAZON size : 987
nb_missing_values :
+----+---+----+-----+------+---------+
|High|Low|Open|Close|Volume|Adj Close|
+----+---+----+-----+------+---------+
|   0|  0|   0|    0|     0|        0|
+----+---+----+-----+------+---------+

+-------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------+
|summary|              High|               Low|             Open|             Close|           Volume|         Adj Close|company_name|
+-------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------+
|  count|               987|               987|              987|               987|              987|               987|         987|
|   mean|1762.0071216958152|1722.1011452099956|1743.433881363487|1742.9566644206718| 4509728.05775076|1742.9566644206718|        null|
| stddev| 667.2385315752688| 644.7988093382758

2022-06-20 16:04:22,662 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2022-06-20 16:04:22,668 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2022-06-20 16:04:22,971 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2022-06-20 16:04:23,032 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----------+-------+-------+-------+---------+-------+---------+------------+
|      Date|   High|    Low|   Open|    Close| Volume|Adj Close|company_name|
+----------+-------+-------+-------+---------+-------+---------+------------+
|2020-10-07| 3200.0|3132.39| 3135.0|  3195.69|4309400|  3195.69|      AMAZON|
|2020-10-08|3233.29|3174.99|3224.99|  3190.55|3174100|  3190.55|      AMAZON|
|2020-10-09|3288.99|3197.83| 3210.0|  3286.65|4907900|  3286.65|      AMAZON|
|2020-10-12|3496.24|3339.55|3349.94|  3442.93|8364200|  3442.93|      AMAZON|
|2020-10-13|3492.38|3424.22|3467.99|  3443.63|5744700|  3443.63|      AMAZON|
|2020-10-14|3464.88| 3340.0| 3447.0|  3363.71|5828900|  3363.71|      AMAZON|
|2020-10-15|3355.88| 3280.0|3292.01|  3338.65|5223400|  3338.65|      AMAZON|
|2020-10-16|3399.66| 3160.0|3363.23|  3272.71|6474400|  3272.71|      AMAZON|
|2020-10-19| 3329.0|3192.74|3299.61|  3207.21|5223600|  3207.21|      AMAZON|
|2020-10-20| 3266.0|3192.01|3222.28|  3217.01|4509700|  3217.01|



+----------+------+------+------+--------+------+----------+------------+
|      Date|  High|   Low|  Open|   Close|Volume| Adj Close|company_name|
+----------+------+------+------+--------+------+----------+------------+
|2020-10-07|115.55|114.13|114.62|  115.08|  null|114.881805|       APPLE|
|2020-10-08| 116.4|114.59|116.25|  114.97|  null| 114.77199|       APPLE|
|2020-10-09| 117.0|114.92|115.28|  116.97|  null| 116.76855|       APPLE|
|2020-10-12|125.18|119.28|120.06|   124.4|  null| 124.18575|       APPLE|
|2020-10-13|125.39|119.65|125.27|   121.1|  null| 120.89143|       APPLE|
|2020-10-14|123.03|119.62| 121.0|  121.19|  null| 120.98128|       APPLE|
|2020-10-15| 121.2|118.15|118.72|  120.71|  null|120.502106|       APPLE|
|2020-10-16|121.55|118.81|121.28|  119.02|  null| 118.81501|       APPLE|
|2020-10-19|120.42|115.66|119.96|  115.98|  null| 115.78025|       APPLE|
|2020-10-20|118.98|115.63| 116.2|  117.51|  null| 117.30762|       APPLE|
|2020-10-21|118.71|116.45|116.67|  116



+----------+------+-------+------+------+--------+---------+------------+
|      Date|  High|    Low|  Open| Close|  Volume|Adj Close|company_name|
+----------+------+-------+------+------+--------+---------+------------+
|2020-10-07|260.18| 254.82|259.21|258.12|23133400|   258.12|    FACEBOOK|
|2020-10-08|264.62| 259.15|259.75|263.76|16312800|   263.76|    FACEBOOK|
|2020-10-09|264.75| 262.17|264.52|264.45|14107800|   264.45|    FACEBOOK|
|2020-10-12|280.18| 267.87| 270.2|275.75|31019300|   275.75|    FACEBOOK|
|2020-10-13| 279.1| 273.39|277.58|276.14|18063300|   276.14|    FACEBOOK|
|2020-10-14|278.75|  271.5|277.62|271.82|15601200|   271.82|    FACEBOOK|
|2020-10-15|269.04| 263.67| 267.6|266.72|15416100|   266.72|    FACEBOOK|
|2020-10-16|271.37|  265.3|267.38|265.93|16622700|   265.93|    FACEBOOK|
|2020-10-19|268.55| 259.88|265.53| 261.4|13587000|    261.4|    FACEBOOK|
|2020-10-20| 269.7| 262.88|263.06|267.56|18763200|   267.56|    FACEBOOK|
|2020-10-21|283.05| 276.37|279.56|278.



+----------+--------+---------+--------+-------+-------+---------+------------+
|      Date|    High|      Low|    Open|  Close| Volume|Adj Close|company_name|
+----------+--------+---------+--------+-------+-------+---------+------------+
|2020-10-07| 1468.96|   1436.0| 1464.29|1460.29|1746200|  1460.29|      GOOGLE|
|2020-10-08|  1490.0|  1465.09| 1465.09|1485.93|1187800|  1485.93|      GOOGLE|
|2020-10-09| 1516.52|  1489.45|  1494.7|1515.22|1435300|  1515.22|      GOOGLE|
|2020-10-12| 1593.86|  1532.57|  1543.0|1569.15|2482600|  1569.15|      GOOGLE|
|2020-10-13|  1590.0|   1563.2| 1583.73|1571.68|1601000|  1571.68|      GOOGLE|
|2020-10-14|1587.684|  1550.53| 1578.59|1568.08|1931000|  1568.08|      GOOGLE|
|2020-10-15|1575.105|  1545.03| 1547.15|1559.13|1540000|  1559.13|      GOOGLE|
|2020-10-16| 1581.13|   1563.0| 1565.85|1573.01|1434700|  1573.01|      GOOGLE|
|2020-10-19| 1588.15|   1528.0| 1580.46|1534.61|1607100|  1534.61|      GOOGLE|
|2020-10-20|  1577.5|  1525.67| 1527.05|



+----------+-------+------+------+-------+------+---------+------------+
|      Date|   High|   Low|  Open|  Close|Volume|Adj Close|company_name|
+----------+-------+------+------+-------+------+---------+------------+
|2020-10-07| 210.11|206.72|207.06| 209.83|  null|209.28209|   MICROSOFT|
|2020-10-08| 211.19|208.32|210.51| 210.58|  null|210.03014|   MICROSOFT|
|2020-10-09| 215.86|211.23|211.23| 215.81|  null|215.24648|   MICROSOFT|
|2020-10-12| 223.86|216.81|218.79|  221.4|  null|220.82187|   MICROSOFT|
|2020-10-13| 225.21|220.43|222.72| 222.86|  null|222.27806|   MICROSOFT|
|2020-10-14| 224.22|219.13| 223.0| 220.86|  null|220.28328|   MICROSOFT|
|2020-10-15| 220.36|216.01| 217.1| 219.66|  null|219.08643|   MICROSOFT|
|2020-10-16| 222.29|219.32|220.15| 219.66|  null|219.08643|   MICROSOFT|
|2020-10-19|  222.3|213.72|220.42| 214.22|  null|213.66063|   MICROSOFT|
|2020-10-20| 217.37|213.09| 215.8| 214.65|  null| 214.0895|   MICROSOFT|
|2020-10-21| 216.92|213.12|213.12|  214.8|  null|21



+----------+------+------+------+------+------+---------+------------+
|      Date|  High|   Low|  Open| Close|Volume|Adj Close|company_name|
+----------+------+------+------+------+------+---------+------------+
|2020-10-07| 429.9|413.85|419.87| 425.3|  null|    425.3|       TESLA|
|2020-10-08| 439.0| 425.3|438.44|425.92|  null|   425.92|       TESLA|
|2020-10-09|434.59|426.46|430.13| 434.0|  null|    434.0|       TESLA|
|2020-10-12|448.74|438.58| 442.0| 442.3|  null|    442.3|       TESLA|
|2020-10-13|448.89| 436.6|443.35|446.65|  null|   446.65|       TESLA|
|2020-10-14| 465.9|447.35|449.78| 461.3|  null|    461.3|       TESLA|
|2020-10-15|456.57| 442.5|450.31|448.88|  null|   448.88|       TESLA|
|2020-10-16|455.95|438.85|454.44|439.67|  null|   439.67|       TESLA|
|2020-10-19| 447.0|428.87|446.24|430.83|  null|   430.83|       TESLA|
|2020-10-20|431.75|419.05|431.75|421.94|  null|   421.94|       TESLA|
|2020-10-21|432.95|421.25| 422.7|422.64|  null|   422.64|       TESLA|
|2020-



+----------+-------+-------+-------+--------+--------+---------+------------+
|      Date|   High|    Low|   Open|   Close|  Volume|Adj Close|company_name|
+----------+-------+-------+-------+--------+--------+---------+------------+
|2020-10-07|  487.5| 477.11|  482.5|  480.61| 4449500|   480.61|        ZOOM|
|2020-10-08| 489.49| 475.58|  484.0|  478.55| 4193100|   478.55|        ZOOM|
|2020-10-09|  498.9| 484.08| 487.99|  492.41| 8719700|   492.41|        ZOOM|
|2020-10-12| 506.11|490.478| 501.43|  491.54| 6979500|   491.54|        ZOOM|
|2020-10-13|  520.0|  493.0|  494.0|  518.79| 8941500|   518.79|        ZOOM|
|2020-10-14|  519.8|  498.6| 518.78|  509.25| 8327700|   509.25|        ZOOM|
|2020-10-15| 540.05|  506.5| 509.08|   536.4|16520600|    536.4|        ZOOM|
|2020-10-16| 565.45| 543.25|  544.0|   559.0|13745800|    559.0|        ZOOM|
|2020-10-19| 588.84| 562.55|  572.5|  568.34|15193200|   568.34|        ZOOM|
|2020-10-20| 575.69| 534.59| 572.33|  537.02|12098200|   537.02|

                                                                                

+----------+------+------+------+------+-------+---------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+--------------------+-----------------------+------------------------+-----------------------+-------------------+--------------+-------------+
|      Date|  High|   Low|  Open| Close| Volume|Adj Close|company_name|  Weekly Avg Open| Monthly Avg Open|  Yearly Avg Open| Weekly Avg Close|Monthly Avg Close| Yearly Avg Close| Daily Difference|Monthly Difference|        Daily Return|Weekly Avg Daily Return|Monthly Avg Daily Return|Yearly Avg Daily Return|      Weekly Return|Monthly Return|Yearly Return|
+----------+------+------+------+------+-------+---------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+--------------------+-----------------------+-------------------

+----------+-------+-------+-------+-------+------+---------+------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+------------------+--------------------+-----------------------+------------------------+-----------------------+--------------------+--------------+-------------+
|      Date|   High|    Low|   Open|  Close|Volume|Adj Close|company_name|   Weekly Avg Open|  Monthly Avg Open|   Yearly Avg Open|  Weekly Avg Close| Monthly Avg Close|  Yearly Avg Close|    Daily Difference|Monthly Difference|        Daily Return|Weekly Avg Daily Return|Monthly Avg Daily Return|Yearly Avg Daily Return|       Weekly Return|Monthly Return|Yearly Return|
+----------+-------+-------+-------+-------+------+---------+------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+------------------+--------------------+-----

+----------+------+------+------+------+--------+---------+------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+--------------------+-----------------------+------------------------+-----------------------+-------------------+--------------+-------------+
|      Date|  High|   Low|  Open| Close|  Volume|Adj Close|company_name|   Weekly Avg Open|  Monthly Avg Open|   Yearly Avg Open|  Weekly Avg Close| Monthly Avg Close|  Yearly Avg Close|   Daily Difference|Monthly Difference|        Daily Return|Weekly Avg Daily Return|Monthly Avg Daily Return|Yearly Avg Daily Return|      Weekly Return|Monthly Return|Yearly Return|
+----------+------+------+------+------+--------+---------+------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+--------------------+----------------

+----------+-------+-------+------+-------+-------+---------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+--------------------+-----------------------+------------------------+-----------------------+--------------------+--------------+-------------+
|      Date|   High|    Low|  Open|  Close| Volume|Adj Close|company_name|  Weekly Avg Open| Monthly Avg Open|  Yearly Avg Open| Weekly Avg Close|Monthly Avg Close| Yearly Avg Close| Daily Difference|Monthly Difference|        Daily Return|Weekly Avg Daily Return|Monthly Avg Daily Return|Yearly Avg Daily Return|       Weekly Return|Monthly Return|Yearly Return|
+----------+-------+-------+------+-------+-------+---------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+--------------------+-----------------------+--------

+----------+-----+-----+-----+-----+------+---------+------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+------------------+--------------------+-----------------------+------------------------+-----------------------+--------------------+--------------+-------------+
|      Date| High|  Low| Open|Close|Volume|Adj Close|company_name|   Weekly Avg Open|  Monthly Avg Open|   Yearly Avg Open|  Weekly Avg Close| Monthly Avg Close|  Yearly Avg Close|    Daily Difference|Monthly Difference|        Daily Return|Weekly Avg Daily Return|Monthly Avg Daily Return|Yearly Avg Daily Return|       Weekly Return|Monthly Return|Yearly Return|
+----------+-----+-----+-----+-----+------+---------+------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+------------------+--------------------+-----------------------+-----

+----------+------+------+------+------+------+---------+------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+------------------+-------------------+-----------------------+------------------------+-----------------------+-------------------+--------------+-------------+
|      Date|  High|   Low|  Open| Close|Volume|Adj Close|company_name|   Weekly Avg Open|  Monthly Avg Open|   Yearly Avg Open|  Weekly Avg Close| Monthly Avg Close|  Yearly Avg Close|    Daily Difference|Monthly Difference|       Daily Return|Weekly Avg Daily Return|Monthly Avg Daily Return|Yearly Avg Daily Return|      Weekly Return|Monthly Return|Yearly Return|
+----------+------+------+------+------+------+---------+------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+------------------+-------------------+----------------------

+----------+------+------+-----+-----+--------+---------+------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+-------------------+------------------+-------------------+-----------------------+------------------------+-----------------------+--------------------+--------------+-------------+
|      Date|  High|   Low| Open|Close|  Volume|Adj Close|company_name|  Weekly Avg Open| Monthly Avg Open|  Yearly Avg Open|  Weekly Avg Close| Monthly Avg Close|  Yearly Avg Close|   Daily Difference|Monthly Difference|       Daily Return|Weekly Avg Daily Return|Monthly Avg Daily Return|Yearly Avg Daily Return|       Weekly Return|Monthly Return|Yearly Return|
+----------+------+------+-----+-----+--------+---------+------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+-------------------+------------------+-------------------+-----------------------+--------

Correlation between AMAZON and APPLE
[['High', 'Low', 'Open', 'Close', 'Adj Close'], [0.9297874259390847, 0.9294547100635051, 0.929432373427345, 0.9292860084204524, 0.9314678774869076]]



Correlation between AMAZON and FACEBOOK
[['High', 'Low', 'Open', 'Close', 'Adj Close'], [0.8682214375390286, 0.858656263465311, 0.8630665010422292, 0.8628112282541691, 0.8628112282541691]]
Correlation between AMAZON and GOOGLE
[['High', 'Low', 'Open', 'Close', 'Adj Close'], [0.9279218467592136, 0.9238630174063731, 0.9262870255493186, 0.9255061932293708, 0.9255061932293708]]



Correlation between AMAZON and MICROSOFT
[['High', 'Low', 'Open', 'Close', 'Adj Close'], [0.951284096151614, 0.9524913662368054, 0.9514272822500527, 0.9519391984487741, 0.9511824060153518]]


Correlation between AMAZON and TESLA
[['High', 'Low', 'Open', 'Close', 'Adj Close'], [0.8123427553679604, 0.8069472186412819, 0.8100728829691062, 0.8094073996359907, 0.8094073996359907]]


Correlation between AMAZON and ZOOM
[['High', 'Low', 'Open', 'Close', 'Adj Close'], [0.9133082712469004, 0.9140200107429356, 0.9144061918476822, 0.914744271428987, 0.914744271428987]]


Correlation between APPLE and FACEBOOK
[['High', 'Low', 'Open', 'Close', 'Adj Close'], [0.912895378181455, 0.9061836311978357, 0.9093706647109683, 0.909143831170847, 0.9086533367686319]]
Correlation between APPLE and GOOGLE
[['High', 'Low', 'Open', 'Close', 'Adj Close'], [0.9250570139204963, 0.9229422544755259, 0.9232986324073348, 0.9238202964706318, 0.9246956687641503]]
Correlation between APPLE and MICROSOFT
[['High', 'Low', 'Open', 'Close', 'Adj Close'], [0.9412485224711806, 0.9417320000174492, 0.940544899449, 0.9414663222485514, 0.9439920309680077]]


Correlation between APPLE and TESLA
[['High', 'Low', 'Open', 'Close', 'Adj Close'], [0.9113585843845249, 0.9083134679329079, 0.9101124008032263, 0.9094872749905819, 0.908939847211644]]


Correlation between APPLE and ZOOM
[['High', 'Low', 'Open', 'Close', 'Adj Close'], [0.8808886369902045, 0.8784167261492821, 0.8794186794999088, 0.8798818350347076, 0.8822159457571348]]


                                                                                

Correlation between FACEBOOK and GOOGLE
[['High', 'Low', 'Open', 'Close', 'Adj Close'], [0.9013204763144368, 0.8985141699549771, 0.8993882249308792, 0.9000839548151618, 0.9000839548151618]]


Correlation between FACEBOOK and MICROSOFT
[['High', 'Low', 'Open', 'Close', 'Adj Close'], [0.8611344337622792, 0.853161152499042, 0.8567486831660741, 0.8570760230066189, 0.8556526821290147]]
Correlation between FACEBOOK and TESLA
[['High', 'Low', 'Open', 'Close', 'Adj Close'], [0.8331028488431108, 0.8246657984737641, 0.8286301094216965, 0.8288057479037657, 0.8288057479037657]]
Correlation between FACEBOOK and ZOOM
[['High', 'Low', 'Open', 'Close', 'Adj Close'], [0.8549963510605557, 0.8442544993637039, 0.851767510593323, 0.8500061058765412, 0.8500061058765412]]


Correlation between GOOGLE and MICROSOFT
[['High', 'Low', 'Open', 'Close', 'Adj Close'], [0.9453856481181697, 0.942653299169456, 0.9442185369519542, 0.9442882610519647, 0.9433944144632486]]


Correlation between GOOGLE and TESLA
[['High', 'Low', 'Open', 'Close', 'Adj Close'], [0.7712390089726987, 0.7631472893662992, 0.7666815667279554, 0.7665494283405445, 0.7665494283405445]]


                                                                                

Correlation between GOOGLE and ZOOM
[['High', 'Low', 'Open', 'Close', 'Adj Close'], [0.7498826496251751, 0.7311612333480673, 0.7419145392050145, 0.7405503876943942, 0.7405503876943942]]


Correlation between MICROSOFT and TESLA
[['High', 'Low', 'Open', 'Close', 'Adj Close'], [0.7743821540247339, 0.7703609856919504, 0.7719231288541617, 0.7720378958676984, 0.7718157786620587]]


Correlation between MICROSOFT and ZOOM
[['High', 'Low', 'Open', 'Close', 'Adj Close'], [0.842943435742423, 0.8431835101596402, 0.8420396419988482, 0.8431651966720145, 0.8451039311151026]]
Correlation between TESLA and ZOOM
[['High', 'Low', 'Open', 'Close', 'Adj Close'], [0.9406250294875865, 0.9448065212915425, 0.9426038471310438, 0.9431541228015273, 0.9431541228015273]]


Best return rate is 773.114967438317 for the stock of TESLA


                                                                                

#### We can cite and explain several insights that could be helpful to explore the data further :

- MACD : computed as the difference between a short-term EMA (12-period) and a long-term EMA (26-period), where a period is a time interval and the EMA (exponential moving average) is a moving average that gives more weight to recent data points. Its value can signal a change in the momentum of the stock price.
- Aroon oscillator : the difference between Aroon Up and Aroon Down; it describes the general trend of the stock price, i.e. whether it is going up or down.
- Portfolio diversification : identifying stocks that are weakly related and investing across them, in order to limit the risk if one sector were to crash.
- Bollinger Bands : a statistical chart developed by John Bollinger that places bounds around the stock price in order to represent volatility, narrower bounds meaning lower volatility.
- Value at risk : an estimate of the loss that should not be exceeded over a given period at a given confidence level.
- Portfolio optimization : a set of techniques for selecting a portfolio that maximizes expected return while minimizing risk.
- Facebook's Prophet model : a forecasting model developed by Facebook that can predict trends in stock prices from historical data.
- ARIMA (autoregressive integrated moving average) : a model that combines autoregression on past values, differencing, and a moving average of past forecast errors to predict future prices.

However, implementing all of these ideas is beyond the scope of this project; furthermore, some of them require additional data.
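
As an illustration only, the MACD definition given in the list above can be sketched in plain Python. This is a minimal sketch and not part of the project's pipeline: the function names `ema` and `macd`, the choice to seed the EMA with the first observation, and the sample prices are all assumptions made for the example.

```python
def ema(values, period):
    """Exponential moving average with smoothing factor 2 / (period + 1)."""
    alpha = 2.0 / (period + 1)
    result = [values[0]]  # assumption: seed the EMA with the first observation
    for value in values[1:]:
        result.append(alpha * value + (1 - alpha) * result[-1])
    return result


def macd(closes, short_period=12, long_period=26):
    """MACD line: short-term EMA minus long-term EMA, point by point."""
    short_ema = ema(closes, short_period)
    long_ema = ema(closes, long_period)
    return [s - l for s, l in zip(short_ema, long_ema)]


# Hypothetical closing prices, used only to exercise the functions.
closes = [100.0 + i for i in range(40)]
macd_line = macd(closes)
print(macd_line[-1] > 0)  # steadily rising prices give a positive MACD
```

The short-term EMA reacts faster to recent prices than the long-term one, which is why a sustained rise makes their difference positive.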

## Second Step : ML

In this part we use regression models to predict the Adj Close of a given day, which is essentially a measure of the stock's value at a specific point in time.

#### 1) Data Preparation
For now we will focus on a model that predicts the Adj Close of a day given the data of the previous day. Our model will thus give short-term predictions.
In other words, we want to predict the Adj Close of the next day from the data of a given day. To do this, we add the next day's Adj Close to every row as a new column.

In [14]:
df_ml = df
w = Window.partitionBy(F.col("company_name")).orderBy(F.col("Date").cast("timestamp").cast("long")).rangeBetween(days(1), days(1))
df_ml = df_ml.withColumn("Adj Close +1",F.avg("Adj Close").over(w))
df_ml.show()

+----------+------+------+------+------+-------+---------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+--------------------+-----------------------+------------------------+-----------------------+-------------------+--------------+-------------+-----------------+
|      Date|  High|   Low|  Open| Close| Volume|Adj Close|company_name|  Weekly Avg Open| Monthly Avg Open|  Yearly Avg Open| Weekly Avg Close|Monthly Avg Close| Yearly Avg Close| Daily Difference|Monthly Difference|        Daily Return|Weekly Avg Daily Return|Monthly Avg Daily Return|Yearly Avg Daily Return|      Weekly Return|Monthly Return|Yearly Return|     Adj Close +1|
+----------+------+------+------+------+-------+---------+------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+--------------------+-------

Of course, we do not have next-day information for every day in our dataset. We have to remove the rows where this information is not available, since it is the value we want to predict. Sadly this represents 1393/6333 ≈ 22% of our data; we will keep this in mind.

In [15]:
df_ml.select(F.count(F.when(F.isnull("Adj Close +1"), "Adj Close +1")).alias("Adj Close +1 -- nb nulls")).show()
print("DF size: " + str(df_ml.count()))
df_ml = df_ml.filter(df_ml["Adj Close +1"].isNotNull())
print("DF size without Adj Close +1 nulls: " + str(df_ml.count()))

+------------------------+
|Adj Close +1 -- nb nulls|
+------------------------+
|                    1393|
+------------------------+

DF size: 6333
DF size without Adj Close +1 nulls: 4940
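
The null count above is consistent with a label that is looked up exactly one calendar day ahead: assuming the `days` helper defined earlier in the notebook converts a number of days into seconds, a row whose following calendar day is not a trading day (a Friday, for instance) finds no match. The plain-Python sketch below reproduces this behaviour on three hypothetical dates; `next_calendar_day_value` and the prices are illustrative names and values, not part of the project.

```python
from datetime import date, timedelta

# Hypothetical trading days: a Thursday, a Friday, then the following Monday.
adj_close = {
    date(2020, 1, 2): 10.0,  # Thursday
    date(2020, 1, 3): 11.0,  # Friday
    date(2020, 1, 6): 12.0,  # Monday
}


def next_calendar_day_value(prices, day):
    """Return the value exactly one calendar day later, or None if absent."""
    return prices.get(day + timedelta(days=1))


labels = {day: next_calendar_day_value(adj_close, day) for day in adj_close}
for day, label in labels.items():
    print(day, label)  # Friday and the last day have no label
```

A row-based offset (for example `F.lead` over the same partition and ordering) would instead pick the next available trading day, at the cost of predicting over gaps of varying length.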


#### 2) Linear Regression
We will train our model on an 80/20 train/test split of our data. For now the explanatory variables will be 'Open', 'Close', 'Weekly Avg Close', 'Monthly Avg Close' and 'Yearly Avg Close'. We will build a vector column for our features from these variables.

In [16]:
from pyspark.ml.feature import VectorAssembler

trainDF, testDF = df_ml.randomSplit([.8, .2], seed=42)

vecAssembler = VectorAssembler(inputCols=['Open', 'Close', 'Weekly Avg Close', 'Monthly Avg Close', 'Yearly Avg Close'], outputCol="features")

We will train a basic linear regression for now; it will be our baseline for the rest of this part.

In [17]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="features", labelCol="Adj Close +1")

We will now train the model and look at the predictions on our test data using a pipeline.

In [18]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[vecAssembler, lr])
pipelineModel = pipeline.fit(trainDF)

predDF = pipelineModel.transform(testDF)
predDF = predDF.select("Adj Close +1", "prediction")
predDF.show(10)

2022-06-20 16:06:09,921 WARN util.Instrumentation: [841fcd62] regParam is zero, which might cause numerical instability and overfitting.

+------------------+------------------+
|      Adj Close +1|        prediction|
+------------------+------------------+
| 58.41072463989258|61.889395808458595|
| 27.38566780090332|28.265385569821767|
| 58.41072463989258|61.632150170293244|
|45.801998138427734|  44.5497628408573|
|  45.9739990234375| 45.43250503086868|
| 804.7899780273438| 805.9870449914793|
| 807.9099731445312| 804.6960582011217|
| 806.3599853515625| 808.4673088998499|
|127.91999816894531|127.23037615794989|
| 806.0700073242188| 805.6403440992096|
+------------------+------------------+
only showing top 10 rows



                                                                                

We will use RegressionMetrics to obtain the root mean squared error and the R².

In [19]:
from pyspark.mllib.evaluation import RegressionMetrics
import numpy as np

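# RegressionMetrics expects (prediction, observation) pairs, in that order
val_pred = predDF.select("prediction", "Adj Close +1").rdd.map(tuple)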
rm = RegressionMetrics(val_pred)

print("Average of the Adj Close: " + str(df_ml.select(F.avg('Adj Close')).collect()[0][0]))
print("Root mean squared error: " + str(np.sqrt(rm.meanSquaredError)))
print("R Squared: " + str(rm.r2))

                                                                                

Average of the Adj Close: 537.4425730492904


Root mean squared error: 17.57522467449153
R Squared: 0.999352926889026


                                                                                

We obtain an R² of 0.999 and a root mean squared error of about 17.6, which is already good given that the average Adj Close is 537.
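
For reference, the two metrics can be written out explicitly. The sketch below uses the usual definitions (RMSE as the square root of the mean squared residual, R² as one minus the ratio of the residual sum of squares to the total sum of squares) on hypothetical values. It is meant to clarify what the numbers above measure, not to reproduce the internals of Spark's `RegressionMetrics`; the function names are illustrative.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between observed and predicted values."""
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(squared))


def r_squared(labels, predictions):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1 - ss_res / ss_tot


# Hypothetical observations and predictions.
observed = [1.0, 2.0, 3.0, 4.0]
predicted = [1.1, 1.9, 3.2, 3.8]
print(rmse(observed, predicted), r_squared(observed, predicted))
```

Because R² compares the residuals with the spread of the labels, it is naturally very close to 1 when the labels cover a wide range of prices, so the RMSE relative to the average price is a useful complement.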

#### 3) Using imputation to add another variable
We have information on the previous week, month and year, but it would be best to also take into account the stock's value on the previous day. We can achieve that by adding 'Daily Difference' to our explanatory variables, but it is not available for every data point.

In [20]:
df_ml.select(F.count(F.when(F.isnull("Daily Difference"), "Daily Difference")).alias("Daily Difference -- nb nulls")).show()
print("DF size: " + str(df_ml.count()))

+----------------------------+
|Daily Difference -- nb nulls|
+----------------------------+
|                        1341|
+----------------------------+

DF size: 4940


Sadly this represents 1341/4940 ≈ 27% of our data. Furthermore, we already removed 22% of our data by dropping the rows where the next day's value was not available, so we cannot afford to remove these rows as well. Instead we will impute these values: when the previous day's value is missing, we assume it was the same as the current day's. Consequently, we replace null values in the 'Daily Difference' column with 0, and we record which values were imputed in a new column, 'Daily Difference Imputated'.
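
The imputation rule described above (fill with 0 and remember which rows were filled) can be sketched in plain Python. The function name `impute_with_flag` and the sample values are hypothetical and serve only to illustrate the idea.

```python
def impute_with_flag(values, fill_value=0.0):
    """Replace missing entries (None) and return a parallel list of flags."""
    flags = [value is None for value in values]
    filled = [fill_value if value is None else value for value in values]
    return filled, flags


# Hypothetical 'Daily Difference' values with two missing entries.
daily_difference = [None, 1.5, -0.3, None]
filled, imputed = impute_with_flag(daily_difference)
print(filled)
print(imputed)
```

Keeping the flag separate from the filled values makes it possible to tell a genuine zero difference from an imputed one later on.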

In [21]:
df_ml = df_ml.withColumn("Daily Difference Imputated", df_ml["Daily Difference"].isNull())
df_ml = df_ml.fillna(value=0,subset=["Daily Difference"])
df_ml.select(F.count(F.when(F.isnull("Daily Difference"), "Daily Difference")).alias("Daily Difference -- nb nulls")).show()
df_ml.select(F.count(F.when(df_ml["Daily Difference Imputated"], "Daily Difference Imputated")).alias("Daily Difference Imputated -- nb True")).show()

+----------------------------+
|Daily Difference -- nb nulls|
+----------------------------+
|                           0|
+----------------------------+

+-------------------------------------+
|Daily Difference Imputated -- nb True|
+-------------------------------------+
|                                 1341|
+-------------------------------------+



Let's train again and see our new performance.

In [22]:
trainDF, testDF = df_ml.randomSplit([.8, .2], seed=42)
vecAssembler = VectorAssembler(inputCols=['Open', 'Close', 'Weekly Avg Close', 'Monthly Avg Close', 'Yearly Avg Close', 'Daily Difference'], outputCol="features")
pipeline = Pipeline(stages=[vecAssembler, lr])

pipelineModel = pipeline.fit(trainDF)
predDF = pipelineModel.transform(testDF)

predDF = predDF.select("Adj Close +1", "prediction")
predDF.show(10)

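# RegressionMetrics expects (prediction, observation) pairs, in that order
val_pred = predDF.select("prediction", "Adj Close +1").rdd.map(tuple)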
rm = RegressionMetrics(val_pred)

print("Average of the Adj Close: " + str(df_ml.select(F.avg('Adj Close')).collect()[0][0]))
print("Root mean squared error: " + str(np.sqrt(rm.meanSquaredError)))
print("R Squared: " + str(rm.r2))

2022-06-20 16:06:19,646 WARN util.Instrumentation: [f04582f3] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

+------------------+------------------+
|      Adj Close +1|        prediction|
+------------------+------------------+
| 58.41072463989258| 61.85819058414851|
| 27.38566780090332| 28.24943032114706|
| 58.41072463989258|61.627572419273314|
|45.801998138427734| 44.55279439215404|
|  45.9739990234375|  45.4612891767964|
| 804.7899780273438| 806.2656569320546|
| 807.9099731445312|  804.850963390572|
| 806.3599853515625| 808.3856677284217|
|127.91999816894531|127.20504307170474|
| 806.0700073242188| 805.4507474597208|
+------------------+------------------+
only showing top 10 rows



                                                                                

Average of the Adj Close: 537.4425730492904


Root mean squared error: 17.493689407537598
R Squared: 0.9993592109632434


                                                                                

Our performance improved, but only slightly (RMSE of 17.49 instead of 17.58), possibly because the daily difference is correlated with the other variables.

#### 4) Testing Ridge Regression
We will now try ridge regression to see whether it gives better predictions. To do this we must find a good regularization parameter, so we tune it with a 10-fold cross validator over several candidate values. Note that we perform cross validation outside of our pipeline so that we can inspect the score obtained for each value of the parameter.

In [23]:
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator

lr = LinearRegression(featuresCol="features", labelCol="Adj Close +1")

paramGrid = (ParamGridBuilder()
            .addGrid(lr.regParam, [0, 0.001, 0.01, 0.1])
            .build())


evaluator = RegressionEvaluator(labelCol="Adj Close +1", 
                                predictionCol="prediction", 
                                metricName="rmse")

cv = CrossValidator(estimator=lr, 
                    evaluator=evaluator, 
                    estimatorParamMaps=paramGrid, 
                    numFolds=10,
                    seed=42)

pipeline = Pipeline(stages=[vecAssembler])
 
pipelineModel = pipeline.fit(trainDF)
vecTrainDF = pipelineModel.transform(trainDF)
cvModel = cv.fit(vecTrainDF)
list(zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics))

2022-06-20 16:06:25,454 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
2022-06-20 16:06:25,975 WARN util.Instrumentation: [9f495261] regParam is zero, which might cause numerical instability and overfitting.

[({Param(parent='LinearRegression_2ae0fc13e751', name='regParam', doc='regularization parameter (>= 0).'): 0.0},
  17.430783430350157),
 ({Param(parent='LinearRegression_2ae0fc13e751', name='regParam', doc='regularization parameter (>= 0).'): 0.001},
  17.42916787104426),
 ({Param(parent='LinearRegression_2ae0fc13e751', name='regParam', doc='regularization parameter (>= 0).'): 0.01},
  17.429613596434958),
 ({Param(parent='LinearRegression_2ae0fc13e751', name='regParam', doc='regularization parameter (>= 0).'): 0.1},
  17.690499696042544)]

The best order of magnitude for the regularization parameter seems to be around 0.001, although the differences between the three smallest values are tiny. Let's see our new performance using this parameter.

In [24]:
vecTestDF = pipelineModel.transform(testDF)

predDF = cvModel.transform(vecTestDF)
predDF = predDF.select("Adj Close +1", "prediction")
predDF.show(10)

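# RegressionMetrics expects (prediction, observation) pairs, in that order
val_pred = predDF.select("prediction", "Adj Close +1").rdd.map(tuple)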
rm = RegressionMetrics(val_pred)

print("Average of the Adj Close: " + str(df_ml.select(F.avg('Adj Close')).collect()[0][0]))
print("Root mean squared error: " + str(np.sqrt(rm.meanSquaredError)))
print("R Squared: " + str(rm.r2))

+------------------+------------------+
|      Adj Close +1|        prediction|
+------------------+------------------+
| 58.41072463989258|61.860951586165626|
| 27.38566780090332|28.250288458151836|
| 58.41072463989258| 61.62900207983631|
|45.801998138427734| 44.55266886274174|
|  45.9739990234375|45.458456865342384|
| 804.7899780273438| 806.2561288449463|
| 807.9099731445312| 804.8601400786293|
| 806.3599853515625| 808.3845440695644|
|127.91999816894531|127.20743913857123|
| 806.0700073242188| 805.4752367578036|
+------------------+------------------+
only showing top 10 rows

Average of the Adj Close: 537.4425730492904


Root mean squared error: 17.493173538375615
R Squared: 0.9993592312694387


                                                                                

The performance is almost the same! This may be explained by the fact that we have 4940 data points and only 6 explanatory variables, meaning that we are in a situation where n >> p. In short, with this much data relative to the number of variables, the unregularized model is unlikely to overfit, so ridge regression brings little benefit. Thus we will keep the plain (unregularized) linear regression.
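
A complementary way to see why such small regularization values barely change the model is to look at a simplified case. The sketch below solves ridge regression for a single feature without intercept, under the assumption that the loss is averaged over the data points and the feature has mean 0 and variance 1. It is a textbook simplification and not Spark's solver; `ridge_slope` and the sample data are hypothetical.

```python
def ridge_slope(xs, ys, reg_param):
    """Minimise (1/2n) * sum((y - b*x)^2) + (reg_param / 2) * b^2 over b."""
    n = len(xs)
    sxy = sum(x * y for x, y in zip(xs, ys)) / n
    sxx = sum(x * x for x in xs) / n
    return sxy / (sxx + reg_param)


# Hypothetical standardized feature (mean 0, variance 1) with y = 2 * x.
xs = [-1.0, 1.0, -1.0, 1.0]
ys = [-2.0, 2.0, -2.0, 2.0]
for reg_param in [0, 0.001, 0.01, 0.1]:
    print(reg_param, ridge_slope(xs, ys, reg_param))
```

In this simplified setting the slope is divided by `1 + reg_param`, so a value of 0.001 shrinks it by about 0.1%, which is in line with the nearly identical scores observed above.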

#### 5) Using OHE to add a categorical variable

We will now try to improve our model by adding the company whose stock we are predicting to the explanatory variables. To do this we will perform OHE (one-hot encoding) using StringIndexer, OneHotEncoder and VectorAssembler.

In [25]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

stringIndexer = StringIndexer(inputCols=["company_name"], 
                              outputCols=["company_name_index"])
oheEncoder = OneHotEncoder(inputCols=["company_name_index"], 
                           outputCols=["company_name_OHE"])

vecAssembler = VectorAssembler(inputCols=['Open', 'Close', 'Weekly Avg Close', 'Monthly Avg Close', 'Yearly Avg Close', 'Daily Difference', 'company_name_OHE'], outputCol="features")

Let's see what our OHE looks like.

In [26]:
pipeline = Pipeline(stages=[stringIndexer, oheEncoder])

pipelineModel = pipeline.fit(trainDF)
pipelineModel.transform(testDF).select(['company_name', 'company_name_index', 'company_name_OHE']).show(10)

+------------+------------------+----------------+
|company_name|company_name_index|company_name_OHE|
+------------+------------------+----------------+
|   MICROSOFT|               5.0|   (6,[5],[1.0])|
|       APPLE|               4.0|   (6,[4],[1.0])|
|   MICROSOFT|               5.0|   (6,[5],[1.0])|
|       TESLA|               2.0|   (6,[2],[1.0])|
|       TESLA|               2.0|   (6,[2],[1.0])|
|      GOOGLE|               1.0|   (6,[1],[1.0])|
|      GOOGLE|               1.0|   (6,[1],[1.0])|
|      GOOGLE|               1.0|   (6,[1],[1.0])|
|    FACEBOOK|               3.0|   (6,[3],[1.0])|
|      GOOGLE|               1.0|   (6,[1],[1.0])|
+------------+------------------+----------------+
only showing top 10 rows
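
The `company_name_OHE` column above is printed in Spark's sparse vector notation, `(size, [indices], [values])`. The vectors have 6 slots for 7 companies, which matches the default behaviour of `OneHotEncoder` of dropping the last category (`dropLast=True`), so that the last index is encoded as an all-zero vector. The plain-Python sketch below shows the equivalent dense encoding; the `one_hot` function is a hypothetical helper written for this illustration.

```python
def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot vector; with drop_last the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector


# Index 5 out of 7 categories corresponds to (6,[5],[1.0]) in the table.
print(one_hot(5, 7))
# The last index (6) has no slot of its own and becomes the zero vector.
print(one_hot(6, 7))
```

Dropping one category avoids a redundant column, since the encoded columns would otherwise always sum to one.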



                                                                                

Let's train again and see our new performance.

In [27]:
pipeline = Pipeline(stages=[stringIndexer, oheEncoder, vecAssembler, lr])

pipelineModel = pipeline.fit(trainDF)
predDF = pipelineModel.transform(testDF)

predDF = predDF.select("Adj Close +1", "prediction")
predDF.show(10)

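# RegressionMetrics expects (prediction, observation) pairs, in that order
val_pred = predDF.select("prediction", "Adj Close +1").rdd.map(tuple)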
rm = RegressionMetrics(val_pred)

print("Average of the Adj Close: " + str(df_ml.select(F.avg('Adj Close')).collect()[0][0]))
print("Root mean squared error: " + str(np.sqrt(rm.meanSquaredError)))
print("R Squared: " + str(rm.r2))

2022-06-20 16:07:14,245 WARN util.Instrumentation: [d0f469e8] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

+------------------+------------------+
|      Adj Close +1|        prediction|
+------------------+------------------+
| 58.41072463989258|  60.0736184549013|
| 27.38566780090332| 27.66626227757183|
| 58.41072463989258| 59.84334098776507|
|45.801998138427734| 45.70471737425869|
|  45.9739990234375| 46.61257534499252|
| 804.7899780273438| 806.0188398098536|
| 807.9099731445312|  804.607728541524|
| 806.3599853515625| 808.1458341788539|
|127.91999816894531|127.96766094152268|
| 806.0700073242188|  805.214516168016|
+------------------+------------------+
only showing top 10 rows



Average of the Adj Close: 537.4425730492904


Root mean squared error: 17.47096818762458
R Squared: 0.999360896513598


                                                                                

Our performance improved, but only slightly; linear regression may not be the best model to exploit this information.

#### 6) Random Forest Regression

We will now try using a random forest to make our predictions. Note that we must not use OHE on our categorical variable here; we will only use the StringIndexer.

In [28]:
from pyspark.ml.regression import RandomForestRegressor

vecAssembler = VectorAssembler(inputCols=['Open', 'Close', 'Weekly Avg Close', 'Monthly Avg Close', 'Yearly Avg Close', 'Daily Difference', 'company_name_index'], outputCol="features")
rf = RandomForestRegressor(featuresCol="features", labelCol="Adj Close +1")

pipeline = Pipeline(stages=[stringIndexer, vecAssembler, rf])

pipelineModel = pipeline.fit(trainDF)
predDF = pipelineModel.transform(testDF)

predDF = predDF.select("Adj Close +1", "prediction")
predDF.show(10)

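# RegressionMetrics expects (prediction, observation) pairs, in that order
val_pred = predDF.select("prediction", "Adj Close +1").rdd.map(tuple)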
rm = RegressionMetrics(val_pred)

print("Average of the Adj Close: " + str(df_ml.select(F.avg('Adj Close')).collect()[0][0]))
print("Root mean squared error: " + str(np.sqrt(rm.meanSquaredError)))
print("R Squared: " + str(rm.r2))

                                                                                

+------------------+------------------+
|      Adj Close +1|        prediction|
+------------------+------------------+
| 58.41072463989258| 64.69487505881236|
| 27.38566780090332| 38.93195732114348|
| 58.41072463989258| 64.69487505881236|
|45.801998138427734| 47.12932351999062|
|  45.9739990234375| 47.12932351999062|
| 804.7899780273438| 856.6682621257793|
| 807.9099731445312| 856.6682621257793|
| 806.3599853515625| 861.6196840627896|
|127.91999816894531|133.84576792797026|
| 806.0700073242188| 861.6196840627896|
+------------------+------------------+
only showing top 10 rows



Average of the Adj Close: 537.4425730492904


Root mean squared error: 65.4158486549389
R Squared: 0.9910028557993061


                                                                                

Our performance is much worse than with linear regression (RMSE of 65.4 versus 17.5); however, we used the default hyperparameters when constructing our random forest regressor.

#### 7) Hyperparameter tuning for random forest regression

As with ridge regression, we will use hyperparameter tuning to find the best hyperparameters: a 10-fold cross validator tests various values for maxDepth and numTrees. We once again perform cross validation outside of our pipeline so that we can inspect the best parameters.

In [29]:
paramGrid = (ParamGridBuilder()
            .addGrid(rf.maxDepth, [2, 4, 6])
            .addGrid(rf.numTrees, [10, 50, 100])
            .build())


evaluator = RegressionEvaluator(labelCol="Adj Close +1", 
                                predictionCol="prediction", 
                                metricName="rmse")

cv = CrossValidator(estimator=rf, 
                    evaluator=evaluator, 
                    estimatorParamMaps=paramGrid, 
                    numFolds=10,
                    seed=42)

pipeline = Pipeline(stages=[stringIndexer, vecAssembler])
 
pipelineModel = pipeline.fit(trainDF)
vecTrainDF = pipelineModel.transform(trainDF)
cvModel = cv.fit(vecTrainDF)
list(zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics))

2022-06-20 16:07:40,135 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1137.5 KiB

[({Param(parent='RandomForestRegressor_86e08c6fdce0', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2,
   Param(parent='RandomForestRegressor_86e08c6fdce0', name='numTrees', doc='Number of trees to train (>= 1).'): 10},
  157.8419198780815),
 ({Param(parent='RandomForestRegressor_86e08c6fdce0', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2,
   Param(parent='RandomForestRegressor_86e08c6fdce0', name='numTrees', doc='Number of trees to train (>= 1).'): 50},
  161.2806915666573),
 ({Param(parent='RandomForestRegressor_86e08c6fdce0', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2,
   Param(parent='RandomForestRegressor_86e08c6fdce0', name=

According to our 10-fold cross validation the best parameters are : maxDepth=6, numTrees=100

In [30]:
predDF = cvModel.transform(vecTrainDF)
predDF = predDF.select("Adj Close +1", "prediction")
predDF.show(10)

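# RegressionMetrics expects (prediction, observation) pairs, in that order
val_pred = predDF.select("prediction", "Adj Close +1").rdd.map(tuple)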
rm = RegressionMetrics(val_pred)

print("Average of the Adj Close: " + str(df_ml.select(F.avg('Adj Close')).collect()[0][0]))
print("Root mean squared error: " + str(np.sqrt(rm.meanSquaredError)))
print("R Squared: " + str(rm.r2))

                                                                                

+------------------+------------------+
|      Adj Close +1|        prediction|
+------------------+------------------+
|27.247108459472656|34.958556207102454|
| 45.39799880981445|40.385869083322675|
|118.69000244140625|113.32618959675287|
| 757.1799926757812| 742.3436743827999|
| 786.9000244140625| 742.3436743827999|
|45.349998474121094| 44.22998106558981|
|120.66999816894531|113.32618959675287|
| 780.4500122070312| 741.5577237602414|
|   794.02001953125| 744.4796744438352|
| 27.69097137451172|34.958556207102454|
+------------------+------------------+
only showing top 10 rows

Average of the Adj Close: 537.4425730492904


Root mean squared error: 58.83454257711474
R Squared: 0.9928575080354523


                                                                                

The performance is better, but our linear regression model remains the best one. The best parameters found suggest that a larger maxDepth helps, so we could test even higher values of maxDepth, but this would consume too many resources and take too much time to be worth it.
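
To give a rough idea of why a deeper search gets expensive, here is a minimal sketch in plain Python. It counts the models trained by an exhaustive grid search with k-fold cross-validation and the maximum number of nodes in a binary tree of a given depth (depth 0 means 1 node, depth 1 means 3 nodes, as in the maxDepth documentation above). The grid below is hypothetical, not the one we actually used, and `total_fits` and `max_nodes` are illustrative helpers, not Spark functions.

```python
from itertools import product


def max_nodes(depth: int) -> int:
    """Upper bound on the number of nodes in a binary tree of this depth."""
    return 2 ** (depth + 1) - 1


def total_fits(grid: dict, num_folds: int) -> int:
    """Models trained by an exhaustive grid search with k-fold cross-validation."""
    combinations = list(product(*grid.values()))
    return len(combinations) * num_folds


# Hypothetical grid exploring deeper trees (an assumption, for illustration only)
grid = {"maxDepth": [6, 8, 10], "numTrees": [50, 100]}

print(total_fits(grid, num_folds=10))   # 6 combinations x 10 folds = 60 fits
print(max_nodes(6), max_nodes(10))      # 127 2047
```

Each extra level of depth roughly doubles the maximum size of every tree, and every extra grid value multiplies the number of fits by the number of folds, which is why we stopped at the grid we had.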

#### 8) Evaluating performance when training for longer-term predictions
The best model we were able to obtain is a simple linear regression using the following variables: 'Open', 'Close', 'Weekly Avg Daily Return', 'Monthly Avg Daily Return', 'Yearly Avg Daily Return', 'Daily Difference' and 'company_name_index'. In this last part, we reuse this model structure to predict a stock's value seven days in the future and see how it performs. It is also a good occasion to recapitulate all the steps we followed to obtain our final model structure.
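
The idea behind the "+7" label can be sketched in plain Python, independently of Spark: for each row, we look up the row of the same company dated exactly seven days later, and the label stays empty when that date is missing from the dataset. The prices below are made up and `add_future_label` is a hypothetical helper used only for illustration.

```python
from datetime import date, timedelta


def add_future_label(rows, horizon_days=7):
    """rows: list of (date, adj_close) for one company.

    Returns (date, adj_close, future_adj_close) tuples, where
    future_adj_close is None when no row exists exactly
    horizon_days later.
    """
    by_date = {d: value for d, value in rows}
    horizon = timedelta(days=horizon_days)
    return [(d, value, by_date.get(d + horizon)) for d, value in rows]


# Made-up prices with gaps between dates
rows = [
    (date(2020, 1, 6), 10.0),
    (date(2020, 1, 7), 11.0),
    (date(2020, 1, 13), 12.0),  # exactly 7 days after 2020-01-06
    (date(2020, 1, 15), 13.0),
]

labelled = add_future_label(rows)
# Rows without a label cannot be used for training or evaluation
usable = [row for row in labelled if row[2] is not None]
print(len(labelled), len(usable))  # 4 1
```

Only the first row has a matching date seven days later, so it is the only usable one. The same effect explains the null labels that we have to filter out of the real dataset.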

In [31]:
# We construct the Adj Close +7 column containing the Adj Close variable for the data point 7 days in the future
df_ml = df
w = Window.partitionBy(F.col("company_name")).orderBy(F.col("Date").cast("timestamp").cast("long")).rangeBetween(days(7), days(7))
df_ml = df_ml.withColumn("Adj Close +7", F.avg("Adj Close").over(w))



# We must remove rows where this information is not available, as it is the value we want to predict.
df_ml.select(F.count(F.when(F.isnull("Adj Close +7"), "Adj Close +7")).alias("Adj Close +7 -- nb nulls")).show()
print("DF size: " + str(df_ml.count()))
df_ml = df_ml.filter(df_ml["Adj Close +7"].isNotNull())
print("DF size without Adj Close +7 nulls: " + str(df_ml.count()))



# We impute the Daily Difference column by filling missing values with 0, and keep a flag column marking the imputed rows.
df_ml = df_ml.withColumn("Daily Difference Imputated", df_ml["Daily Difference"].isNull())
df_ml = df_ml.fillna(value=0,subset=["Daily Difference"])
df_ml.select(F.count(F.when(F.isnull("Daily Difference"), "Daily Difference")).alias("Daily Difference -- nb nulls")).show()
df_ml.select(F.count(F.when(df_ml["Daily Difference Imputated"], "Daily Difference Imputated")).alias("Daily Difference Imputated -- nb True")).show()


# We split our dataset into an 80/20 train/test split
trainDF, testDF = df_ml.randomSplit([.8, .2], seed=42)


# We use OHE to take into account the company, because it is a categorical variable
stringIndexer = StringIndexer(inputCols=["company_name"], 
                              outputCols=["company_name_index"])
oheEncoder = OneHotEncoder(inputCols=["company_name_index"], 
                           outputCols=["company_name_OHE"])
vecAssembler = VectorAssembler(inputCols=['Open', 'Close', 'Weekly Avg Close', 'Monthly Avg Close', 'Yearly Avg Close', 'Daily Difference', 'company_name_OHE'], outputCol="features")



# We train using linear regression
lr = LinearRegression(featuresCol="features", labelCol="Adj Close +7")
pipeline = Pipeline(stages=[stringIndexer, oheEncoder, vecAssembler, lr])

pipelineModel = pipeline.fit(trainDF)
predDF = pipelineModel.transform(testDF)

predDF = predDF.select("Adj Close +7", "prediction")
predDF.show(10)

val_pred = predDF.rdd.map(tuple)
rm = RegressionMetrics(val_pred)

print("Average of the Adj Close: " + str(df_ml.select(F.avg('Adj Close')).collect()[0][0]))
print("Root mean squared error: " + str(np.sqrt(rm.meanSquaredError)))
print("R Squared: " + str(rm.r2))

+------------------------+
|Adj Close +7 -- nb nulls|
+------------------------+
|                     481|
+------------------------+

DF size: 6333
DF size without Adj Close +7 nulls: 5852
+----------------------------+
|Daily Difference -- nb nulls|
+----------------------------+
|                           0|
+----------------------------+

+-------------------------------------+
|Daily Difference Imputated -- nb True|
+-------------------------------------+
|                                 1220|
+-------------------------------------+



2022-06-20 16:09:04,630 WARN util.Instrumentation: [2f386ca0] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

+------------------+------------------+
|      Adj Close +7|        prediction|
+------------------+------------------+
| 58.71074676513672| 60.65609340449297|
|28.123088836669922|28.151734907684965|
| 59.24515914916992| 60.43082582973664|
|45.917999267578125|48.070453738430395|
| 47.54999923706055| 48.50636279559906|
| 807.8800048828125| 808.8334396135305|
| 804.6099853515625| 807.8406744467624|
| 806.0700073242188| 811.8052166006702|
|127.04000091552734|128.69850197879416|
|   805.02001953125| 812.0958158395672|
+------------------+------------------+
only showing top 10 rows




Average of the Adj Close: 536.033969218814



Root mean squared error: 38.721356016146416
R Squared: 0.9971324396456984


                                                                                

The RMSE doubled compared to the +1 predictions, but considering that we are predicting much further into the future, it remains reasonably low: about 7% of the average Adj Close.
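
To put these errors on a comparable scale, one option is to divide each RMSE by the average Adj Close printed next to it. The sketch below does this with the values copied from the outputs above; `relative_rmse` is a hypothetical helper, and this ratio is only a rough indicator, not a metric reported by Spark.

```python
def relative_rmse(rmse: float, mean_target: float) -> float:
    """RMSE expressed as a fraction of the average target value."""
    return rmse / mean_target


# (RMSE, average Adj Close) pairs copied from the outputs above
results = {
    "random forest, +1 day": (58.83454257711474, 537.4425730492904),
    "linear regression, +7 days": (38.721356016146416, 536.033969218814),
}

for name, (rmse, mean_target) in results.items():
    print(f"{name}: {relative_rmse(rmse, mean_target):.1%}")
```

With these numbers the random forest is off by roughly 11% of the average price, and the seven-day linear regression by roughly 7%.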

### Conclusion
Basic linear regression is hard to beat for predicting stock value. We were able to improve it by using other variables, but other methods such as ridge regression or random forest did not help. Another problem was that the dataset was missing a lot of dates, rendering approximately 20% of our data unusable. (Note that these data points are still partially used, to construct the +1/+7 column on other data points.) We must also remember that 27% of our data on the daily difference is inferred and set to 0. However, since the model we employed is linear regression, a value of 0 means that this variable contributes nothing to the prediction when the data is unavailable, which is what we want.
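
This last point can be checked with a small sketch in plain Python: in a linear model, a feature set to 0 adds `weight * 0 = 0` to the prediction, so the result is the same as if the feature were absent. The intercept and coefficients below are hypothetical, not those of our fitted model, and `predict` is an illustrative helper.

```python
def predict(intercept: float, weights: dict, row: dict) -> float:
    """Linear prediction: intercept + sum of weight * feature value."""
    return intercept + sum(weights[name] * row[name] for name in weights)


# Hypothetical coefficients (assumption, for illustration only)
intercept = 1.5
weights = {"Close": 0.98, "Daily Difference": 0.4}
weights_without_diff = {"Close": 0.98}

# A row whose Daily Difference was missing and imputed with 0
row_imputed = {"Close": 100.0, "Daily Difference": 0.0}

with_imputed = predict(intercept, weights, row_imputed)
without_feature = predict(intercept, weights_without_diff, row_imputed)
print(with_imputed == without_feature)  # True
```

Note that this only concerns the prediction for a given row. The imputed rows still take part in training, so they can influence the coefficients that the model learns.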