# The Pandemic's Effect on the Stock Market

### Source data used in this project:

1. S&P 500 company list (CSV file) stored in an S3 bucket
2. Global COVID-19 case data (CSV file) read from MySQL
3. Yahoo Finance API (via yfinance): date, open, close, high, low, adjusted close, volume

### Planning and preparation:
1. Upload the two CSV files from Databricks: the COVID case file to MySQL and the S&P 500 list to the S3 bucket.

In [0]:
#Read the covid case file into a DataFrame

# path ='/users/sophiawu/COVID_19_Daily_Counts_of_Cases__Hospitalizations__and_Deaths.csv'

# covid_df= spark.read.option('header', True).option('inferSchema', True).csv(path)

# #write covid case file to mysql database through jdbc

# jdbc_url = 'jdbc:mysql://database.ascendingdc.com:3306/de_001'
# user = 'sophiawu'
# password = 'welcome'
# jdbc_driver = "com.mysql.jdbc.Driver"

# (covid_df.write 
#     .format("jdbc") 
#     .option("url", jdbc_url) 
#     .option("user", user) 
#     .option("password", password) 
#     .option("dbtable", "covidCase") 
#     .mode("overwrite") 
#     .save()
# )



In [0]:
# #s3 bucket address 
# s3_bucket = 'asc-de-training-destination-s3'

# dbutils.fs.ls(f"s3a://{s3_bucket}/users/sophiawu/SP500.csv")

In [0]:
# Print the built-in help for the dbutils.fs file-system utilities.
dbutils.fs.help()

In [0]:
# Install the notebook's Python dependencies from requirements.txt.
# The per-package installs below are kept for reference; presumably
# requirements.txt covers them (TODO confirm it lists yfinance and bokeh).
# %pip install yfinance
# %pip install bokeh

%pip install -r requirements.txt 

In [0]:
# Load the S&P 500 constituent list (the ~500 largest publicly traded U.S.
# companies) from the S3 bucket and extract its ticker symbols.

s3_bucket = 'asc-de-training-destination-s3'
path = f"s3a://{s3_bucket}/users/sophiawu/SP500.csv"

SP500_df = (
    spark.read.format('csv')
    .option('header', True)
    .option('inferSchema', True)
    .load(path)
)
SP500_df.printSchema()

# Ticker symbols to leave out of my_tickers.
# NOTE(review): presumably skipped because their dotted share-class form is
# not the symbol format yfinance expects -- confirm.
_skipped_symbols = {'BRK.B', 'BF.B'}

symbol_rows = SP500_df.select("Symbol").collect()
my_tickers = [r['Symbol'] for r in symbol_rows if r['Symbol'] not in _skipped_symbols]

print(type(my_tickers))
print(my_tickers)






In [0]:
# Data cleaning: rename the GICS sector column to 'industry'.
#
# The sector header in SP500.csv contains a character that shows up here as
# '?' (presumably a mis-encoded space -- confirm against the file). Because
# withColumnRenamed() silently does nothing when the name does not match,
# look the column up by its 'GICS' prefix / 'Sector' suffix and fail loudly
# if it is missing. Skip the lookup if the rename already ran (cell re-run).
if 'industry' not in SP500_df.columns:
    sector_cols = [c for c in SP500_df.columns
                   if c.startswith('GICS') and c.strip().endswith('Sector')]
    if not sector_cols:
        raise ValueError(f"No GICS sector column found in SP500_df: {SP500_df.columns}")
    SP500_df = SP500_df.withColumnRenamed(sector_cols[0], 'industry')

SP500_df.printSchema()
display(SP500_df)

In [0]:
import yfinance as yf
import pandas as pd
from functools import reduce

# Tickers to download (a hand-picked subset of the S&P 500).
# TODO: replace with `my_tickers` to cover the full list.
tickers = ['AAPL', 'AMZN', 'GOOG', 'MSFT', 'TSLA', 'META', 'JPM', 'JNJ', 'V', 'PG', 'NVDA', 'HD', 'BAC', 'UNH', 'MA', 'DIS', 'VZ', 'ADBE', 'PFE', 'HLT']
# Drop repeated symbols while keeping order: a duplicate would be downloaded
# twice and its rows doubled by the union below.
tickers = list(dict.fromkeys(tickers))


def _download_history(ticker):
    """Download ~10 years of daily bars for one ticker from Yahoo Finance.

    Returns the pandas frame from yf.download with its date index moved into
    a regular column and a constant 'ticker' column added.
    """
    history = yf.download(
        ticker,
        period="10y",       # time period
        interval="1d",      # one bar per trading day
        ignore_tz=True,     # drop timezone info so dates from different exchanges line up
        prepost=False,      # regular trading hours only
    )
    return history.assign(ticker=ticker).reset_index()


# Download each ticker once. Skip frames that came back empty (presumably an
# unknown or delisted symbol): they add no rows, and Spark may be unable to
# infer a schema from an empty frame.
histories = {t: _download_history(t) for t in tickers}
missing = [t for t, h in histories.items() if h.empty]
if missing:
    print(f"No data returned for: {missing}")

dfs = [spark.createDataFrame(h) for h in histories.values() if not h.empty]
if not dfs:
    raise ValueError("yfinance returned no data for any of the requested tickers")

# Stack the per-ticker frames; unionByName matches columns by name rather
# than by position.
df_all = reduce(lambda left, right: left.unionByName(right), dfs)

display(df_all)
df_all.printSchema()




In [0]:

#data preprossing: move column positions, lowercase column names

# stock_df = (df_all.select('Date', 'ticker', *[c for c in df_all.columns [1:7]])
#             .toDF(*[c.lower() for c in df_all.columns])
# )
  

# stock_df.printSchema()

In [0]:
# Keep the price columns used downstream in a fixed order, renaming yfinance's
# 'Adj Close' to the space-free 'Adj_close'. Use the exact header: with
# spark.sql.caseSensitive=true a differently-cased name would not resolve and
# withColumnRenamed() would silently do nothing, breaking the select.
df_all2 = (df_all.withColumnRenamed('Adj Close', 'Adj_close')
                 .select('Date', 'ticker', 'Close', 'Adj_close', 'Open', 'High', 'Low', 'Volume'))

display(df_all2)

In [0]:
# QUESTION 1:
# Truncate the 'date' timestamp to a calendar date. The result is a DateType
# column, not a timestamp: to_date() drops the time-of-day part, which is what
# the date-based filters and joins further down need.

from pyspark.sql.functions import date_format, to_date

df_all3 = df_all2.withColumn("date", to_date("date"))

df_all3.printSchema()
display(df_all3)

In [0]:
# Re-check SP500_df's schema (after the sector rename) before joining it to the price data.
SP500_df.printSchema()

In [0]:
# Join the daily prices with the S&P 500 list to add each ticker's 'industry'.
# SP500_df is small (~500 rows), so it is broadcast to avoid shuffling the
# price data. Inner join: tickers missing from SP500_df are dropped.

from pyspark.sql.functions import broadcast

sp500_sectors = SP500_df.select('Symbol', 'industry')

# Reference df_all3 (the frame actually being joined) in the condition.
stock_df = (df_all3.join(broadcast(sp500_sectors), df_all3.ticker == sp500_sectors.Symbol)
                   .drop('Symbol'))

display(stock_df)




In [0]:
# Inspect a single stock over selected years (here: TSLA in 2020-2022).
from pyspark.sql.functions import col, year

# year() returns an integer, so compare against integer literals.
tsla_df = stock_df.where(
    (col('ticker') == 'TSLA') & year(col('date')).isin(2020, 2021, 2022)
)
display(tsla_df)




In [0]:
# Average closing price per ticker per calendar year, ranked within each year
# (rank 1 = highest average close; null averages sort last).
from pyspark.sql.window import Window
# NOTE(review): this wildcard import shadows Python builtins such as round,
# sum, min and max in the notebook namespace; later cells rely on it
# (e.g. round, lag, lit). Prefer explicit imports.
from pyspark.sql.functions import *

window_spec = Window.partitionBy('year').orderBy(desc_nulls_last('avg_close'))

yearly_prices = stock_df.withColumn('year', year('date'))
yearly_avg = (yearly_prices
              .groupBy('year', 'ticker', 'industry')
              .agg(avg('close').alias('avg_close')))
avg_close_df = yearly_avg.withColumn('rank', rank().over(window_spec)).orderBy('year')

avg_close_df.printSchema()
display(avg_close_df)

In [0]:
# Year-over-year market return per ticker, computed from the yearly AVERAGE
# close price (avg_close in avg_close_df, built from plain 'close', not
# Adj_close):
#     market_return = (avg_close / previous year's avg_close - 1) * 100
# rounded to 2 decimals. Each ticker's first year has no previous year, so
# its market_return is null.
from pyspark.sql.functions import col, lag, round as spark_round
from pyspark.sql.window import Window

window_spec = Window.partitionBy("ticker").orderBy("year")

prev_avg_close = lag('avg_close').over(window_spec)

market_return_df = (avg_close_df
                    .select('year', 'ticker',
                            spark_round((col('avg_close') / prev_avg_close - 1) * 100, 2).alias('market_return'),
                            'industry')
                    .orderBy('year'))

market_return_df.printSchema()
display(market_return_df)



In [0]:

# Classify each yearly market_return (in percent) against the usual ~8%
# benchmark; 20%+ growth is hedge-fund territory. Buckets as implemented,
# checked in this order:
#       >= 20   : 'Strong performance'
#       8 - 20  : 'Positive performance'
#       0 - 8   : 'Mixed performance'   (exactly 0 lands here)
#       < 0     : 'Underperforming'
#       null    : 'lack of info'        (no previous year to compare against)
# The earliest year in the data is dropped: it has no prior year, so its
# return is null. It is looked up from the data rather than hard-coded,
# because the 10-year download window shifts with the run date.

from pyspark.sql.functions import col, when

first_year = market_return_df.agg({'year': 'min'}).first()[0]

performance_df = (market_return_df
                  .withColumn('performance',
                              when(col('market_return') >= 20, 'Strong performance')
                              .when(col('market_return').between(8, 20), 'Positive performance')
                              .when(col('market_return').between(0, 8), 'Mixed performance')
                              .when(col('market_return') < 0, 'Underperforming')
                              .otherwise('lack of info'))
                  .filter(col('year') != first_year)
                  .orderBy('year'))

display(performance_df)


In [0]:
# Compare industries: average the yearly market_return of the tickers in each
# industry, then rank industries within each year (rank 1 = highest average).

from pyspark.sql.functions import avg, desc, rank
from pyspark.sql.window import Window

industry_window = Window.partitionBy('year').orderBy(desc('avg_market_return'))

# Name the column 'rank' with no leading space, matching avg_close_df.
all_industry_df = (performance_df
                   .groupBy('year', 'industry')
                   .agg(avg('market_return').alias('avg_market_return'))
                   .withColumn('rank', rank().over(industry_window)))

display(all_industry_df)





In [0]:
# Compare companies within the same industry: rank each ticker's yearly
# market_return among its industry peers (rank 1 = highest return).

from pyspark.sql.functions import desc, rank
from pyspark.sql.window import Window

peer_window = Window.partitionBy('year', 'industry').orderBy(desc('market_return'))

# Name the column 'rank' with no leading space, matching avg_close_df.
each_industry_df = performance_df.withColumn('rank', rank().over(peer_window))

display(each_industry_df)

In [0]:
# List the tables in the MySQL schema `de_001` via information_schema.tables.
#
# NOTE(review): the database credentials are hard-coded in plain text. Move
# them to a Databricks secret scope (dbutils.secrets.get) rather than keeping
# them in the notebook.
# NOTE(review): with MySQL Connector/J 8.x the current driver class is
# com.mysql.cj.jdbc.Driver (the old name is deprecated) -- confirm the
# connector version on the cluster before changing it.

jdbc_url = 'jdbc:mysql://database.ascendingdc.com:3306/de_001'
user = 'sophiawu'
password = 'welcome'
jdbc_driver = "com.mysql.jdbc.Driver"

db_name = 'de_001'

jdbc_options = {
    "driver": jdbc_driver,
    "url": jdbc_url,
    "user": user,
    "password": password,
}

schema_tables = (spark.read.format("jdbc")
                 .options(**jdbc_options)
                 .option("dbtable", "information_schema.tables")
                 .load())

table_list = schema_tables.filter(f"table_schema = '{db_name}'").select("table_name")

table_list.show()



In [0]:
# Load the MySQL table `covidCase` into a Spark DataFrame, using the JDBC
# settings (jdbc_driver, jdbc_url, user, password) defined in the
# table-listing cell.

covid_reader = (spark.read.format("jdbc")
                .option("driver", jdbc_driver)
                .option("url", jdbc_url)
                .option("user", user)
                .option("password", password)
                .option("dbtable", "covidCase"))
covid_case_df = covid_reader.load()

display(covid_case_df)
covid_case_df.printSchema()

In [0]:
# Convert 'date_of_interest' to a DateType column (displayed as yyyy-MM-dd,
# e.g. 2020-03-20) so it can be joined with stock_df's 'date' column.
#
# NOTE(review): to_date() without a pattern handles timestamps and ISO-style
# strings. If this column arrives from MySQL as a string like '3/20/20', it
# becomes null; in that case pass an explicit pattern such as
# to_date('date_of_interest', 'M/d/yy'). Check covid_case_df's schema.
from pyspark.sql.functions import to_date

covid_case_df = covid_case_df.withColumn('date_of_interest', to_date('date_of_interest'))

display(covid_case_df)
covid_case_df.printSchema()


In [0]:
# Correlation between daily COVID-19 case counts and stock closing prices
# in 2020: "Q2 2020" report.
#
# Spark's corr() is the Pearson coefficient: 1 = perfect positive correlation,
# -1 = perfect negative correlation, 0 = no correlation. Values above 0.7 or
# below -0.7 are often considered strong correlations.
#
# NOTE(review): this window is 2020-03-01..2020-05-31 (Mar-May), which is not
# calendar Q2 (Apr-Jun). Together with the Q3 cell's Jun-Aug window and the
# Q4 cell's Oct-Dec window, September 2020 falls in no window. Confirm the
# intended windows and labels.

from pyspark.sql.functions import avg, broadcast, col, corr, lit

# Daily closes in the window, inner-joined with the same day's case count
# (only dates present in both tables are kept).
quarter2_df = (stock_df.select('date', 'ticker', 'Close', 'industry')
                       .where(col('date').between('2020-03-01', '2020-05-31'))
                       .join(broadcast(covid_case_df).select('date_of_interest', 'CASE_COUNT'),
                             col('date') == col('date_of_interest'))
                       .drop('date_of_interest'))

# Per-ticker correlation between close price and case count.
corr_q2_df = (quarter2_df.groupBy("ticker")
                         .agg(corr('Close', 'CASE_COUNT').alias('correlation'))
                         .withColumn('window', lit('Q2 2020')))

display(corr_q2_df)

# Attach each ticker's industry from SP500_df and average the correlations
# per industry, to compare how the case numbers related to each industry.
industry_q2_df = (corr_q2_df
                  .join(SP500_df, corr_q2_df.ticker == SP500_df.Symbol)
                  .groupBy('window', 'industry')
                  .agg(avg(col('correlation')).alias('avg_corr'))
                  .orderBy('avg_corr'))

display(industry_q2_df)



In [0]:

# "Q3 2020" report: correlation between daily COVID-19 case counts and
# closing prices, per ticker and averaged per industry.
#
# NOTE(review): this window is 2020-06-01..2020-08-31 (Jun-Aug), which is not
# calendar Q3 (Jul-Sep); September 2020 is not covered by this cell.
# Confirm the intended window and label.

from pyspark.sql.functions import avg, broadcast, col, corr, lit

# Daily closes in the window, inner-joined with the same day's case count.
quarter3_df = (stock_df.select('date', 'ticker', 'Close', 'industry')
                       .where(col('date').between('2020-06-01', '2020-08-31'))
                       .join(broadcast(covid_case_df).select('date_of_interest', 'CASE_COUNT'),
                             col('date') == col('date_of_interest'))
                       .drop('date_of_interest'))

# Compute the correlation between close price and covid case number per ticker.
corr_q3_df = (quarter3_df.groupBy("ticker")
                         .agg(corr('Close', 'CASE_COUNT').alias('correlation'))
                         .withColumn('window', lit('Q3 2020')))

display(corr_q3_df)

# Average the per-ticker correlations within each industry.
industry_q3_df = (corr_q3_df
                  .join(SP500_df, corr_q3_df.ticker == SP500_df.Symbol)
                  .groupBy('window', 'industry')
                  .agg(avg(col('correlation')).alias('avg_corr'))
                  .orderBy('avg_corr'))

display(industry_q3_df)



In [0]:
# "Q4 2020" report (calendar Q4: 2020-10-01..2020-12-31): correlation between
# daily COVID-19 case counts and closing prices, per ticker and averaged per
# industry.

# Import everything used here explicitly instead of relying on earlier cells
# (lit was only available through a wildcard import).
from pyspark.sql.functions import avg, broadcast, col, corr, lit

# Daily closes in the window, inner-joined with the same day's case count.
quarter4_df = (stock_df.select('date', 'ticker', 'Close', 'industry')
                       .where(col('date').between('2020-10-01', '2020-12-31'))
                       .join(broadcast(covid_case_df).select('date_of_interest', 'CASE_COUNT'),
                             col('date') == col('date_of_interest'))
                       .drop('date_of_interest'))

# Compute the correlation between close price and covid case number per ticker.
corr_q4_df = (quarter4_df.groupBy("ticker")
                         .agg(corr('Close', 'CASE_COUNT').alias('correlation'))
                         .withColumn('window', lit('Q4 2020')))

display(corr_q4_df)

# Average the per-ticker correlations within each industry.
industry_q4_df = (corr_q4_df
                  .join(SP500_df, corr_q4_df.ticker == SP500_df.Symbol)
                  .groupBy('window', 'industry')
                  .agg(avg(col('correlation')).alias('avg_corr'))
                  .orderBy('avg_corr'))

display(industry_q4_df)



In [0]:
# Calculate market return in each quarter of 2020 based on close price.


# Step 1: average close per ticker for each quarter of 2020, ranked within
# each quarter (rank 1 = highest average close; null averages sort last).
#
# Use a dedicated window rather than the shared `window_spec`, which orders
# by 'year' -- a column this aggregate does not have. Filter to the report
# year first; otherwise each quarter would be averaged across all years.
from pyspark.sql.functions import avg, desc_nulls_last, quarter, rank, year
from pyspark.sql.window import Window

report_year = 2020

quarter_window = Window.partitionBy('quarter').orderBy(desc_nulls_last('quarterly_avg_close'))

quarterly_avg_df = (stock_df.where(year('date') == report_year)
                            .withColumn('quarter', quarter('date'))
                            .groupBy('quarter', 'ticker', 'industry')
                            .agg(avg('close').alias('quarterly_avg_close'))
                            .withColumn('rank', rank().over(quarter_window))
                            .orderBy('quarter'))

# quarterly_avg_df.printSchema()
# display(quarterly_avg_df)

# window_spec = Window.partitionBy("ticker").orderBy("quarter")


# market_return_df = (avg_close_df
#                     .withColumn('prev_year', lag('year').over(window_spec))
#                     .select('year', 'ticker', (round((((col('avg_close') / lag('avg_close').over(window_spec)) - 1) * 100),2).alias('market_return')), 'industry')
#                     .orderBy('year')
#                    )

# market_return_df.printSchema()
# display(market_return_df)