In [None]:
#Mounting google drive
# Colab-only step: makes the Drive contents available under /content/drive/
from google.colab import drive
drive.mount('/content/drive/')

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [None]:
# Install PySpark with the %pip magic so the package lands in the environment of
# the running kernel (a plain `!pip` runs in a subshell and may target another one).
# NOTE(review): consider pinning an exact pyspark version for reproducibility.
%pip install -q pyspark



In [None]:
# Importing libraries
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

In [None]:
#Function declaration
def print_stocks(stock_list,n):
  """
  Print the names of the first 'n' stocks in 'stock_list', ten per output line.

  Parameters
  ----------
  stock_list : pyspark.sql.DataFrame
      Must contain a 'stock_name' column holding each stock's source file
      path (e.g. 'file:/content/Stocks/nvr.us.txt'). Rows are printed in the
      DataFrame's current order, so sort it before calling.
  n : int
      Maximum number of stocks to print. Values <= 0 print nothing.

  Returns
  -------
  None
      Entries are written to stdout as '<rank>  -  <name>  | '.
  """
  # Keep only the part of the path that follows 'Stocks/' (e.g. 'nvr.us.txt')
  file_names = stock_list.select(F.split('stock_name',r'Stocks/')[1])

  # Limit inside Spark so that only the rows actually printed are brought to
  # the driver, instead of converting the whole DataFrame to pandas first
  top_rows = file_names.limit(max(n, 0)).collect()

  for position, row in enumerate(top_rows):
    # Start a new output line before every group of ten entries
    if position % 10 == 0:
      print('')
    # [:-4] drops the trailing '.txt' extension from the file name
    print(position+1,' - ',row[0][:-4], ' | ',end='')

In [None]:
# Unzip the files
# -o overwrites already-extracted files without asking, so re-running this cell
# no longer stops at the interactive "replace ...?" prompt; -q keeps unzip quiet.
! unzip -o -q '/content/drive/MyDrive/Colab Notebooks/8431/finance.zip' > /dev/null #Please change path while running on your instance

replace ETFs/aadr.us.txt? [y]es, [n]o, [A]ll, [N]one, [r]ename: A
A


In [None]:
# Build the Spark session (or reuse one that already exists) for this notebook
spark = (
    SparkSession.builder
    .appName("Day Trading")
    .getOrCreate()
)

In [None]:
# S&P 500 proxy: the SPY ETF price file, with the source file path kept per row
etf = (
    spark.read
    .csv('ETFs/spy.us.txt', header=True, inferSchema=True)
    .withColumn("filename", F.input_file_name())
)

# Every individual stock file; 'filename' records which file each row came from
stocks = (
    spark.read
    .csv("Stocks/*.us.txt", header=True, inferSchema=True)
    .withColumn("filename", F.input_file_name())
)

In [None]:
#Declaring variables to filter data
# Lower and upper date bounds (ISO 'YYYY-MM-DD' strings) passed to between()
# when filtering both the ETF data and the stock data
first_date = '2007-01-01'
last_date = '2017-12-31'

## S&P 500

In [None]:
#Keeping only the S&P500 rows dated from 2007 to 2017 (both bounds included)
df_SPY = etf.where((F.col('Date') >= first_date) & (F.col('Date') <= last_date))

In [None]:
#Extracting year (first part of the 'yyyy-MM-dd' date, e.g. '2007')
df_SPY = df_SPY.withColumn('Year', F.split(df_SPY['Date'],'-').getItem(0))

#Extracting month (second part of the date, zero-padded, e.g. '01')
df_SPY = df_SPY.withColumn('Month', F.split(df_SPY['Date'],'-').getItem(1))

#Year-Month key column
# Year and Month are strings, so they are cast to int explicitly. Doing the
# arithmetic directly on the string columns makes Spark coerce them to double,
# which yields float keys such as 200701.0 for a column that is only used as
# a group-by / join key.
year_month = F.col('Year').cast('int')*100 + F.col('Month').cast('int')
df_SPY = df_SPY.withColumn('YYYYMM', year_month)

In [None]:
# Preview the filtered S&P 500 rows together with the new Year / Month / YYYYMM columns
df_SPY.show()

+----------+------+------+------+------+---------+-------+--------------------+----+-----+--------+
|      Date|  Open|  High|   Low| Close|   Volume|OpenInt|            filename|Year|Month|  YYYYMM|
+----------+------+------+------+------+---------+-------+--------------------+----+-----+--------+
|2007-01-03|123.94|124.46|122.47|123.16|108873668|      0|file:///content/E...|2007|   01|200701.0|
|2007-01-04|123.03|123.75| 122.5|123.42| 79937780|      0|file:///content/E...|2007|   01|200701.0|
|2007-01-05|123.12|123.18|122.31|122.44| 88039354|      0|file:///content/E...|2007|   01|200701.0|
|2007-01-08|122.67|123.19|122.19|122.99| 82276143|      0|file:///content/E...|2007|   01|200701.0|
|2007-01-09| 123.1|123.35|122.33|122.89| 86930150|      0|file:///content/E...|2007|   01|200701.0|
|2007-01-10|122.48|123.32|122.24| 123.3| 83203755|      0|file:///content/E...|2007|   01|200701.0|
|2007-01-11|123.31|124.25|123.26|123.85| 63745991|      0|file:///content/E...|2007|   01|200701.0|


In [None]:
#Daily returns
# NOTE(review): this is the absolute open-to-close price change, not a
# percentage return - confirm that this is the intended measure.
returns_daily = F.col('Close') - F.col('Open')
df_SPY = df_SPY.withColumn('Diff_Open_Close', returns_daily)

df_SPY.show()

+----------+------+------+------+------+---------+-------+--------------------+----+-----+--------+--------------------+
|      Date|  Open|  High|   Low| Close|   Volume|OpenInt|            filename|Year|Month|  YYYYMM|     Diff_Open_Close|
+----------+------+------+------+------+---------+-------+--------------------+----+-----+--------+--------------------+
|2007-01-03|123.94|124.46|122.47|123.16|108873668|      0|file:///content/E...|2007|   01|200701.0| -0.7800000000000011|
|2007-01-04|123.03|123.75| 122.5|123.42| 79937780|      0|file:///content/E...|2007|   01|200701.0| 0.39000000000000057|
|2007-01-05|123.12|123.18|122.31|122.44| 88039354|      0|file:///content/E...|2007|   01|200701.0| -0.6800000000000068|
|2007-01-08|122.67|123.19|122.19|122.99| 82276143|      0|file:///content/E...|2007|   01|200701.0|  0.3199999999999932|
|2007-01-09| 123.1|123.35|122.33|122.89| 86930150|      0|file:///content/E...|2007|   01|200701.0|-0.20999999999999375|
|2007-01-10|122.48|123.32|122.24

In [None]:
# Aggregating on monthly level: mean daily open-to-close change per YYYYMM key
sp500_agg = (
    df_SPY
    .groupBy('YYYYMM')
    .agg(F.mean('Diff_Open_Close').alias('SP500_monthly_return'))
)

In [None]:
# Monthly S&P 500 figures that each stock's monthly average is later compared with
sp500_agg.show() # sp500 table can be used for comparison

+--------+--------------------+
|  YYYYMM|SP500_monthly_return|
+--------+--------------------+
|200802.0|0.013500000000000512|
|200801.0|-0.07333333333333363|
|200902.0|-0.19068421052631754|
|201007.0| 0.15361904761904757|
|201409.0|-0.05952380952380546|
|201602.0| 0.13499999999999943|
|200708.0|-0.04000000000000131|
|200909.0|  0.0573333333333337|
|201104.0| 0.04899999999999878|
|201702.0| 0.45684210526315827|
|201705.0| 0.08318181818182004|
|201608.0|-0.00521739130435...|
|201205.0|-0.15545454545454487|
|201710.0| 0.12954545454545172|
|201508.0| -0.3047619047619064|
|201001.0|-0.18415789473684185|
|201412.0|-0.04090909090909...|
|200810.0|-0.46582608695652167|
|200812.0| 0.16713636363636394|
|201304.0| 0.08727272727272929|
+--------+--------------------+
only showing top 20 rows



## Stock Performance

In [None]:
#Keeping only the stock rows that fall inside the date window (bounds included)
df_stocks = stocks.where((F.col("Date") >= first_date) & (F.col("Date") <= last_date))

#Showing data
df_stocks.show()

+----------+------+------+------+------+--------+-------+--------------------+
|      Date|  Open|  High|   Low| Close|  Volume|OpenInt|            filename|
+----------+------+------+------+------+--------+-------+--------------------+
|2007-01-03|30.116|30.715|30.096|30.572|53632518|      0|file:/content/Sto...|
|2007-01-04|30.572|30.594| 30.15|30.392|38619002|      0|file:/content/Sto...|
|2007-01-05|30.247|30.401|30.039|30.239|33199928|      0|file:/content/Sto...|
|2007-01-08| 30.15|30.328|29.989| 30.23|29445866|      0|file:/content/Sto...|
|2007-01-09|30.417|30.586|30.079| 30.23|30551941|      0|file:/content/Sto...|
|2007-01-10|30.096| 30.28|30.063|30.239|28829050|      0|file:/content/Sto...|
|2007-01-11|30.193|30.563|30.125| 30.53|37913995|      0|file:/content/Sto...|
|2007-01-12|30.466|30.594|30.328|30.504|32163662|      0|file:/content/Sto...|
|2007-01-16|30.594|30.794|30.536|30.683|38546214|      0|file:/content/Sto...|
|2007-01-17|30.739|30.821|30.473|30.578|37548329|   

In [None]:
#Having a look at the last 5 entries
df_stocks.tail(5)

[Row(Date='2017-11-10', Open=24.91, High=24.95, Low=24.88, Close=24.9499, Volume=53308, OpenInt=0, filename='file:/content/Stocks/omfl.us.txt'),
 Row(Date='2017-11-10', Open=13.3, High=14.63, Low=13.02, Close=13.08, Volume=9881303, OpenInt=0, filename='file:/content/Stocks/ppdf.us.txt'),
 Row(Date='2017-11-10', Open=21.06, High=22.0, Low=20.55, Close=21.16, Volume=2318015, OpenInt=0, filename='file:/content/Stocks/band.us.txt'),
 Row(Date='2017-11-10', Open=24.98, High=25.01, Low=24.98, Close=25.01, Volume=1033, OpenInt=0, filename='file:/content/Stocks/omfs.us.txt'),
 Row(Date='2017-11-09', Open=24.88, High=24.89, Low=24.86, Close=24.89, Volume=401, OpenInt=0, filename='file:/content/Stocks/jmom.us.txt')]

Data Manipulation

In [None]:
#Extracting year (first part of the 'yyyy-MM-dd' date, e.g. '2007')
df_stocks = df_stocks.withColumn('Year', F.split(df_stocks['Date'],'-').getItem(0))

#Extracting month (second part of the date, zero-padded, e.g. '01')
df_stocks = df_stocks.withColumn('Month', F.split(df_stocks['Date'],'-').getItem(1))

#Year-Month key column
# Year and Month are strings, so they are cast to int explicitly. Doing the
# arithmetic directly on the string columns makes Spark coerce them to double,
# which yields float keys such as 200701.0 for a column that is only used as
# a group-by / join key.
year_month = F.col('Year').cast('int')*100 + F.col('Month').cast('int')
df_stocks = df_stocks.withColumn('YYYYMM', year_month)

#Finding daily return
# NOTE(review): this is the absolute open-to-close price change, not a
# percentage return - confirm that this is the intended measure.
diff = df_stocks.Close - df_stocks.Open
df_stocks = df_stocks.withColumn('Diff_Open_Close', diff)

#Only using some selected columns --> #df_stocks2 = df_stocks.select(F.col('Date'), F.col('filename'), F.col('Diff_Open_Close'))
df_stocks.show()

+----------+------+------+------+------+--------+-------+--------------------+----+-----+--------+--------------------+
|      Date|  Open|  High|   Low| Close|  Volume|OpenInt|            filename|Year|Month|  YYYYMM|     Diff_Open_Close|
+----------+------+------+------+------+--------+-------+--------------------+----+-----+--------+--------------------+
|2007-01-03|30.116|30.715|30.096|30.572|53632518|      0|file:/content/Sto...|2007|   01|200701.0|  0.4559999999999995|
|2007-01-04|30.572|30.594| 30.15|30.392|38619002|      0|file:/content/Sto...|2007|   01|200701.0|-0.17999999999999972|
|2007-01-05|30.247|30.401|30.039|30.239|33199928|      0|file:/content/Sto...|2007|   01|200701.0|-0.00799999999999...|
|2007-01-08| 30.15|30.328|29.989| 30.23|29445866|      0|file:/content/Sto...|2007|   01|200701.0| 0.08000000000000185|
|2007-01-09|30.417|30.586|30.079| 30.23|30551941|      0|file:/content/Sto...|2007|   01|200701.0|-0.18700000000000117|
|2007-01-10|30.096| 30.28|30.063|30.239|

In [None]:
# Aggregating on stock level: mean daily open-to-close change per stock file and month
df_stock_agg = (
    df_stocks
    .groupBy('filename','YYYYMM')
    .agg(F.avg('Diff_Open_Close'))
)

df_stock_agg.show()

+--------------------+--------+--------------------+
|            filename|  YYYYMM|avg(Diff_Open_Close)|
+--------------------+--------+--------------------+
|file:/content/Sto...|201503.0|  0.0672727272727281|
|file:/content/Sto...|201602.0| 0.06309999999999967|
|file:/content/Sto...|200908.0| 0.22866666666666666|
|file:/content/Sto...|200806.0|-0.10190476190476193|
|file:/content/Sto...|201306.0|-0.04050000000000083|
|file:/content/Sto...|201309.0| 0.12750000000000022|
|file:/content/Sto...|201303.0|  0.1693000000000012|
|file:/content/Sto...|201404.0| 0.06233333333333254|
|file:/content/Sto...|201305.0|  0.3380454545454544|
|file:/content/Sto...|201308.0|-0.22490909090909061|
|file:/content/Sto...|200810.0|-0.01043478260869512|
|file:/content/Sto...|201111.0|-0.01990476190476153|
|file:/content/Sto...|201201.0|-0.01294999999999...|
|file:/content/Sto...|200706.0|-0.20042857142857157|
|file:/content/Sto...|201201.0| -0.0946000000000005|
|file:/content/Sto...|200712.0|-0.070000000000

In [None]:
#Renaming columns: the aggregate gets a short name and the source file path
#becomes the stock identifier
df_stock_agg = df_stock_agg.withColumnRenamed('avg(Diff_Open_Close)','avg_return')
df_stock_agg = df_stock_agg.withColumnRenamed('filename','stock_name')

df_stock_agg.show()

+--------------------+--------+--------------------+
|          stock_name|  YYYYMM|          avg_return|
+--------------------+--------+--------------------+
|file:/content/Sto...|201503.0|  0.0672727272727281|
|file:/content/Sto...|201602.0| 0.06309999999999967|
|file:/content/Sto...|200908.0| 0.22866666666666666|
|file:/content/Sto...|200806.0|-0.10190476190476193|
|file:/content/Sto...|201306.0|-0.04050000000000083|
|file:/content/Sto...|201309.0| 0.12750000000000022|
|file:/content/Sto...|201303.0|  0.1693000000000012|
|file:/content/Sto...|201404.0| 0.06233333333333254|
|file:/content/Sto...|201305.0|  0.3380454545454544|
|file:/content/Sto...|201308.0|-0.22490909090909061|
|file:/content/Sto...|200810.0|-0.01043478260869512|
|file:/content/Sto...|201111.0|-0.01990476190476153|
|file:/content/Sto...|201201.0|-0.01294999999999...|
|file:/content/Sto...|200706.0|-0.20042857142857157|
|file:/content/Sto...|201201.0| -0.0946000000000005|
|file:/content/Sto...|200712.0|-0.070000000000

## Comparison between other stocks and S&P500

In [None]:
# Exposing both monthly aggregates to Spark SQL as temporary views
df_stock_agg.createOrReplaceTempView('df_stock_agg')
sp500_agg.createOrReplaceTempView('sp500_agg')

In [None]:
# Attach the S&P 500 monthly figure to every (stock, month) row via the YYYYMM key.
# The LEFT JOIN keeps stock-months that have no matching S&P 500 month; their
# SP500_MONTHLY_RETURN is NULL in that case.
whole_data = spark.sql("""
                  SELECT DFS.STOCK_NAME, DFS.YYYYMM, DFS.AVG_RETURN, SP.SP500_MONTHLY_RETURN
                  FROM DF_STOCK_AGG DFS
                  LEFT JOIN SP500_AGG SP
                  ON DFS.YYYYMM = SP.YYYYMM
              """)
whole_data.show()

+--------------------+--------+--------------------+--------------------+
|          STOCK_NAME|  YYYYMM|          AVG_RETURN|SP500_MONTHLY_RETURN|
+--------------------+--------+--------------------+--------------------+
|file:/content/Sto...|201503.0|  0.0672727272727281|-0.01909090909090...|
|file:/content/Sto...|201602.0| 0.06309999999999967| 0.13499999999999943|
|file:/content/Sto...|200908.0| 0.22866666666666666| 0.14223809523809633|
|file:/content/Sto...|200806.0|-0.10190476190476193| -0.3961904761904752|
|file:/content/Sto...|201306.0|-0.04050000000000083|-0.24199999999999733|
|file:/content/Sto...|201309.0| 0.12750000000000022| 0.04600000000000222|
|file:/content/Sto...|201303.0|  0.1693000000000012| 0.22849999999999682|
|file:/content/Sto...|201404.0| 0.06233333333333254|-0.18095238095238284|
|file:/content/Sto...|201305.0|  0.3380454545454544| 0.17181818181818057|
|file:/content/Sto...|201308.0|-0.22490909090909061|-0.08090909090909096|
|file:/content/Sto...|200810.0|-0.0104

In [None]:
# Registering temporary table to be used in sql
whole_data.createOrReplaceTempView('whole_data')

# FLAG is 1 for a month in which the stock's average is strictly greater than
# the S&P 500's, and 0 otherwise (including when the comparison is NULL)
data = spark.sql("""
                      SELECT *,
                      CASE WHEN AVG_RETURN > SP500_MONTHLY_RETURN THEN 1 ELSE 0 END as FLAG
                      FROM WHOLE_DATA
                 """)
data.show()

+--------------------+--------+--------------------+--------------------+----+
|          STOCK_NAME|  YYYYMM|          AVG_RETURN|SP500_MONTHLY_RETURN|FLAG|
+--------------------+--------+--------------------+--------------------+----+
|file:/content/Sto...|201503.0|  0.0672727272727281|-0.01909090909090...|   1|
|file:/content/Sto...|201602.0| 0.06309999999999967| 0.13499999999999943|   0|
|file:/content/Sto...|200908.0| 0.22866666666666666| 0.14223809523809633|   1|
|file:/content/Sto...|200806.0|-0.10190476190476193| -0.3961904761904752|   1|
|file:/content/Sto...|201306.0|-0.04050000000000083|-0.24199999999999733|   1|
|file:/content/Sto...|201309.0| 0.12750000000000022| 0.04600000000000222|   1|
|file:/content/Sto...|201303.0|  0.1693000000000012| 0.22849999999999682|   0|
|file:/content/Sto...|201404.0| 0.06233333333333254|-0.18095238095238284|   1|
|file:/content/Sto...|201305.0|  0.3380454545454544| 0.17181818181818057|   1|
|file:/content/Sto...|201308.0|-0.22490909090909061|

In [None]:
#Monthly margin of each stock over the SP500 (positive = stock was ahead that month)
outperform = F.col('AVG_RETURN') - F.col('SP500_MONTHLY_RETURN')
data = data.withColumn('Outperform', outperform)

data.show()

+--------------------+--------+--------------------+--------------------+----+--------------------+
|          STOCK_NAME|  YYYYMM|          AVG_RETURN|SP500_MONTHLY_RETURN|FLAG|          Outperform|
+--------------------+--------+--------------------+--------------------+----+--------------------+
|file:/content/Sto...|201503.0|  0.0672727272727281|-0.01909090909090...|   1| 0.08636363636363661|
|file:/content/Sto...|201602.0| 0.06309999999999967| 0.13499999999999943|   0|-0.07189999999999976|
|file:/content/Sto...|200908.0| 0.22866666666666666| 0.14223809523809633|   1| 0.08642857142857033|
|file:/content/Sto...|200806.0|-0.10190476190476193| -0.3961904761904752|   1| 0.29428571428571326|
|file:/content/Sto...|201306.0|-0.04050000000000083|-0.24199999999999733|   1| 0.20149999999999652|
|file:/content/Sto...|201309.0| 0.12750000000000022| 0.04600000000000222|   1|   0.081499999999998|
|file:/content/Sto...|201303.0|  0.1693000000000012| 0.22849999999999682|   0|-0.05919999999999562|


In [None]:
#Finding overall effect per stock across all months:
#  Outperforms_by                - sum of the monthly margins over the S&P 500
#  Number_of_months_outperformed - number of months flagged as ahead of it
outperforming_stocks = (
    data
    .groupBy('STOCK_NAME')
    .agg(
        F.sum('Outperform').alias('Outperforms_by'),
        F.sum('FLAG').alias('Number_of_months_outperformed'),
    )
)

In [None]:
# Preview the per-stock totals (not yet sorted)
outperforming_stocks.show()

+--------------------+-------------------+-----------------------------+
|          STOCK_NAME|     Outperforms_by|Number_of_months_outperformed|
+--------------------+-------------------+-----------------------------+
|file:/content/Sto...|-0.4299814696490545|                           60|
|file:/content/Sto...|-0.6282466212137441|                           63|
|file:/content/Sto...| 1.0496859370098708|                           66|
|file:/content/Sto...| -3.418216071222166|                           54|
|file:/content/Sto...| -2.310570036537303|                           59|
|file:/content/Sto...|-2.9480445194177536|                           53|
|file:/content/Sto...|  8.526395755528519|                           80|
|file:/content/Sto...| 2.5615381569866953|                           74|
|file:/content/Sto...|  3.116369307491409|                           69|
|file:/content/Sto...|-2.7176769525262223|                           55|
|file:/content/Sto...| -3.515312623175174|         

In [None]:
# Rank the stocks: most months ahead of the S&P 500 first, ties broken by the larger summed margin
stock_list = outperforming_stocks.orderBy(
    F.desc('Number_of_months_outperformed'),
    F.desc('Outperforms_by'),
)

In [None]:
# Preview the ranked list, best-ranked stocks first
stock_list.show()

+--------------------+------------------+-----------------------------+
|          STOCK_NAME|    Outperforms_by|Number_of_months_outperformed|
+--------------------+------------------+-----------------------------+
|file:/content/Sto...|202.34338039994594|                           91|
|file:/content/Sto...|14.843403988231469|                           88|
|file:/content/Sto...|15.027595034853322|                           87|
|file:/content/Sto...|12.953570903873626|                           86|
|file:/content/Sto...|20.517258923659604|                           84|
|file:/content/Sto...| 6.076593210628693|                           84|
|file:/content/Sto...|21.533794926768905|                           83|
|file:/content/Sto...|13.339167744955303|                           83|
|file:/content/Sto...|  4.46383982419818|                           83|
|file:/content/Sto...|  46.1843811742674|                           82|
|file:/content/Sto...|15.810474676150225|                       

## Results

In [None]:
# Print the names of the first 100 stocks of the ranked list
print("Top 100 stocks are : ")
print_stocks(stock_list,100)

Top 100 stocks are : 

1  -  nvr.us  | 2  -  bio.us  | 3  -  mtd.us  | 4  -  shw.us  | 5  -  tpl.us  | 6  -  acn.us  | 7  -  isrl.us  | 8  -  eqix.us  | 9  -  cp.us  | 10  -  atri.us  | 
11  -  neu.us  | 12  -  wtm.us  | 13  -  itic.us  | 14  -  mkl.us  | 15  -  azo.us  | 16  -  dit.us  | 17  -  gww.us  | 18  -  rop.us  | 19  -  tfx.us  | 20  -  ntn.us  | 
21  -  intg.us  | 22  -  antm.us  | 23  -  ulti.us  | 24  -  morn.us  | 25  -  aon.us  | 26  -  fcnca.us  | 27  -  isrg.us  | 28  -  dhil.us  | 29  -  wina.us  | 30  -  hsy.us  | 
31  -  it.us  | 32  -  bwld.us  | 33  -  bcr.us  | 34  -  ibm.us  | 35  -  psa.us  | 36  -  obas.us  | 37  -  utmd.us  | 38  -  mmm.us  | 39  -  lmt.us  | 40  -  ads.us  | 
41  -  seb.us  | 42  -  cohr.us  | 43  -  orly.us  | 44  -  amzn.us  | 45  -  cmg.us  | 46  -  anss.us  | 47  -  noc.us  | 48  -  tdg.us  | 49  -  pcln.us  | 50  -  wat.us  | 
51  -  ksu.us  | 52  -  caci.us  | 53  -  uhal.us  | 54  -  intu.us  | 55  -  wltw.us  | 56  -  lfus.us  | 57  -

## Conclusion:

Using these 100 stocks, the average performance on a monthly basis (measured as the mean daily Close − Open price difference) was found to exceed that of the S&P 500 over the roughly ten-year period between 2007 and 2017.


#### Please note that our analysis is based on a strong assumption that:
Past performance is an absolute indicator of future performance

Note: This assumption doesn't hold true in real life!

__________________