## Import required packages

In [0]:
# Download required packages
!pip -q install gdown missingno torch

%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark import SparkContext, SparkConf
from pyspark.sql.window import Window


import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import missingno as msno
import torch
import torch.nn as nn
from typing import *
import datetime
import gdown

import tqdm as tq
def tqdm(*args, **kwargs):
  '''
  Wrap ``tq.tqdm`` so progress bars stay on a single persistent line in notebook output.

  Defaults ``leave=True`` and ``position=0`` (a small trick to prevent tqdm from printing a
  new line at each step), while still letting callers override either option explicitly.

  All positional and keyword arguments are forwarded to ``tq.tqdm``.

  Returns:
    The progress-bar iterator created by ``tq.tqdm``.
  '''
  # setdefault rather than passing the keywords next to **kwargs: doing the latter raises
  # TypeError ("got multiple values for keyword argument") when a caller also sets them.
  kwargs.setdefault('leave', True)
  kwargs.setdefault('position', 0)
  return tq.tqdm(*args, **kwargs)

## Data acquisition
We download our datasets from Google Drive into a temporary directory on the driver node.

In [0]:
# Remove any previous local extraction (including the macOS metadata folder the archive
# apparently contains) so this cell starts from a clean /tmp on every run.
!rm -rf /tmp/data /tmp/__MACOSX
# Download the zipped datasets from Google Drive onto the driver's local disk.
gdown.download('https://drive.google.com/uc?id=1ggmDp-AWFzbQReLG0pLpQE_3fO0C0RnM', '/tmp/data.zip', quiet=False)
# Extract into /tmp/ (presumably producing /tmp/data, which the next cell moves) and delete the archive.
!unzip -q /tmp/data.zip -d /tmp/
!rm /tmp/data.zip

Then we move the datasets from the driver's local disk into DBFS (the Databricks File System), so that Spark can read them.

In [0]:
# Move the extracted datasets from the driver's local disk into DBFS, where the loading cell reads them.
# NOTE(review): the download cell only clears the local /tmp copy; if dbfs:/data already exists from a
# previous run, confirm how dbutils.fs.mv treats an existing destination before re-running.
dbutils.fs.mv("file:/tmp/data", "dbfs:/data", recurse=True)

In [0]:
%fs ls /data/

path,name,size
dbfs:/data/.DS_Store,.DS_Store,6148
dbfs:/data/key_stats_yahoo.csv,key_stats_yahoo.csv,2047081
dbfs:/data/prices/,prices/,0


In [0]:
%fs ls /data/

path,name,size
dbfs:/data/.DS_Store,.DS_Store,6148
dbfs:/data/key_stats_yahoo.csv,key_stats_yahoo.csv,2047081
dbfs:/data/prices/,prices/,0


## Dataset loading

In [0]:
# Development-only limits to keep iteration fast; set both to None for a full run.
DEV_KEY_STATS_ROWS = 1005  # TODO: set to None (only meant for development)
DEV_PRICE_FILES = 10       # TODO: set to None (only meant for development)

key_stats_df = spark.read.load("dbfs:/data/key_stats_yahoo.csv",
                               format="csv",
                               sep=",",
                               inferSchema="true",
                               header="true"
                              )

# Drop the first (ID) column
key_stats_df = key_stats_df.drop(key_stats_df.columns[0])
if DEV_KEY_STATS_ROWS is not None:
  # Snapshot the first rows while keeping the CSV's inferred schema: re-inferring types
  # from collected rows (RDD.toDF) can fail or change types when leading values are null.
  key_stats_df = spark.createDataFrame(key_stats_df.head(DEV_KEY_STATS_ROWS), schema=key_stats_df.schema)

# Parse dates with the legacy (pre-Spark-3) parser. This is a session setting, so it also
# applies to the 'dd-MM-yyyy' parsing of the price files below.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
key_stats_df = key_stats_df.withColumn("Date", F.to_date(key_stats_df["Date"], 'MM/dd/yyyy HH:mm'))
# NB: assigning to `df.schema[...].nullable` only edits the Python-side StructType, not the
# DataFrame's plan, so it is not done here; dates that fail to parse simply remain null.

# Cast every column after the first two (presumably Date and Ticker -- TODO confirm) to double,
# in a single projection instead of one withColumn per column.
key_stats_df = key_stats_df.select(
  *[key_stats_df[c] for c in key_stats_df.columns[:2]],
  *[key_stats_df[c].cast("double").alias(c) for c in key_stats_df.columns[2:]]
)

# Price data: one CSV file per stock under /data/prices/.
prices_files = [f.path for f in dbutils.fs.ls('/data/prices/') if f.path.endswith('.csv')]
if DEV_PRICE_FILES is not None:
  # Slice after filtering, so exactly DEV_PRICE_FILES CSV files are loaded.
  prices_files = prices_files[:DEV_PRICE_FILES]
dfs_names = [f.rsplit('/', 1)[1][:-len('.csv')] for f in prices_files]
prices_dfs = []
for f in tqdm(prices_files, desc='Reading stock price data', total=len(prices_files)):
  df = spark.read.load(f, format="csv", sep=",", inferSchema="true", header="true")
  prices_dfs.append(df.withColumn("Date", F.to_date(df["Date"], 'dd-MM-yyyy')))

In [0]:
#

## Dataset analysis

In [0]:
# Column names and types of the first stock's price DataFrame (index 0 of prices_dfs).
print("Prices dataframe format:")
prices_dfs[0].printSchema()

In [0]:
# Column names and types of the fundamental (key statistics) DataFrame after date parsing and numeric casting.
print("Key stats dataframe format:")
key_stats_df.printSchema()

### Utility functions

In [0]:
# TODO: add remaining utility functions

def prices_df_nan_summary(prices_dfs: List[pyspark.sql.DataFrame], names: List[str]) -> pd.DataFrame:
  '''
  Summarize, per stock, how much of the price data is missing.

  Args:
    prices_dfs: one price DataFrame per stock.
    names: stock names, aligned index-by-index with ``prices_dfs``.

  Returns:
    A pandas DataFrame with one row per stock that has at least one null value and columns
    'Stock name', 'Missing data (%)' (rounded to 3 decimals) and 'Count'. 'Count' is the largest
    per-column null count of that stock. Stocks without any null value are omitted.
  '''
  nan_rows = []
  for prices_df, name in tqdm(zip(prices_dfs, names), total=len(prices_dfs), desc='Generating prices summary ...'):
    # One aggregation pass: null count of every column. count() skips nulls and
    # when(isnull(c), 1) is non-null exactly on the rows where `c` is null.
    null_counts = prices_df.agg(*[F.count(F.when(F.isnull(c), 1)).alias(c) for c in prices_df.columns]).first()
    # Missing data is expected to cover whole rows (every column but Date), in which case all
    # non-zero counts are equal. Taking the max keeps the summary usable instead of aborting
    # when some column has a different number of nulls.
    nan_count = max(null_counts, default=0)
    if nan_count > 0:
      nan_rows.append((name, round(100 * nan_count / prices_df.count(), 3), nan_count))

  return pd.DataFrame(nan_rows, columns=['Stock name', 'Missing data (%)', 'Count'])

def remove_trailing_nan(df: pyspark.sql.DataFrame, ticker: str, col: str = 'Low') -> pyspark.sql.DataFrame:
  '''
    Drop the contiguous run of rows at the end of the series (ordered by Date) whose `col` is null.

    Args:
      df: price DataFrame with a 'Date' column.
      ticker: stock name; currently unused, kept for call-site compatibility.
      col: column whose null values mark a missing row (default 'Low').

    Returns:
      `df` (same columns) restricted to the rows dated on or before the last date that has a
      non-null `col`. Rows with a null Date sort first and are therefore never trailing.
      If no dated row has a non-null `col`, the whole dated series is trailing and only
      null-Date rows (possibly none) are returned.
  '''
  # Latest date that still carries a value: everything strictly after it is the trailing NaN run.
  # F.max ignores nulls, so this is None when no dated row has a value.
  last_valid_date = df.where(F.col(col).isNotNull()).agg(F.max('Date')).first()[0]

  undated = F.col('Date').isNull()
  if last_valid_date is None:
    return df.where(undated)
  return df.where(undated | (F.col('Date') <= F.lit(last_valid_date)))


def merge_prices_fundamentals(
    prices_dfs: List[pyspark.sql.DataFrame],
    key_stats_df: pyspark.sql.DataFrame,
    dfs_names: List[str],
    drop_cols: List[str] = ['Date', 'Ticker', 'Price']
    ) -> List[pyspark.sql.DataFrame]:
  '''
  Attach to every daily price row the latest fundamental report available on that day.

  Args:
    prices_dfs: one price DataFrame per stock, aligned index-by-index with ``dfs_names``.
    key_stats_df: fundamental reports of all companies, with 'Ticker' and 'Date' columns.
    dfs_names: stock names, compared against the upper-cased 'Ticker' values.
    drop_cols: currently unused (and never mutated); kept for backward compatibility.

  Returns:
    One joined DataFrame per ticker present in both inputs, in ascending ticker order.
    The 'High', 'Low', 'Open' and 'Close' columns and the report date are dropped. Price days
    before a ticker's first report are excluded (inner join).
  '''
  # Sorted so the output order (e.g. which frame ends up first) is reproducible across runs;
  # distinct() gives no ordering guarantee. Null tickers cannot match a stock name and are skipped.
  tickers = sorted(row[0] for row in key_stats_df.select('Ticker').distinct().collect() if row[0] is not None)

  prices_dfs_new = []
  for ticker in tqdm(tickers, desc='Merging the datasets ...'):
    # Consider only companies for which we also have price data
    if ticker.upper() not in dfs_names:
      continue
    prices_df = prices_dfs[dfs_names.index(ticker.upper())]

    # Financial reports of the current ticker
    ticker_df = key_stats_df.filter(F.col('Ticker') == ticker)

    # Pair every price day with all the reports published on or before it ...
    joined_df = ticker_df.join(prices_df, ticker_df.Date <= prices_df.Date, how='inner')

    # ... then keep, for each price day (window partition), only the most recent of those reports,
    # and drop the helper column, the unused price columns and the report date.
    latest_report_date = F.max(ticker_df['Date']).over(Window.partitionBy(prices_df['Date']))
    joined_df = (joined_df
                 .withColumn('ticker_df_max_date', latest_report_date)
                 .where(ticker_df['Date'] == F.col('ticker_df_max_date'))
                 .drop('ticker_df_max_date', 'High', 'Low', 'Open', 'Close')
                 .drop(ticker_df['Date']))

    prices_dfs_new.append(joined_df)

  return prices_dfs_new
    
                
def fill_missing_days(
  aggregate_dfs: List[pyspark.sql.DataFrame],
  remove_weekends: bool = True,
  end_date: str = '2014-01-01'
) -> List[pyspark.sql.DataFrame]:
  '''
  Forward-fill calendar gaps so that every day between consecutive observations has a row.

  Args:
    aggregate_dfs: per-stock DataFrames with a 'Date' column.
    remove_weekends: if True, drop Saturday/Sunday rows (mostly synthetic rows created by the fill).
    end_date: 'yyyy-MM-dd' string; only rows dated strictly before it are kept.

  Returns:
    The filled DataFrames (same columns as the inputs), omitting any that end up empty.
  '''
  # Rows ordered from the most recent date backwards, so lag() returns the next later date.
  by_date_desc = Window.orderBy(F.desc('Date'))
  cutoff = F.to_date(F.lit(end_date))
  result_dfs = []

  for df in tqdm(aggregate_dfs, desc='Filling missing days ...'):
    # 1) diff: days until the next observation (11/05/2021 - 07/05/2021 ==> 4 days). The most
    #    recent row has no successor: coalescing to 1 keeps it as a single row, instead of letting
    #    explode(sequence(1, null)) silently drop it.
    # 2) seq: 1..diff, i.e. one exploded copy of the row per day to fill.
    # 3) new_date = Date + diff - seq, so the copies cover Date .. next_date - 1.
    # Example:
    # |Date      | Adjusted close   |diff|seq|new_date  |
    # |2021-04-09|116.80999755859375|   3|  1|2021-04-11|
    # |2021-04-09|116.80999755859375|   3|  2|2021-04-10|
    # |2021-04-09|116.80999755859375|   3|  3|2021-04-09|
    # 4) keep only days strictly before end_date.
    #
    # N.B. to back-fill instead, order the dates ascending and subtract diff rather than add it.
    next_date = F.lag(F.col('Date'), 1).over(by_date_desc)
    df = (df
          .withColumn('diff', F.coalesce(F.datediff(next_date, F.col('Date')), F.lit(1)))
          .withColumn('seq', F.explode(F.sequence(F.lit(1), F.col('diff'))))
          .withColumn('new_date', F.col('Date') + F.col('diff') - F.col('seq'))
          .where(F.col('new_date') < cutoff))

    if remove_weekends:
      # dayofweek: 1 = Sunday ... 7 = Saturday, so 2..6 is Monday..Friday. A built-in expression
      # avoids a Python UDF round trip (and null dates are filtered out instead of crashing).
      df = df.where(F.dayofweek(F.col('new_date')).between(2, 6))

    # Drop the helper columns and expose the filled date as 'Date'
    df = df.drop('Date', 'diff', 'seq').withColumnRenamed('new_date', 'Date')

    if df.count() > 0:
      result_dfs.append(df)
  return result_dfs

def missing_values_summary(df):
  '''
  Percentage of null values in every column of a Spark DataFrame.

  Args:
    df: the Spark DataFrame to inspect.

  Returns:
    A single pyspark Row with one field per column of ``df``, holding the share of null values
    in that column as a percentage rounded to 3 decimals.
  '''
  n = df.count()

  def to_percentage(x: pyspark.sql.column.Column, n: int) -> pyspark.sql.column.Column:
    ''' Express the count column ``x`` as a percentage of ``n`` rows, rounded to 3 decimals. '''
    return F.round(100 * x / n, 3)

  # count() skips nulls, and when(isnull(c), 1) is non-null exactly on the rows where `c` is null.
  null_counts = [F.count(F.when(F.isnull(c), 1)) for c in df.columns]
  return df.agg(*[to_percentage(cnt, n).alias(c) for cnt, c in zip(null_counts, df.columns)]).first()

def scale_features(dfs: List[pyspark.sql.DataFrame]) -> None:
    '''
    Standardize the numerical features in place: (x - mean) / sample standard deviation.

    Mean and standard deviation are computed once over all DataFrames together, so every stock
    is scaled with the same statistics. Each ``dfs[i]`` is replaced by its scaled version, and
    columns outside the feature list are left untouched. An empty list is a no-op.

    NOTE(review): the statistics cover the whole period; if these frames are later split into
    train/test sets, fitting them on the training part only would avoid look-ahead leakage.
    '''
    from functools import reduce

    scalable_features = ['DE Ratio', 'Trailing P/E', 'Price/Sales', 'Price/Book',
       'Profit Margin', 'Operating Margin', 'Return on Assets',
       'Return on Equity', 'Revenue Per Share', 'Market Cap',
       'Enterprise Value', 'PEG Ratio', 'Enterprise Value/Revenue',
       'Enterprise Value/EBITDA', 'Revenue', 'Gross Profit', 'EBITDA',
       'Net Income Avl to Common ', 'Diluted EPS', 'Earnings Growth',
       'Revenue Growth', 'Total Cash', 'Total Cash Per Share', 'Total Debt',
       'Current Ratio', 'Book Value Per Share', 'Cash Flow', 'Beta', 'Volume',
       'Adjusted Close', 'SMA', 'RSI']

    if not dfs:
        return

    # unionByName matches columns by name; plain union() is positional and would silently
    # mix up features if two frames ever listed their columns in a different order.
    aggregate_df = reduce(lambda left, right: left.unionByName(right), dfs)

    # A single pass over the data computes both statistics for every feature.
    stats = aggregate_df.select(
      *[F.mean(F.col(c)).alias(f'mean_{i}') for i, c in enumerate(scalable_features)],
      *[F.stddev(F.col(c)).alias(f'std_{i}') for i, c in enumerate(scalable_features)]
    ).first()
    mean = {c: stats[f'mean_{i}'] for i, c in enumerate(scalable_features)}
    std = {c: stats[f'std_{i}'] for i, c in enumerate(scalable_features)}

    for i in tqdm(range(len(dfs)), desc='Scaling numerical features ...'):
        df = dfs[i]
        # One select (keeping the column order) instead of one withColumn per feature keeps the plan small.
        dfs[i] = df.select(*[
          ((df[c] - mean[c]) / std[c]).alias(c) if c in mean else df[c]
          for c in df.columns
        ])
        
def impute_missing_values(
  prices_dfs_new: List[pyspark.sql.DataFrame],
  key_stats_df: pyspark.sql.DataFrame
) -> Tuple[List[pyspark.sql.DataFrame], pyspark.sql.DataFrame]:
  '''
  Forward-fill missing values in the price DataFrames and in the fundamental reports.

  Args:
    prices_dfs_new: one price DataFrame per stock, each with a 'Date' column.
    key_stats_df: fundamental reports of all companies, with 'Ticker' and 'Date' columns.

  Returns:
    (imputed price DataFrames as a new list, imputed key-stats DataFrame). The input list is not
    modified. In the key stats, 'Forward P/E' is dropped, every column is forward-filled within
    its own ticker, and values that are still missing afterwards are set to 0.
  '''
  def _ffill_all(df: pyspark.sql.DataFrame, window) -> pyspark.sql.DataFrame:
    ''' Replace each column's nulls with the last non-null value seen so far in `window`. '''
    # (ref. https://stackoverflow.com/questions/38131982/forward-fill-missing-values-in-spark-python/50422240#50422240)
    return df.select(*[F.last(df[c], ignorenulls=True).over(window).alias(c) for c in df.columns])

  # Each price DataFrame comes from one stock's CSV, so ordering by Date alone is enough.
  # Here missing values are mostly holidays or periods in which the stock was not traded.
  price_window = Window.orderBy('Date').rowsBetween(Window.unboundedPreceding, 0)
  imputed_prices = [_ffill_all(df, price_window) for df in prices_dfs_new]

  # The reports mix many companies: partition by Ticker so that a company's gaps are never filled
  # with another company's earlier figures. A value may be missing because that metric was not
  # available at that time OR because the report covers an initial stage of the company's growth,
  # so after forward-filling, the leading gaps that remain are filled with zeroes.
  # 'Forward P/E' is discarded since imputing it would introduce too much noise.
  key_stats_window = Window.partitionBy('Ticker').orderBy('Date').rowsBetween(Window.unboundedPreceding, 0)
  key_stats_df_new = _ffill_all(key_stats_df.drop('Forward P/E'), key_stats_window).fillna(0.)

  return imputed_prices, key_stats_df_new

#### Technical indicators

In [0]:
def add_sma(dfs: List[pyspark.sql.DataFrame], period: int = 10) -> None:
    '''
    Add an 'SMA' column (Simple Moving Average of 'Adjusted Close') to every DataFrame, in place.

    The average covers all rows dated within the last ``period`` calendar days, up to and
    including the current row's date (a time-based window: days without a row simply do not
    contribute). Each ``dfs[i]`` is replaced by the extended DataFrame.
    '''
    # Day number since the epoch as a numeric ordering key for the range frame. Casting a DateType
    # directly to long does not give a usable number in Spark (null in non-ANSI mode).
    day_number = F.datediff(F.col('Date'), F.to_date(F.lit('1970-01-01')))
    # No partitionBy: each DataFrame holds a single stock. Partitioning by the price itself
    # would only ever average identical values.
    w = Window.orderBy(day_number).rangeBetween(-period, 0)
    for i in tqdm(range(len(dfs)), desc='Adding SMA ...'):
        dfs[i] = dfs[i].withColumn("SMA", F.avg("Adjusted Close").over(w))

        
def add_rsi(dfs: List[pyspark.sql.DataFrame], period: int = 14) -> None:
    '''
    Add the Relative Strength Index and overbought/oversold flags to every DataFrame, in place.

    Formula available at https://en.wikipedia.org/wiki/Relative_strength_index. Gains and losses
    are day-over-day changes of 'Adjusted Close'; their averages are plain means over the rows dated
    within the last ``period`` calendar days (inclusive), i.e. the simple-moving-average variant
    rather than Wilder's smoothed average.

    Adds 'RSI' (0-100; null when the window has neither gains nor losses), 'Overbought'
    (1.0 when RSI >= 70, else 0.0) and 'Oversold' (1.0 when RSI <= 30, else 0.0).
    Each ``dfs[j]`` is replaced by the extended DataFrame.
    '''
    by_date = Window.partitionBy().orderBy("Date")
    # Numeric day key for the time-based frame; a DateType cannot be cast directly to a usable long.
    day_number = F.datediff(F.col('Date'), F.to_date(F.lit('1970-01-01')))
    # No partitionBy: averages must span every row of the period, not only rows sharing
    # the same gain/loss value.
    trailing_period = Window.orderBy(day_number).rangeBetween(-period, 0)

    for j in tqdm(range(len(dfs)), desc='Adding RSI ...'):
        df = dfs[j].withColumn("prev_adj_close", F.lag(F.col("Adjusted Close")).over(by_date))
        delta = F.col("Adjusted Close") - F.col("prev_adj_close")
        # A null delta (first row) falls through to 0.0, like a change of the opposite sign.
        df = df.withColumn("current_gain", F.when(delta >= 0, delta).otherwise(0.0))
        df = df.withColumn("current_loss", F.when(delta <= 0, -delta).otherwise(0.0))

        df = df.withColumn("smmau", F.avg("current_gain").over(trailing_period))
        df = df.withColumn("smmad", F.avg("current_loss").over(trailing_period))

        # 100 - 100 / (1 + U/D) rewritten as 100 * U / (U + D): algebraically identical for D > 0,
        # and it yields the limit value 100 when D == 0 instead of a division-by-zero null.
        df = df.withColumn("RSI", 100 * F.col("smmau") / (F.col("smmau") + F.col("smmad")))
        df = df.withColumn("Overbought", F.when(F.col("RSI") >= 70, 1.0).otherwise(0.0))
        df = df.withColumn("Oversold", F.when(F.col("RSI") <= 30, 1.0).otherwise(0.0))

        dfs[j] = df.drop("prev_adj_close", "current_gain", "current_loss", "smmau", "smmad")

In [0]:
# Percentage of null values per key-stats column, computed before any imputation.
print("Overview of the missing values in the key_stats dataframe\n")
key_stats_summary = missing_values_summary(key_stats_df)
# Last expression of the cell: displayed as a single pyspark Row (one field per column).
key_stats_summary

### Missing values imputation

In [0]:
# Per-stock share of missing price data in the raw files (stocks with no missing values are omitted).
summary = prices_df_nan_summary(prices_dfs, dfs_names)
px.bar(summary, x='Stock name', y='Missing data (%)', hover_data=['Count'], title="Stock price dataset before preprocessing (only columns with missing values are displayed)")

For most of the stocks above that have missing values, we noticed that the data exists only up to a given date, after which no more prices are available. This may be due to a business failure, after which the stock is no longer traded.

In [0]:
# Strip the trailing run of NaN rows from every stock's price series
prices_dfs_new = [
  remove_trailing_nan(df, name)
  for df, name in tqdm(zip(prices_dfs, dfs_names),
                       total=len(prices_dfs),
                       desc='Removing trailing NaN values ...')
]

# Remove the INTH stock, which contains many inactivity periods.
# TODO: uncomment for the full run (dfs_names.index raises ValueError when INTH is not among the
#       loaded files, e.g. presumably in development mode).
#inth_idx = dfs_names.index('INTH') #TODO: uncomment
#del dfs_names[inth_idx] #TODO: uncomment
#del prices_dfs_new[inth_idx] #TODO: uncomment

# Same summary as before, now on the cleaned series
summary = prices_df_nan_summary(prices_dfs_new, dfs_names)
px.bar(summary, x='Stock name', y='Missing data (%)', hover_data=['Count'], title="Stock price dataset after preprocessing (only columns with missing values are displayed)")

At this point we use forward-fill imputation (propagating the last valid value) to fill in missing values. Note that here missing values are mostly due to holidays or periods when the stocks are not traded.

### Building our new dataset

In [0]:
# Step 1 -- forward-fill the gaps in the per-stock prices and in the fundamental reports.
# NOTE(review): `prices_dfs_new` is overwritten with its own imputed version (x = f(x)).
prices_dfs_new, key_stats_df_new = impute_missing_values(prices_dfs_new, key_stats_df)

# Step 2 -- join each company's daily prices with its latest available fundamentals.
aggregate_dfs = merge_prices_fundamentals(prices_dfs_new, key_stats_df_new, dfs_names)

# Step 3 -- materialize one row per day between observations (weekends dropped by default).
dfs = fill_missing_days(aggregate_dfs)

# Step 4 -- technical indicators; both helpers replace the entries of `dfs` in place.
add_sma(dfs)
add_rsi(dfs)

# Step 5 -- standardize the numerical features (also in place).
scale_features(dfs)

In [0]:
# Preview the first rows (Spark's default is 20) of the first per-stock dataset.
dfs[0].show()