Student 1 : Philippe BERNET  23130

Student 2 : Marius   DUBOSC  23527

Student 3 : Baptiste BOURDET

In [None]:
import ipywidgets as widgets
from IPython.display import display
import os
import pandas as pd

from datetime import date, datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import when, count, col, hour, mean, year, window, udf, avg, month, min, max, dayofmonth, weekofyear, lag, isnull, expr

from pyspark.sql.window import Window

import matplotlib.pyplot as plt
import matplotlib
matplotlib.rcParams["figure.figsize"] = (12, 10)
# %matplotlib widget

In [None]:
# Folder read by spark.read.csv when loading the data.
data_folder = "stocks_data/"
# Company names, as matched against the company_name column.
companies = ['AMAZON', 'APPLE', 'FACEBOOK', 'GOOGLE', 'MICROSOFT', 'TESLA', 'ZOOM']


In [None]:
spark_application_name = "Spark_Application_Project"
# getOrCreate reuses an already running session when there is one.
spark = SparkSession.builder \
    .appName(spark_application_name) \
    .getOrCreate()

# Data loading

In [None]:
# Explicit schema for the stock CSV files.
# NOTE(review): with an explicit schema Spark presumably maps CSV columns by
# position (enforceSchema default), so this order must match the files — confirm.
dfCol = [
    StructField("Date", TimestampType()),
    StructField("High", DoubleType()),
    StructField("Low", DoubleType()),
    StructField("Open", DoubleType()),
    StructField("Close", DoubleType()),
    StructField("Volume", DoubleType()),
    StructField("Adj Close", DoubleType()),
    StructField("company_name", StringType()),
]
# Wrap the field list once; wrapping a StructType in another StructType left
# `.fields` as a StructType instead of a list.
dfSchema = StructType(dfCol)

df = spark.read.csv(data_folder, header=True, sep=',', schema=dfSchema)
df.show()

# Data available

The following function allows you to explore the dataset. A widget makes the visualisation easier and lets you change the stock dynamically.

In [None]:
def explore_ds(df, input):
    """Print an overview of the stock data of one company.

    Shows the first and last 40 rows, the number of observations, the covered
    period, descriptive statistics, missing-value counts per column and the
    pairwise correlations of the numeric columns.

    Args:
        df: stocks DataFrame with at least ``company_name`` and ``Date`` columns.
        input: company name to filter on (compared with ``company_name``).
    """
    df_one = df.filter(df.company_name == input).orderBy('Date')

    # The first 40 rows can be shown straight from the ordered frame.
    print('First 40 lines :')
    df_one.show(40)
    # tail() brings Rows back to the driver, so rebuild a DataFrame to print
    # them. The explicit schema keeps this working when there are no rows
    # (schema inference on an empty list fails).
    print('Last 40 lines :')
    spark.createDataFrame(df_one.tail(40), schema=df_one.schema).show(40)

    print(f'Number of observations : {df_one.count()}')

    # Period covered: first and last date in a single aggregation
    # (min/max here are the pyspark.sql.functions imported at the top).
    dini, dfin = df_one.select(min('Date'), max('Date')).first()
    if dini is None or dfin is None:
        print('Period : no data available')
    else:
        inter = (dfin - dini).days
        print(f'Period between {dini} and {dfin} : {inter} days')

    # Descriptive statistics
    print('Statistics :')
    df_one.summary().show()

    # Number of missing values per column
    print('Missing values :')
    df_one.select([count(when(col(c).isNull(), c)).alias(c) for c in df_one.columns]).show()

    # Correlation between every pair of numeric columns
    numeric_cols = ["High", "Low", "Open", "Close", "Volume", "Adj Close"]
    for i, first in enumerate(numeric_cols):
        for second in numeric_cols[i + 1:]:
            print(f"Correlation {first}/{second}: {df_one.corr(first, second)}")


In [None]:
def display_info_by_company(df):
    """Show a company dropdown; each new selection runs ``explore_ds`` on it.

    Args:
        df: the full stocks DataFrame, passed through to ``explore_ds``.
    """
    w_output = widgets.Output()
    w_inputs = widgets.Dropdown(
        options=[(name, i) for i, name in enumerate(companies)],
        value=0,
        description="Input",
    )

    def intern_display(change):
        w_output.clear_output()
        with w_output:
            # value is the option index; element 0 of the option is the company name.
            company = w_inputs.options[w_inputs.value][0]
            explore_ds(df, company)

    # names='value': run once per selection. Without it the handler is also
    # called for the other traits that change along with the selection,
    # re-running the whole Spark exploration each time.
    w_inputs.observe(intern_display, names='value')
    display(w_inputs)
    display(w_output)

display_info_by_company(df)

## Exploration

In [None]:
# used to change date granularity
granularity = {
    'day' : ['company_name', year('Date').alias('year'), month('Date').alias('month'),  dayofmonth('Date').alias('day')],
    'week' : ['company_name', year('Date').alias('year'), weekofyear('Date').alias('week')],
    'month' : ['company_name', year('Date').alias('year'), month('Date').alias('month')],
    'year' : ['company_name', year('Date').alias('year')]
}

# used to visualize the data in chronological order
order_by = {
	'day' : ['year', 'month', 'day'],
	'week' : ['year', 'week'],
	'month' : ['year', 'month'],
	'year' : ['year'],
}

columns = ['Close', 'Open', 'High', 'Low', 'Volume', 'Adj Close']

We start the exploration by computing the average open and close prices, as well as the average volume, for different time periods.

In [None]:
def compute_average(cur_df, gran = 'year', cols = (mean('Open').alias('Open_Mean'), mean('Close').alias('Close_mean'), mean('Volume').alias('Volume_mean'))):
    """Aggregate ``cur_df`` per company and time period.

    Args:
        cur_df: stocks DataFrame with ``company_name`` and ``Date`` columns.
        gran: key of ``granularity`` ('day', 'week', 'month' or 'year').
        cols: aggregate Column expressions handed to ``agg``.

    Returns:
        The aggregated DataFrame, sorted chronologically with ``order_by[gran]``.
    """
    group_keys = granularity[gran]
    sort_keys = order_by[gran]
    aggregated = cur_df.groupBy(group_keys).agg(*cols)
    return aggregated.orderBy(sort_keys)

We then compute the variation of the average values of the close price and the volume.

In [None]:
def compute_variation(cur_df, gran = 'year', cols = (mean('Close').alias('Close_mean'), mean('Volume').alias('Volume_mean'))):
    """Average per period and add the change from the previous period.

    On top of ``compute_average``'s output, adds 'Close_diff' and 'Volume_diff':
    the current period mean minus the previous period mean of the same company,
    or 0 for a company's first period.

    Args:
        cur_df: stocks DataFrame to aggregate.
        gran: granularity key ('day', 'week', 'month' or 'year').
        cols: aggregate columns; must produce 'Close_mean' and 'Volume_mean'.

    Returns:
        The aggregated DataFrame with the two difference columns appended.
    """
    # Aggregate the frame that was passed in (previously the global ``df`` was
    # used here and ``cur_df`` was silently ignored).
    cur_df = compute_average(cur_df, gran, cols)
    my_window = Window.partitionBy("company_name").orderBy(order_by[gran])

    for mean_col, diff_col in (("Close_mean", "Close_diff"), ("Volume_mean", "Volume_diff")):
        # lag() is null on a company's first period, making the difference null; report 0 there.
        diff = col(mean_col) - lag(col(mean_col)).over(my_window)
        cur_df = cur_df.withColumn(diff_col, when(isnull(diff), 0).otherwise(diff))
    return cur_df

The following function is a generic function enabling dynamic visualisation with widgets.

In [None]:
def display_frame(func, df, cols = (mean('Open').alias('Open_Mean'), mean('Close').alias('Close_mean'), mean('Volume').alias('Volume_mean'))):
    """Show granularity radio buttons and display ``func(df, granularity, cols)``.

    Args:
        func: callable ``(df, gran, cols) -> DataFrame`` whose result is shown.
        df: DataFrame passed to ``func``.
        cols: aggregate columns passed to ``func``.
    """
    gran_names = list(granularity.keys())
    w_granu = widgets.RadioButtons(
        options=gran_names,
        value=gran_names[0],
        description="Granularity",
    )

    w_output_g = widgets.Output()

    def f(change):
        w_output_g.clear_output()
        with w_output_g:
            func(df, w_granu.value, cols).show()

    # names='value': react once per selection instead of once per changed
    # trait, which re-ran the Spark job several times.
    w_granu.observe(f, names='value')
    display(w_granu)
    display(w_output_g)


In [None]:
display_frame(func=compute_variation, df=df)

We add a column containing the daily return.

In [None]:
# Daily return
def add_daily_return(df):
    """Return ``df`` with an extra 'Daily return' column equal to Close - Open.

    A positive value means the stock gained value during the trading day.
    """
    # Close - Open: the previous Open - Close made every gain negative, so the
    # "highest daily return" was actually the biggest intraday drop.
    return df.withColumn('Daily return', col('Close') - col('Open'))

df_dr = add_daily_return(df)
df_dr.show()

Maximum daily return:

In [None]:
# Highest daily return
def get_max_daily_return(df):
    """Return the largest 'Daily return' value found in ``df``."""
    with_returns = add_daily_return(df)
    top_row = with_returns.select(max('Daily return')).first()
    return top_row[0]

In [None]:
max_daily_return = get_max_daily_return(df)
# Rows whose daily return equals the maximum.
best_days = df_dr.filter(col('Daily return') == max_daily_return)
best_days.show()

Visualisation of the average daily return for different time periods:

In [None]:
def get_average_daily_return(df, gran, var):
    """Add the 'Daily return' column, then average ``var`` per company and period."""
    return compute_average(add_daily_return(df), gran, var)

daily_return_cols = (mean('Open').alias('Open_Mean'), mean('Close').alias('Close_mean'), mean('Daily return').alias('Average daily return'))
display_frame(get_average_daily_return, df, daily_return_cols)

We can also plot these values, to better visualize them.

In [None]:
def plot_frame(df):
    """Plot per-company averages of Close, Open and Daily return.

    Draws a 3x3 grid: one row per metric ('Close', 'Open', 'Daily return') and
    one column per granularity ('year', 'week', 'day'), with one line per
    company listed in ``colors``.

    Args:
        df: stocks DataFrame that already has a 'Daily return' column
            (e.g. ``df_dr``).
    """
    colors = {
        'APPLE': 'r',
        'AMAZON': 'b',
        'FACEBOOK': 'c',
        'GOOGLE': 'y',
        'MICROSOFT': 'm',
        'TESLA': 'k',
        'ZOOM': 'g'
    }
    metrics = ['Close', 'Open', 'Daily return']
    grans = ['year', 'week', 'day']

    fig, axs = plt.subplots(len(metrics), len(grans))

    for j, gran in enumerate(grans):
        # Every mean is aliased '<metric> Mean' so the lookup below matches
        # exactly (the former 'Close mean' only resolved thanks to Spark's
        # case-insensitive column matching).
        df_tmp = compute_average(df, gran, (mean('Daily return').alias('Daily return Mean'),
                                            mean('Open').alias('Open Mean'),
                                            mean('Close').alias('Close Mean'),
                                            min('Date').alias('Date')))
        # Collect once per granularity instead of running two Spark jobs per
        # (metric, company) pair; rows arrive sorted by Date.
        rows = df_tmp.orderBy('Date').collect()
        min_date = rows[0]['Date'] if rows else None
        max_date = rows[-1]['Date'] if rows else None
        by_company = {name: [r for r in rows if r['company_name'] == name] for name in colors}

        for i, metric in enumerate(metrics):
            ax = axs[i, j]
            for name, color in colors.items():
                company_rows = by_company[name]
                ax.plot([r['Date'] for r in company_rows],
                        [r[f'{metric} Mean'] for r in company_rows],
                        color=color,
                        label=name)

            ax.set_xlim((min_date, max_date))
            ax.set_title(f'{metric} of stocks per {gran}')

    axs[0, 0].legend()

    plt.show()


In [None]:
plot_frame(df=df_dr)

# Moving Average

A useful metric is the moving average, which better shows global trends.

In [None]:
def moving_average(dataframe, column_name, nb_point):
    """Add a 'Moving average' column: rolling mean of ``column_name`` per company.

    For each row, the window covers the rows of the same company whose Date
    lies between ``nb_point`` days before the row's Date and the row's Date
    (inclusive). The window is time-based, so days without rows (e.g.
    weekends) lower the number of averaged points.

    NOTE(review): the name ``nb_point`` suggests a row count, but the window is
    measured in calendar days — confirm which one is intended.
    """
    seconds_per_day = 86400
    # Date is ordered as a long (epoch seconds), hence bounds in seconds.
    window_spec = (Window.partitionBy("company_name")
                   .orderBy(col("Date").cast('long'))
                   .rangeBetween(-nb_point * seconds_per_day, 0))
    return dataframe.withColumn('Moving average', avg(column_name).over(window_spec))

In [None]:
moving_average(dataframe=df, column_name='Close', nb_point=5).show()

# Correlation

We can display the correlation coefficient of a given column between two companies.

In [None]:
from pyspark.sql.functions import corr

def correlation(data_frame_A, data_frame_B, col_name):
    """Correlation of ``col_name`` between two DataFrames, matched on Date.

    Args:
        data_frame_A: has one column Date to join the data
        data_frame_B: has one column Date to join the data
        col_name: must be present in both data frame

    Returns:
        The value of ``DataFrame.corr`` between A's and B's ``col_name`` over the
        dates present in both frames (inner join).
    """
    renamed = col_name + "_B"
    # Keep only the join key and the value before joining: avoids carrying
    # every other column twice (ambiguous duplicate names) through the shuffle.
    left = data_frame_A.select("Date", col_name)
    right = data_frame_B.select("Date", col_name).withColumnRenamed(col_name, renamed)
    return left.join(right, on="Date", how="inner").corr(col_name, renamed)

In [None]:
def display_cor(df):
    """Show dropdowns for two companies and a column, and print their correlation.

    Whenever one of the three selections changes, the chosen column of company A
    and company B is passed to ``correlation`` and the result is printed.

    Args:
        df: the full stocks DataFrame.
    """
    w_output = widgets.Output()
    w_inputs_A = widgets.Dropdown(
        options=[(name, i) for i, name in enumerate(companies)],
        value=0,
        description="Company A",
    )
    w_inputs_B = widgets.Dropdown(
        options=[(name, i) for i, name in enumerate(companies)],
        value=0,
        description="Company B",
    )
    w_inputs_column = widgets.Dropdown(
        options=[(name, i) for i, name in enumerate(columns)],
        value=0,
        description="Column",
    )

    def cor_display(change):
        w_output.clear_output()
        with w_output:
            # Each option is (label, index) and value is the index.
            comp_A = w_inputs_A.options[w_inputs_A.value][0]
            comp_B = w_inputs_B.options[w_inputs_B.value][0]
            column = w_inputs_column.options[w_inputs_column.value][0]

            company_A_df = df.filter(df.company_name == comp_A)
            company_B_df = df.filter(df.company_name == comp_B)
            print(correlation(company_A_df, company_B_df, column))

    # names='value': one recomputation per selection rather than one per
    # changed trait (each recomputation is a Spark join).
    for w in (w_inputs_A, w_inputs_B, w_inputs_column):
        w.observe(cor_display, names='value')
        display(w)
    display(w_output)

display_cor(df)

We can plot correlation matrices for a better visualisation.

In [None]:
def compute_corr_for_col(df, col_name, companies):
    """Build the symmetric company-by-company correlation matrix of ``col_name``.

    Only the upper triangle (diagonal included) is computed with
    ``correlation``; each value is mirrored into the lower triangle.

    Returns:
        A Spark DataFrame with one float column per company, one row per company.
    """
    n = len(companies)
    matrix = [[0.0] * n for _ in range(n)]
    for i, comp in enumerate(companies):
        comp_df = df.filter(df.company_name == comp)
        for j in range(i, n):
            other_df = df.filter(df.company_name == companies[j])
            value = correlation(comp_df, other_df, col_name)
            matrix[i][j] = value
            matrix[j][i] = value

    schema = ','.join([f"{c} float" for c in companies])
    return spark.createDataFrame(matrix, schema)



def display_corr_for_cols(df, cols, companies, uniform_bar=False):
    """Draw one company-by-company correlation matrix per column in ``cols``.

    Args:
        df: the full stocks DataFrame.
        cols: column names to correlate between companies.
        companies: company names used for the matrix rows/columns and tick labels.
        uniform_bar: if True, every colour scale is fixed to [-1, 1] so the
            matrices can be compared directly.
    """
    w = 3
    # Ceiling division; keep at least one row so plt.subplots gets a valid grid.
    h = -(-len(cols) // w) or 1
    # squeeze=False always returns a 2-D array of axes, so one flat index works
    # whatever the number of rows (no 1-D/2-D special case needed).
    fig, axs = plt.subplots(h, w, figsize=(22, h * 4), squeeze=False)
    axes = axs.flatten()

    for ax, col_name in zip(axes, cols):
        df_corr = compute_corr_for_col(df, col_name, companies)
        # Fix the colour limits when the image is created.
        limits = {'vmin': -1, 'vmax': 1} if uniform_bar else {}
        mat = ax.matshow(df_corr.collect(), **limits)
        ticks = range(len(companies))
        ax.set_xticks(ticks)
        ax.set_xticklabels(companies, fontsize=14, rotation=45)
        ax.set_yticks(ticks)
        ax.set_yticklabels(companies, fontsize=14)
        fig.colorbar(mat, ax=ax, fraction=0.05, pad=0.08)
        ax.tick_params(labelsize=14)
        ax.set_title(f"Correlation Matrix for column {col_name}")

    # Hide the unused cells of the last grid row.
    for ax in axes[len(cols):]:
        ax.axis('off')

    plt.subplots_adjust(left=0.1,
                        bottom=0.1,
                        right=0.9,
                        top=0.9,
                        wspace=1,
                        hspace=0.6)
    plt.show()

In [None]:
corr_columns = ['Volume', 'Low', 'High', 'Open', 'Close', 'Adj Close']
# First with per-matrix colour scales, then with every scale on [-1, 1].
for uniform in (False, True):
    display_corr_for_cols(df, corr_columns, companies, uniform_bar=uniform)

# Return rate

We can display the return rate, which indicates the net gain or loss of an investment over a specified time period, expressed as a percentage of the investment’s initial cost.

In [None]:
def compute_return_rate(cur_df, gran, var):
    """Add 'Return_rate': percentage change of Close_mean versus the previous period.

    Return_rate = (Close_mean - previous Close_mean) / previous Close_mean * 100,
    per company. It is 0 for a company's first period (Close_diff is 0 there)
    and whenever the division yields null.

    Args:
        cur_df: stocks DataFrame.
        gran: granularity key ('day', 'week', 'month' or 'year').
        var: aggregate columns; must produce 'Close_mean' and 'Volume_mean'
            (both are differenced by compute_variation).
    """
    cur_df = compute_variation(cur_df, gran, var)
    # Close_diff = Close_mean - previous Close_mean, so the previous period's
    # mean (the investment's initial cost) is Close_mean - Close_diff. The
    # previous version divided by the current mean instead.
    previous_close = cur_df.Close_mean - cur_df.Close_diff
    rate = cur_df.Close_diff / previous_close * 100
    return cur_df.withColumn("Return_rate", when(isnull(rate), 0).otherwise(rate))

In [None]:
return_rate_cols = (mean('Open').alias('Open_Mean'), mean('Close').alias('Close_mean'), mean('Volume').alias('Volume_mean'))
display_frame(compute_return_rate, df, cols=return_rate_cols)

# Best return rate

We can also return the company with the best return rate for a given month or year.

In [None]:
from dateutil.relativedelta import relativedelta

def best_return_rate(data_frame, initial_date_str, gran):
    """Return the company (or companies, on ties) with the best return rate in a period.

    Args:
        data_frame: stocks DataFrame.
        initial_date_str: period to inspect, 'YYYY-MM' when ``gran`` is 'month',
            'YYYY' when ``gran`` is 'year'.
        gran: 'month' or 'year'.

    Returns:
        DataFrame with columns Date, company_name and Return_rate, restricted to
        the rows whose Return_rate equals the maximum within the period.

    Raises:
        ValueError: if ``gran`` is neither 'month' nor 'year'.
    """
    if gran == "month":
        period_size = relativedelta(months=1)
        date_format = '%Y-%m'
    elif gran == "year":
        period_size = relativedelta(years=1)
        date_format = '%Y'
    else:
        # Fail loudly: returning None made the caller's .show() crash with an
        # unrelated AttributeError.
        raise ValueError(f'Bad granularity {gran!r}, accepted are : month or year')

    initial_date = datetime.strptime(initial_date_str, date_format).date()
    end_date = initial_date + period_size

    df = compute_return_rate(data_frame, gran, (mean('Open').alias('Open_Mean'), mean('Close').alias('Close_mean'), mean('Volume').alias('Volume_mean'), min('Date').alias('Date')))

    # Date is the first trading day of each aggregated period; keep the periods
    # starting inside [initial_date, end_date).
    df = df.filter((df.Date >= initial_date) & (df.Date < end_date))
    df = df.select('Date', 'company_name', 'Return_rate')

    max_rr = df.select(max('Return_rate')).collect()[0][0]
    return df.filter(df.Return_rate == max_rr)

In [None]:
best_return_rate(data_frame=df, initial_date_str='2018', gran='year').show()

# Insights

Some insights that we could implement:

- stability (difference between 'High' and 'Low')
- add the median
- add the standard deviation and the interquartile range
- stock prediction with a time series prediction model (RNN)
- lowest daily return
- lowest return rate

# Prediction

In [None]:
df.show(n=20)

In [None]:
def buildRegDF(df, company, gran="day", max_lag=5):
    """Build a regression DataFrame of lagged prices for one company.

    Averages Close/Open/High/Low per ``gran`` period, then adds, for every lag
    k in 1 .. max_lag - 1 (``max_lag`` is exclusive, like ``range``), the
    columns '<col> <gran>-<k>' holding the value k periods earlier. Rows with
    any missing lag are dropped.

    Returns:
        (df_lr, inputCols): the resulting DataFrame and the lag column names,
        grouped by lag (all columns of lag 1, then lag 2, ...).
    """
    data_cols = ['Close', 'Open', 'High', 'Low']
    aggregations = (min('Date').alias('Date'), mean('Close').alias('Close'), mean('Open').alias('Open'), mean('High').alias('High'), mean('Low').alias('Low'))

    company_df = df.filter(df.company_name == company)
    df_lr = compute_average(company_df, gran, aggregations)
    # No partitionBy needed: the frame holds a single company.
    time_window = Window.orderBy(order_by[gran])

    lag_specs = [(c, i, f"{c} {gran}-{i}") for i in range(1, max_lag) for c in data_cols]
    inputCols = [name for _, _, name in lag_specs]
    lag_columns = [lag(df_lr[c], i).over(time_window).alias(name) for c, i, name in lag_specs]
    df_lr = df_lr.select('*', *lag_columns)

    return df_lr.dropna(), inputCols

df_lr, inputCols = buildRegDF(df, "TESLA")

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression, IsotonicRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler

def buildRegPipelineModel(df_lr, colToEstimate, inputCols):
    """Fit a VectorAssembler + LinearRegression pipeline that predicts ``colToEstimate``.

    Splits ``df_lr`` 80/20 at random (seed 42), prints the split sizes and the
    train/test RMSE, and returns the fitted PipelineModel.

    NOTE(review): the split is random rather than chronological; with lagged
    features this lets training rows come after test rows in time — consider
    a time-based split.
    """
    trainDF, testDF = df_lr.randomSplit([.8, .2], seed=42)
    n_train = trainDF.cache().count()
    n_test = testDF.cache().count()
    print(f"There are {n_train} rows in the training set, and {n_test} in the test set")

    stages = [
        VectorAssembler(inputCols=inputCols, outputCol="features"),
        LinearRegression(featuresCol="features", labelCol=colToEstimate),
    ]
    pipelineModel = Pipeline(stages=stages).fit(trainDF)

    predTrainDF = pipelineModel.transform(trainDF)
    predTestDF = pipelineModel.transform(testDF)

    evaluator = RegressionEvaluator(predictionCol="prediction", labelCol=colToEstimate, metricName="rmse")
    print(f"The RMSE for predicting the {colToEstimate} (train) is: {evaluator.evaluate(predTrainDF):.2f}")
    print(f"The RMSE for predicting the {colToEstimate} (test) is: {evaluator.evaluate(predTestDF):.2f}")

    return pipelineModel



In [None]:
def shiftColumn(df, inputCols, cols_name):
    """Move every lag column one step further back in time.

    ``cols_name`` are the current-value columns and ``inputCols`` the lag
    columns, laid out in consecutive groups of ``len(cols_name)`` (lag 1 first,
    then lag 2, ...), as ``buildRegDF`` builds them. Each lag-k column receives
    the matching lag-(k-1) value and lag 1 receives the current value; the
    oldest lag is discarded.
    """
    cols_shift = cols_name + inputCols
    # Group width; previously hard-coded to 4, which silently broke for any
    # other number of value columns.
    width = len(cols_name)
    # Walk from the oldest lag backwards so each column is read before it is overwritten.
    for i in range(len(cols_shift) - 1, width - 1, -1):
        df = df.withColumn(cols_shift[i], col(cols_shift[i - width]))
    return df

def predictCaracteristic(df, company, offset, gran="day", max_lag=2):
    """Forecast Close/Open/High/Low of ``company`` ``offset`` steps past the last row.

    One regression pipeline per value column is fitted on the lagged features
    from ``buildRegDF``. Starting from the most recent row, each step shifts the
    lag columns (the current values become lag 1) and replaces every value
    column with its model's prediction.

    Returns:
        A one-row DataFrame (the last row, Date unchanged) holding the values
        predicted at the final step.
    """
    data_cols = ['Close', 'Open', 'High', 'Low']

    df_lr, inputCols = buildRegDF(df, company, gran, max_lag)

    latest_date = df_lr.select(max('Date')).collect()[0][0]
    last_line = df_lr.orderBy(col('Date').asc()).filter(df_lr.Date == latest_date)

    models = [buildRegPipelineModel(df_lr, target, inputCols) for target in data_cols]

    for i in range(offset):
        print(f"{gran} +{i}")
        last_line = shiftColumn(last_line, inputCols, data_cols)

        for target, model in zip(data_cols, models):
            predicted = model.transform(last_line)
            last_line = predicted.withColumn(target, predicted.prediction).drop("prediction", "features")

    return last_line

predictCaracteristic(df, "TESLA", 10, max_lag=5)
