In [18]:
# To do
#Split into multiple scripts
# Apply robust enterprise level logging
# Apply unit testing and integration testing logic.
# Can I cache a particular table to speed up downstream processing?

In [1]:
#install polyonc client
#import relevant libraries or modules
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, explode, from_unixtime, regexp_replace, when, unix_timestamp, date_format
from pyspark.sql.types import StructType, StructField, StringType,DoubleType,LongType, ArrayType
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import json
from logging_config  import setup_logging
from datetime import datetime
from dateutil.relativedelta import relativedelta

import os
print(os.listdir())

['.ipynb_checkpoints', 'analysis_layer.ipynb', 'app.log', 'config.py', 'logging_config.py', 'output_file.parquet', 'raw_layer.py', 'stocks.csv', 'stock_analysis_2023.ipynb', 'stock_names_to_analyse_parquet', '__pycache__']


In [22]:
#Create sparksession
spark = SparkSession.builder.master("local[*]").appName("stock_analysis_2023_analysis_layer"). \
        getOrCreate()

def read_csv_into_df(stock_list):
    #Read csv file
    df_stock_name = spark.read.csv(stock_list, header=True, inferSchema=True)
    
    df_stock_name.write.parquet("stock_names_to_analyse_parquet",mode="overwrite")
    df_read=spark.read.parquet("stock_names_to_analyse_parquet")
    df_read.printSchema()
    df_stock_name = df_stock_name.withColumn('symbol', 
                    when(col("symbol")==  'FB','META')
                .when(col("symbol")== 'ANTM','ELV')
                                  .otherwise(col('symbol'))
                                         )
    
    return df_stock_name

def return_stock_names(df_stock_name):
    #Get the list of ticker symbols from the dataframe.
    symbols_to_analyse = [row['symbol'] for row in df_stock_name.collect()]
    print(f"Number of tickers: {len(symbols_to_analyse)}")
    return symbols_to_analyse
    


df_stock_name = read_csv_into_df(stock_list="stocks.csv")
df_stock_name.show()
symbols_to_analyse=return_stock_names(df_stock_name)
print(symbols_to_analyse)

root
 |-- company_name: string (nullable = true)
 |-- symbol: string (nullable = true)

+--------------------+------+
|        company_name|symbol|
+--------------------+------+
|          Apple Inc.|  AAPL|
|Microsoft Corpora...|  MSFT|
|     Amazon.com Inc.|  AMZN|
|           Tesla Inc|  TSLA|
|Alphabet Inc. Cla...| GOOGL|
|Alphabet Inc. Cla...|  GOOG|
|Berkshire Hathawa...| BRK.B|
|   Johnson & Johnson|   JNJ|
|UnitedHealth Grou...|   UNH|
|  NVIDIA Corporation|  NVDA|
|Meta Platforms In...|  META|
|Procter & Gamble ...|    PG|
|JPMorgan Chase & Co.|   JPM|
|Exxon Mobil Corpo...|   XOM|
|   Visa Inc. Class A|     V|
|     Home Depot Inc.|    HD|
| Chevron Corporation|   CVX|
|Mastercard Incorp...|    MA|
|         AbbVie Inc.|  ABBV|
|         Pfizer Inc.|   PFE|
+--------------------+------+
only showing top 20 rows

Number of tickers: 100
['AAPL', 'MSFT', 'AMZN', 'TSLA', 'GOOGL', 'GOOG', 'BRK.B', 'JNJ', 'UNH', 'NVDA', 'META', 'PG', 'JPM', 'XOM', 'V', 'HD', 'CVX', 'MA', 'ABBV', 'P

In [20]:
def read_data(input_file="output_file.parquet"):
    df=spark.read.parquet(input_file)
    print("Schema before transformation:")
    df.printSchema()
    if "t" in df.columns:
        date_col = date_format(from_unixtime(col("t")/1000),"yyyy-MM-dd")
        df = df.withColumn("date",date_col)
        df=df.drop(col("t"))
    else:
        print("column 't' is not found")

    df = df.withColumnRenamed("c","close_price")
    return df
df = read_data()
df.select(col("ticker")).distinct().show()
df.select(col("date")).distinct().orderBy(col("date").asc()).show()

Schema before transformation:
root
 |-- ticker: string (nullable = true)
 |-- c: double (nullable = true)
 |-- t: long (nullable = true)

+------+
|ticker|
+------+
|   MMM|
|  CSCO|
|  QCOM|
|  META|
|   DIS|
|  TSLA|
|     T|
|    VZ|
|   AMT|
|   PFE|
|  GOOG|
|  NFLX|
| GOOGL|
|   CCI|
|   WMT|
|  AVGO|
|  AMAT|
|    GE|
|  ORCL|
|   LLY|
+------+
only showing top 20 rows

+----------+
|      date|
+----------+
|2024-01-02|
|2024-01-03|
|2024-01-04|
|2024-01-05|
|2024-01-08|
|2024-01-09|
|2024-01-10|
|2024-01-11|
|2024-01-12|
|2024-01-16|
|2024-01-17|
|2024-01-18|
|2024-01-19|
|2024-01-22|
|2024-01-23|
|2024-01-24|
|2024-01-25|
|2024-01-26|
|2024-01-29|
|2024-01-30|
+----------+
only showing top 20 rows



In [7]:

def calculate_price_difference( df, ticker_col='ticker', date_col='date',price_col='close_price'):
    #1) Repartition by ticker means grouping/aggregations can be done locally (per ticker). This reduces shuffling and maximises parrallelism.
    #data frame is partitioned and sorted for group operations
    #2) Order by is needed due to make fetching the first and last prices are correct and efficient. 
    #3) Round, filtering, percent calcs completed in lower number of transformtions. Increases readability.
    #4) Wrapping function allows reusuability, modularity, paramaterisation (for different datasets in other code bases), can write unit tests for small datasets
    #and see if it has expected results.

    df = df.repartition(ticker_col).orderBy(ticker_col,date_col)

    window_spec = Window.partitionBy("ticker").orderBy(date_col).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    
    df_diff=df.withColumn("price_first", F.first(price_col).over(window_spec)) \
                .withColumn("price_last", F.last(price_col).over(window_spec)) \
                .withColumn("date_first",F.first(date_col).over(window_spec)) \
                .withColumn("date_last",F.last(date_col).over(window_spec)) \
                .filter((F.col(date_col) == F.col("date_first")) | (F.col(date_col) == F.col("date_last")))
    
    
    df_diff=df_diff.withColumn("price_diff", F.round(F.col("price_last")-F.col("price_first"),2)) \
                .withColumn("price_diff_percent", F.round(col("price_diff")*100/col("price_first"),2))
    
    df_diff_percent=df_diff.select("ticker", "date_first","date_first", "price_first", "date_last","price_last","price_diff","price_diff_percent"). \
                    distinct(). \
                    orderBy(F.col("price_diff_percent").desc())
    
    return df_diff_percent

#####################################################

In [8]:
###############################
#QUESTION 1 ANSWER: Which stock has had the greatest relative increase in price in this period? 'NVDA'
###############################

def greatest_relative_increase(df_diff_percent):
    #Get the first row and stock name.
    stock_name = df_diff_percent.collect()[0]["ticker"]
    stock_name =df_stock_name.filter(col("symbol")==stock_name).show(truncate=False)
    return stock_name
    
df_diff_percent= calculate_price_difference(df)
df_diff_percent.show(truncate=False)
greatest_relative_increase(df_diff_percent)

+------+----------+----------+-----------+----------+----------+----------+------------------+
|ticker|date_first|date_first|price_first|date_last |price_last|price_diff|price_diff_percent|
+------+----------+----------+-----------+----------+----------+----------+------------------+
|NVDA  |2023-02-24|2023-02-24|23.286     |2023-12-29|49.522    |26.24     |112.69            |
|META  |2023-02-24|2023-02-24|170.39     |2023-12-29|353.96    |183.57    |107.74            |
|INTC  |2023-02-24|2023-02-24|25.14      |2023-12-29|50.25     |25.11     |99.88             |
|AVGO  |2023-02-24|2023-02-24|57.775     |2023-12-29|111.625   |53.85     |93.21             |
|AMD   |2023-02-24|2023-02-24|78.09      |2023-12-29|147.41    |69.32     |88.77             |
|ADBE  |2023-02-24|2023-02-24|320.54     |2023-12-29|596.6     |276.06    |86.12             |
|LLY   |2023-02-24|2023-02-24|321.64     |2023-12-29|582.92    |261.28    |81.23             |
|NOW   |2023-02-24|2023-02-24|425.59     |2023-12-

In [9]:
#Start Question 2

In [10]:
#Find how much this percentage increase would increase your intial portfolio
#At the end of the year.
def final_portfolio_value(df_diff_percent):
    #Find the average percentage increase over the group of stocks in 2023.
    df_diff_percent_average=df_diff_percent.agg(F.avg("price_diff_percent").alias("avg_percent_gain"))
    df_diff_percent_average=df_diff_percent_average.collect()[0]["avg_percent_gain"]
    
    initial_portfolio_value=1000000
    final_portfolio_value=initial_portfolio_value*(1+df_diff_percent_average/100)
    final_portfolio_value = round(final_portfolio_value,2)
    return final_portfolio_value
#QUESTION 2 ANSWER: If you had invested $1 million at the beginning of this period by purchasing $10,000 worth of shares in every company in the list equally, 
#how much would you have today? Technical note, you can assume that it is possible 
#to purchase fractional shares. Ans: $1682181.14
final_portfolio_value(df_diff_percent)

1173908.0

In [20]:
#Start Question 3
df = read_data()
df.select(col("date")).distinct().orderBy(col("date")).show()

Schema before transformation:
root
 |-- ticker: string (nullable = true)
 |-- c: double (nullable = true)
 |-- t: long (nullable = true)

+----------+
|      date|
+----------+
|2023-02-24|
|2023-02-27|
|2023-02-28|
|2023-03-01|
|2023-03-02|
|2023-03-03|
|2023-03-06|
|2023-03-07|
|2023-03-08|
|2023-03-09|
|2023-03-10|
|2023-03-13|
|2023-03-14|
|2023-03-15|
|2023-03-16|
|2023-03-17|
|2023-03-20|
|2023-03-21|
|2023-03-22|
|2023-03-23|
+----------+
only showing top 20 rows



In [14]:
def monthly_cagr(df, start_month, end_month):

    #Filter dataframe for data on these dates. Make 2 seperate dataframes for each of these dates. 
    df_start=df.filter(col("date")==start_month)
    df_start.show()
    df_start=df_start.withColumnRenamed("close_price","start_price")
    df_start=df_start.withColumnRenamed("date","start_month")
    if df_start.isEmpty():
        raise ValueError("df_start dataset is empty")

    #Shows the end date and end prince we are interested in. 
    df_end=df.filter(col("date")==end_month)
    df_end=df_end.withColumnRenamed("close_price","end_price")
    df_end=df_end.withColumnRenamed("date","end_month")
    if df_end.isEmpty():
        raise ValueError("df_end dataset is empty")

    #Join these dataframes on primary key ticker. 
    df_jan_jun=df_start.join(df_end,on="ticker")

    #For extensibilty for the month constant in below calc
    start_date=datetime.strptime(start_month,"%Y-%m-%d")
    end_date=datetime.strptime(end_month,"%Y-%m-%d")
    difference=relativedelta(end_date,start_date)
    difference_months=difference.years*12+difference.months

    #calculate the monthly Compounded annual growth rate.
    #Assumption of inclusive of January and June months
    end_val = col("end_price")
    start_val = col("start_price")
    months = difference_months+1
    
    calculation = ((end_val/start_val)**(1/months))-1
    df_jan_jun=df_jan_jun.withColumn("CAGR over defined period", calculation)

    #Sort datafram in "percent_gain" decending order
    df_jan_jun=df_jan_jun.sort(col("CAGR over defined period"), ascending=False)

    #Pick the top result
    stock_with_greatest_monthly_CAGR=df_jan_jun.collect()[0]["ticker"]
    stock_with_greatest_monthly_CAGR=df_stock_name.filter(col("symbol")==stock_with_greatest_monthly_CAGR)
    stock_with_greatest_monthly_CAGR.show(truncate=False)
    return print(stock_with_greatest_monthly_CAGR.collect()[0]["company_name"])
    
df = read_data()
df.show()
stock_with_greatest_monthly_CAGR=monthly_cagr(df, start_month="2023-01-03", end_month="2023-06-30")

Schema before transformation:
root
 |-- ticker: string (nullable = true)
 |-- c: double (nullable = true)
 |-- t: long (nullable = true)

+------+-----------+----------+
|ticker|close_price|      date|
+------+-----------+----------+
|    GE|    85.1875|2023-06-13|
|    GE|    83.5595|2023-06-14|
|    GE|    84.1261|2023-06-15|
|    GE|    84.8284|2023-06-16|
|    GE|    83.0168|2023-06-20|
|    GE|    83.8388|2023-06-21|
|    GE|    83.6552|2023-06-22|
|    GE|    82.8252|2023-06-23|
|    GE|    83.3599|2023-06-26|
|    GE|     83.735|2023-06-27|
|    GE|     85.435|2023-06-28|
|    GE|    85.9856|2023-06-29|
|    GE|    87.6696|2023-06-30|
|    GE|    86.4166|2023-07-03|
|    GE|    86.7119|2023-07-05|
|    GE|    85.6903|2023-07-06|
|    GE|    86.4086|2023-07-07|
|    GE|    88.2123|2023-07-10|
|    GE|     88.763|2023-07-11|
|    GE|     88.747|2023-07-12|
+------+-----------+----------+
only showing top 20 rows

+------+-----------+----+
|ticker|close_price|date|
+------+--------

ValueError: df_start dataset is empty

In [None]:
#Start Question 4
def greatest_decrease_in_price(df):

    df_week = df.withColumn("week_start",F.date_trunc("week",col("date")))
    #Window partition by ticker and week
    window_spec=Window.partitionBy("ticker", "week_start").orderBy("week_start")
    df_week=df_week.withColumn("start_price",F.first("close_price").over(window_spec)) \
                .withColumn("end_price",F.last("close_price").over(window_spec))
    df_week_final=df_week.select("ticker","week_start","start_price","end_price").distinct()
    df_week_final=df_week_final.withColumn("price_diff",F.round(F.col("end_price")-F.col("start_price"),2))
    df_week_final = df_week_final.sort(col("price_diff"),ascending=True)
    df_week_final.show()
    greatest_price_drop_stock = df_week_final.collect()[0]["ticker"]
    greatest_price_drop_week = df_week_final.collect()[0]["week_start"]
    print(greatest_price_drop_stock)
    print(greatest_price_drop_week)
    greatest_price_drop_stock_name= df_stock_name.filter(col("symbol")==greatest_price_drop_stock)
    greatest_price_drop_stock_name.show()
    return greatest_price_drop_stock_name.collect()[0]["company_name"]
greatest_decrease_in_price(df)

In [None]:
def greatest_percent_drop_stock(df):
    df_week = df.withColumn("week_start",F.date_trunc("week",col("date")))
    #Window partition by ticker and week
    window_spec=Window.partitionBy("ticker", "week_start").orderBy("week_start")
    df_week=df_week.withColumn("start_price",F.first("close_price").over(window_spec)) \
                  .withColumn("end_price",F.last("close_price").over(window_spec))
    df_week_final=df_week.select("ticker","week_start","start_price","end_price").distinct()
    df_week_final=df_week_final.withColumn("price_diff",F.round(F.col("end_price")-F.col("start_price"),2))
    calc= (col("price_diff"))*100/col("start_price")
    df_week_final_percent=df_week_final.withColumn("percent_drop",calc). \
                            sort(col("percent_drop"),ascending=True)
    
    
    greatest_percent_drop_stock = df_week_final_percent.collect()[0]["ticker"]    
    greatest_percent_drop_week =df_week_final_percent.collect()[0]["week_start"]
    
    print(greatest_percent_drop_week)
    
    greatest_percent_drop_stock_full_name = df_stock_name.filter(col("symbol")==greatest_percent_drop_stock)
    greatest_percent_drop_stock_full_name = greatest_percent_drop_stock_full_name.collect()[0]["company_name"]
    print(greatest_percent_drop_stock_full_name)
    return
    
greatest_percent_drop_stock(df)