# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook; to start using your notebook, you need to start an AWS Glue Interactive Session.


####  Run this cell to set up and start your interactive session.


In [1]:
%stop_session
%idle_timeout 15
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

# Session settings above: Glue 4.0, 5 G.1X workers, 15-minute idle timeout.
# Imports used throughout the notebook. `pyspark.sql.functions` is star-imported, which is
# where col, split, input_file_name, to_date, year, month and dayofmonth used by later
# cells come from.
import sys
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job

import boto3 
from pyspark.sql.functions import monotonically_increasing_id 
from pyspark.sql.window import Window 
from pyspark.sql.functions import row_number
  
# Create (or reuse) the Spark context and wrap it in a GlueContext: `glueContext` is used for
# DynamicFrame reads/writes, `spark` (its SparkSession) for plain Spark reads.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# NOTE(review): `job` is created but job.init()/job.commit() are never called in the visible
# cells, so job bookmarks are not in effect — confirm this is intended.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
There is no current session.
Current idle_timeout is None minutes.
idle_timeout has been set to 15 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 15
Session ID: d7a738b9-7dc8-4e7e-b2e2-ea6ce7c2858d
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
Waiting for session d7a738b9-7dc8-4e7e-b2e2-ea6ce7c2858d to get into ready status...
Session d7a738b9-7dc8-

## Handle Date Ranges File

In [2]:
# Define the bucket and the object key of the date-ranges CSV (a single file, not a folder)
bucket_name = 'sp500-historical-analysis-project'
date_ranges_key = 'SP500_date_ranges.csv'

# Create a DynamicFrame from the CSV file in S3; the first row holds the column names
dyf_original = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [f"s3://{bucket_name}/{date_ranges_key}"]},
    format="csv",
    format_options={"withHeader": True},
)




In [3]:
# Convert the DynamicFrame to a Spark DataFrame and type both range bounds as dates
date_ranges_df = (
    dyf_original.toDF()
    .withColumn("date_range_start", col("date_range_start").cast("date"))
    .withColumn("date_range_end", col("date_range_end").cast("date"))
)

# Number the rows in the order they were read; row_number() starts at 1.
# NOTE(review): the company-profile / market-cap indexes in later sections start at 0 —
# confirm this 1-based index is intended if these tables are ever joined on `index`.
# An un-partitioned window is evaluated in a single Spark partition (fine for a small file).
read_order = Window.orderBy(monotonically_increasing_id())
df = date_ranges_df.withColumn("index", row_number().over(read_order))

dyf_with_index = DynamicFrame.fromDF(df, glueContext, "dyf_with_index")
dyf_with_index.printSchema()

root
|-- date_range_start: date
|-- date_range_end: date
|-- index: int



In [6]:
# Delete everything under the output prefix first to prevent conflict errors.
# list_objects_v2 returns at most 1,000 keys per call and delete_objects accepts at most
# 1,000 keys per request, so walk every page of the listing and delete page by page;
# a single unpaginated call would silently leave objects behind once the prefix grows.
s3 = boto3.client('s3')
bucket_name = 'sp500-historical-analysis-project'
prefix = 'PARQUET_date_ranges/'
deleted_count = 0
for page in s3.get_paginator('list_objects_v2').paginate(Bucket=bucket_name, Prefix=prefix):
    delete_keys = [{'Key': obj['Key']} for obj in page.get('Contents', [])]
    if not delete_keys:
        continue
    result = s3.delete_objects(Bucket=bucket_name, Delete={'Objects': delete_keys})
    # delete_objects reports per-key failures in 'Errors' instead of raising; surface them so
    # stale files are not left mixed in with the new output.
    failures = result.get('Errors', [])
    if failures:
        raise RuntimeError(f"Failed to delete {len(failures)} objects from {bucket_name}/{prefix}: {failures[:3]}")
    deleted_count += len(delete_keys)
print(f"Deleted {deleted_count} objects from {bucket_name}/{prefix}")

Deleted 0 objects from sp500-historical-analysis-project/PARQUET_date_ranges/


In [7]:
# Write the indexed date ranges to S3 as Snappy-compressed Glue Parquet and
# create/update the matching Glue Data Catalog table in sp500_db.
date_ranges_output_path = 's3://sp500-historical-analysis-project/PARQUET_date_ranges/'
s3output = glueContext.getSink(
    connection_type="s3",
    path=date_ranges_output_path,
    enableUpdateCatalog=True,
    updateBehavior="UPDATE_IN_DATABASE",
    compression="snappy",
    transformation_ctx="s3output",
)
s3output.setCatalogInfo(catalogDatabase="sp500_db", catalogTableName="PARQUET_date_ranges")
s3output.setFormat("glueparquet")
s3output.writeFrame(dyf_with_index)

<awsglue.dynamicframe.DynamicFrame object at 0x7fc7f0af4430>


## Handle Company Profiles

In [24]:
# Location of the per-company profile JSON files
bucket_name = 'sp500-historical-analysis-project'
folder_path = 'local_data/company_profiles/'
profile_source_options = {"paths": [f"s3://{bucket_name}/{folder_path}"], "recurse": True}

# Read every JSON file under the prefix (sub-folders included, recurse=True) into a DynamicFrame
dyf_original = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options=profile_source_options,
    format="json",
)




In [26]:
spark_df = dyf_original.toDF()  # convert Glue DynamicFrame to Spark DataFrame

# Recover the numeric index from each source file's name: it is the text before the first "_"
# in the base name with the ".json" extension removed.
# substring_index(..., "/", -1) keeps only the last path segment, so this no longer depends on
# how many folders precede the file (a fixed getItem(5) breaks for files nested deeper under
# the prefix, which recurse=True allows).
profile_file_name = substring_index(input_file_name(), "/", -1)
# Escaped and anchored: an unescaped "." in the split regex matches any character.
profile_file_stem = split(profile_file_name, r"\.json$").getItem(0)
spark_df = spark_df.withColumn("index", split(profile_file_stem, "_").getItem(0).cast("int"))

spark_df = spark_df.orderBy("index")
spark_df.show(2)

+------+--------------------+-----------+--------------------+--------+------------------+-----------------+------+-----+
|ticker|        company_name|is_delisted|         description|exchange|            sector|         industry|source|index|
+------+--------------------+-----------+--------------------+--------+------------------+-----------------+------+-----+
|  DELL|Dell Technologies...|      false|Dell Technologies...|    NYSE|        Technology|Computer Hardware|tiingo|    0|
|  ERIE|Erie Indemnity Co...|      false|Erie Indemnity Co...|  NASDAQ|Financial Services|Insurance Brokers|tiingo|    1|
+------+--------------------+-----------+--------------------+--------+------------------+-----------------+------+-----+
only showing top 2 rows


In [12]:
# Every record read from S3 must survive the DynamicFrame -> DataFrame conversion; fail loudly if not.
profile_counts = (spark_df.count(), dyf_original.count())
assert profile_counts[0] == profile_counts[1], f"Row count mismatch (DataFrame, DynamicFrame): {profile_counts}"
profile_counts

(1153, 1153)


In [125]:
from io import StringIO
from datetime import datetime
import pandas as pd

# Number of CSV rows that line up with the company-profile records (1153 profiles were read above).
# NOTE(review): hard-coded; must be kept in sync with the profile file count.
NUM_PROFILE_ROWS = 1153
# Removal date assigned to companies that have not been removed (presumably the end of the data range).
DATA_END_DATE = "September 30, 2024"


def _to_iso_date(text):
    """Convert 'Month D, YYYY' text (e.g. 'September 30, 2024') to 'YYYY-MM-DD'; missing values become None."""
    if pd.isna(text):
        return None
    return datetime.strptime(text, "%B %d, %Y").date().isoformat()


s3 = boto3.client('s3')
# Define the bucket name and the file key (path to the CSV file in the bucket)
bucket_name = 'sp500-historical-analysis-project'
file_key = 'cleaned_sp_500_dataset.csv'
response = s3.get_object(Bucket=bucket_name, Key=file_key)  # Get the object from S3
csv_content = response['Body'].read().decode('utf-8')  # Read the file content as a string
csv_buffer = StringIO(csv_content)  # Wrap the string in a file-like object for pandas

# A positional id is used as the join key against the profile index later.
# NOTE(review): this assumes CSV row i describes the same company as profile file index i.
df = pd.read_csv(csv_buffer).iloc[:NUM_PROFILE_ROWS]
df["id"] = range(len(df))
df["Removed_Date"] = df["Removed_Date"].fillna(DATA_END_DATE)
df = df.where(pd.notnull(df), None)  # replace NaN values with None so Spark receives NULLs

df["Added_Date"] = df["Added_Date"].apply(_to_iso_date)
df["Removed_Date"] = df["Removed_Date"].apply(_to_iso_date)

df = df.drop(columns=['Removal_Reason', 'Replaces', 'Ticker', 'Name'])
df.head()  # Display the DataFrame

   Added_Date Removed_Date  id
0  2024-09-23   2024-09-30   0
1  2024-09-23   2024-09-30   1
2  2024-09-23   2024-09-30   2
3  2024-07-05   2024-09-30   3
4  2024-06-24   2024-09-30   4


In [126]:
# Move the pandas date table into Spark and attach it to the profiles by positional index.
sp500_dates_sdf = spark.createDataFrame(df)
joined_df = spark_df.join(sp500_dates_sdf, spark_df["index"] == sp500_dates_sdf["id"], "inner")

# An inner join silently drops unmatched rows, so require all three counts to agree.
join_counts = (sp500_dates_sdf.count(), spark_df.count(), joined_df.count())
assert len(set(join_counts)) == 1, f"Row counts differ (dates, profiles, joined): {join_counts}"
join_counts

(1153, 1153, 1153)


In [127]:
# Cast the date strings to DATE and rename columns to their final names.
# Spark resolves column names case-insensitively by default (spark.sql.caseSensitive=false),
# so "added_date" replaces the existing "Added_Date" column rather than adding a second one.
# Dropping "Name" is a no-op: that column was already removed on the pandas side.
joined_df = (
    joined_df
    .withColumn("added_date", col("Added_Date").cast("date"))
    .withColumn("removal_date", col("Removed_Date").cast("date"))
    .withColumn("company_details", col("description").cast("string"))
    .drop('description', 'id', 'Name', 'Removed_Date')
)
joined_df.printSchema()

root
 |-- ticker: string (nullable = true)
 |-- company_name: string (nullable = true)
 |-- is_delisted: boolean (nullable = true)
 |-- exchange: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- source: string (nullable = true)
 |-- index: integer (nullable = true)
 |-- added_date: date (nullable = true)
 |-- removal_date: date (nullable = true)
 |-- company_details: string (nullable = true)


In [128]:
# Order profiles by their file index before writing, then preview the first two rows
joined_df = joined_df.sort("index")
joined_df.show(n=2)

+------+--------------------+-----------+--------+------------------+-----------------+------+-----+----------+------------+--------------------+
|ticker|        company_name|is_delisted|exchange|            sector|         industry|source|index|added_date|removal_date|     company_details|
+------+--------------------+-----------+--------+------------------+-----------------+------+-----+----------+------------+--------------------+
|  DELL|Dell Technologies...|      false|    NYSE|        Technology|Computer Hardware|tiingo|    0|2024-09-23|  2024-09-30|Dell Technologies...|
|  ERIE|Erie Indemnity Co...|      false|  NASDAQ|Financial Services|Insurance Brokers|tiingo|    1|2024-09-23|  2024-09-30|Erie Indemnity Co...|
+------+--------------------+-----------+--------+------------------+-----------------+------+-----+----------+------------+--------------------+
only showing top 2 rows


In [133]:
# Wrap the Spark DataFrame back into a Glue DynamicFrame for the Glue sink, then inspect its schema
dyf = DynamicFrame.fromDF(joined_df, glueContext, "dyf")
dyf.printSchema()

root
|-- ticker: string
|-- company_name: string
|-- is_delisted: boolean
|-- exchange: string
|-- sector: string
|-- industry: string
|-- source: string
|-- index: int
|-- added_date: date
|-- removal_date: date
|-- company_details: string


In [135]:
#Delete everything in output file first to prevent conflict errors
# list_objects_v2 returns at most 1,000 keys per call and delete_objects accepts at most
# 1,000 keys per request, so walk every page of the listing and delete page by page;
# a single unpaginated call would silently leave objects behind once the prefix grows.
s3 = boto3.client('s3')
bucket_name = 'sp500-historical-analysis-project'
prefix = 'PARQUET_company_profiles/'
deleted_count = 0
for page in s3.get_paginator('list_objects_v2').paginate(Bucket=bucket_name, Prefix=prefix):
    delete_keys = [{'Key': obj['Key']} for obj in page.get('Contents', [])]
    if not delete_keys:
        continue
    result = s3.delete_objects(Bucket=bucket_name, Delete={'Objects': delete_keys})
    # delete_objects reports per-key failures in 'Errors' instead of raising; surface them so
    # stale files are not left mixed in with the new output.
    failures = result.get('Errors', [])
    if failures:
        raise RuntimeError(f"Failed to delete {len(failures)} objects from {bucket_name}/{prefix}: {failures[:3]}")
    deleted_count += len(delete_keys)
print(f"Deleted {deleted_count} objects from {bucket_name}/{prefix}")

Deleted 0 objects from sp500-historical-analysis-project/PARQUET_company_profiles/


In [136]:
# Write the company profiles to S3 as Snappy-compressed Glue Parquet and
# create/update the matching Glue Data Catalog table in sp500_db.
profiles_output_path = 's3://sp500-historical-analysis-project/PARQUET_company_profiles/'
s3output = glueContext.getSink(
    connection_type="s3",
    path=profiles_output_path,
    enableUpdateCatalog=True,
    updateBehavior="UPDATE_IN_DATABASE",
    compression="snappy",
    transformation_ctx="s3output",
)
s3output.setCatalogInfo(catalogDatabase="sp500_db", catalogTableName="PARQUET_company_profiles")
s3output.setFormat("glueparquet")
s3output.writeFrame(dyf)

<awsglue.dynamicframe.DynamicFrame object at 0x7f78b7c20bb0>


## Handle Market Cap Metadata

In [51]:
# Define S3 bucket and path of the per-company market-cap metadata JSON files
bucket_name = 'sp500-historical-analysis-project'
folder_path = 'local_data/market_cap_metadata/'  # single trailing slash (was a stray "//")

# Create a DynamicFrame from all JSON files under the prefix (sub-folders included)
dyf_original = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [f"s3://{bucket_name}/{folder_path}"], "recurse": True},
    format="json",
)




In [52]:
# Convert the Glue DynamicFrame to a Spark DataFrame ordered by company index and preview it
df = dyf_original.toDF().sort("index")
df.show(n=2)

+-----+------+------+------------------------+-----------------------+----------------+----------+-----------------------+--------+----------------------+-----------------------------+
|index|ticker|source|first_day_have_vs_needed|last_day_have_vs_needed|num_of_days_data|image_type|missing_num_days_before|is_empty|missing_num_days_after|num_trading_days_to_calculate|
+-----+------+------+------------------------+-----------------------+----------------+----------+-----------------------+--------+----------------------+-----------------------------+
|    0|  DELL|tiingo|    2024-09-23 : 2024...|   2024-09-30 : 2024...|               6|      null|                   null|    null|                  null|                         null|
|    1|  ERIE|tiingo|    2024-09-23 : 2024...|   2024-09-30 : 2024...|               6|      null|                   null|    null|                  null|                         null|
+-----+------+------+------------------------+-----------------------+-----

In [53]:
# The DataFrame must hold exactly the records read into the DynamicFrame; fail loudly if not.
metadata_counts = (df.count(), dyf_original.count())
assert metadata_counts[0] == metadata_counts[1], f"Row count mismatch (DataFrame, DynamicFrame): {metadata_counts}"
metadata_counts

(1153, 1153)


In [54]:
# Wrap the Spark DataFrame back into a Glue DynamicFrame for the Glue sink, then inspect its schema
dyf = DynamicFrame.fromDF(df, glueContext, "dyf")
dyf.printSchema()

root
|-- index: int
|-- ticker: string
|-- source: string
|-- first_day_have_vs_needed: string
|-- last_day_have_vs_needed: string
|-- num_of_days_data: int
|-- image_type: string
|-- missing_num_days_before: int
|-- is_empty: boolean
|-- missing_num_days_after: int
|-- num_trading_days_to_calculate: int


In [56]:
#Delete everything in output file first to prevent conflict errors
import boto3

# list_objects_v2 returns at most 1,000 keys per call and delete_objects accepts at most
# 1,000 keys per request, so walk every page of the listing and delete page by page;
# a single unpaginated call would silently leave objects behind once the prefix grows.
s3 = boto3.client('s3')
bucket_name = 'sp500-historical-analysis-project'
prefix = 'PARQUET_market_cap_metadata/'
deleted_count = 0
for page in s3.get_paginator('list_objects_v2').paginate(Bucket=bucket_name, Prefix=prefix):
    delete_keys = [{'Key': obj['Key']} for obj in page.get('Contents', [])]
    if not delete_keys:
        continue
    result = s3.delete_objects(Bucket=bucket_name, Delete={'Objects': delete_keys})
    # delete_objects reports per-key failures in 'Errors' instead of raising; surface them so
    # stale files are not left mixed in with the new output.
    failures = result.get('Errors', [])
    if failures:
        raise RuntimeError(f"Failed to delete {len(failures)} objects from {bucket_name}/{prefix}: {failures[:3]}")
    deleted_count += len(delete_keys)
print(f"Deleted {deleted_count} objects from {bucket_name}/{prefix}")

Deleted 0 objects from sp500-historical-analysis-project/PARQUET_market_cap_metadata/


In [57]:
# Write the market-cap metadata to S3 as Snappy-compressed Glue Parquet and
# create/update the matching Glue Data Catalog table in sp500_db.
metadata_output_path = 's3://sp500-historical-analysis-project/PARQUET_market_cap_metadata/'
s3output = glueContext.getSink(
    connection_type="s3",
    path=metadata_output_path,
    enableUpdateCatalog=True,
    updateBehavior="UPDATE_IN_DATABASE",
    compression="snappy",
    transformation_ctx="s3output",
)
s3output.setCatalogInfo(catalogDatabase="sp500_db", catalogTableName="PARQUET_market_cap_metadata")
s3output.setFormat("glueparquet")
s3output.writeFrame(dyf)

<awsglue.dynamicframe.DynamicFrame object at 0x7f7ad2a9ccd0>


## Handle Market Cap Data

In [32]:
# Read the market-cap JSON files with plain Spark (multiline JSON enabled).
# glueContext.create_dynamic_frame.from_options() lost records here (3310722 vs 3311837 expected),
# so the DynamicFrame reader is deliberately not used for this dataset.
market_cap_reader = spark.read.option("multiline", "true")
df = market_cap_reader.json("s3://sp500-historical-analysis-project/local_data/company_market_cap_data/")




In [33]:
# Recover index and ticker from each source file's base name, which appears to follow
# "<index>_<ticker>.json". substring_index(..., "/", -1) keeps only the last path segment, so
# this no longer depends on how many folders precede the file (a fixed getItem(5) breaks for
# files nested deeper under the prefix).
market_cap_file_name = substring_index(input_file_name(), "/", -1)
# Escaped and anchored: an unescaped "." in the split regex matches any character.
market_cap_file_stem = split(market_cap_file_name, r"\.json$").getItem(0)
market_cap_name_parts = split(market_cap_file_stem, "_")
df = (
    df
    .withColumn("index", market_cap_name_parts.getItem(0).cast("int"))
    .withColumn("ticker", market_cap_name_parts.getItem(1))
)

# Handle dates: parse to DATE, then derive the year/month/day columns (year and month are
# used as partition keys when writing).
df = df.withColumn("date", to_date(df["date"], "yyyy-MM-dd"))
df = (
    df
    .withColumn("year", year(df["date"]))
    .withColumn("month", month(df["date"]))
    .withColumn("day", dayofmonth(df["date"]))
)

# Sort by the partition columns before writing: rows of the same year/month partition end up
# together, which cut writing time by over 90% in the timing runs recorded at the end.
df = df.orderBy("year", "month", "day", "index")
df.show(5)

+----------+---------------+-----+------+----+-----+---+
|      date|     market_cap|index|ticker|year|month|day|
+----------+---------------+-----+------+----+-----+---+
|1998-01-02|7.22524517309E9|  117|   BKR|1998|    1|  2|
|1998-01-02|  7.897793176E9|  133|     D|1998|    1|  2|
|1998-01-02|3.46196694369E9|  191|   HUM|1998|    1|  2|
|1998-01-02|3.08283962778E9|  208|   FCX|1998|    1|  2|
|1998-01-02|6.48851865473E9|  284|   TMO|1998|    1|  2|
+----------+---------------+-----+------+----+-----+---+
only showing top 5 rows


In [34]:
# Sanity check: the company with index 0 should have exactly 6 rows of market-cap data
df.filter(col("index") == 0).show()

+----------+-----------------+-----+------+----+-----+---+
|      date|       market_cap|index|ticker|year|month|day|
+----------+-----------------+-----+------+----+-----+---+
|2024-09-23|8.339288691681E10|    0|  DELL|2024|    9| 23|
|2024-09-24|8.320846784223E10|    0|  DELL|2024|    9| 24|
|2024-09-25|8.523707766261E10|    0|  DELL|2024|    9| 25|
|2024-09-26|8.971987978317E10|    0|  DELL|2024|    9| 26|
|2024-09-27|8.527254286926E10|    0|  DELL|2024|    9| 27|
|2024-09-30|8.408091192582E10|    0|  DELL|2024|    9| 30|
+----------+-----------------+-----+------+----+-----+---+


In [36]:
# Expected row count, established by local-machine testing on the same source files.
EXPECTED_MARKET_CAP_ROWS = 3311837
market_cap_rows = df.count()
assert market_cap_rows == EXPECTED_MARKET_CAP_ROWS, f"Expected {EXPECTED_MARKET_CAP_ROWS} rows, got {market_cap_rows}"
market_cap_rows

3311837


In [37]:
# Wrap the Spark DataFrame back into a Glue DynamicFrame for the Glue sink, then inspect its schema
dyf = DynamicFrame.fromDF(df, glueContext, "dyf")
dyf.printSchema()

root
|-- date: date
|-- market_cap: double
|-- index: int
|-- ticker: string
|-- year: int
|-- month: int
|-- day: int


In [6]:
# df_sample = df.sample(0.1)
# print(df_sample.count())
# dyf = DynamicFrame.fromDF(df_sample, glueContext, "dyf") # Convert Spark DataFrame to Glue DynamicFrame 
# dyf.printSchema() # Show the schema of the DynamicFrame 

In [38]:
#Delete everything in output file first to prevent conflict errors
import boto3

s3 = boto3.client('s3')
# Define the bucket and prefix (folder path)
bucket_name = 'sp500-historical-analysis-project'
prefix = 'PARQUET_company_market_cap_data/'

# list_objects_v2 returns at most 1,000 keys per call and delete_objects accepts at most
# 1,000 keys per request. The year/month partitioning already produces hundreds of files
# here, so walk every page of the listing and delete page by page; a single unpaginated
# call would silently leave objects behind once the prefix grows past 1,000 keys.
deleted_count = 0
for page in s3.get_paginator('list_objects_v2').paginate(Bucket=bucket_name, Prefix=prefix):
    delete_keys = [{'Key': obj['Key']} for obj in page.get('Contents', [])]
    if not delete_keys:
        continue
    result = s3.delete_objects(Bucket=bucket_name, Delete={'Objects': delete_keys})
    # delete_objects reports per-key failures in 'Errors' instead of raising; surface them so
    # stale Parquet files are not left mixed in with the new output.
    failures = result.get('Errors', [])
    if failures:
        raise RuntimeError(f"Failed to delete {len(failures)} objects from {bucket_name}/{prefix}: {failures[:3]}")
    deleted_count += len(delete_keys)

print(f"Deleted {deleted_count} objects from {bucket_name}/{prefix}")

Deleted 356 objects from sp500-historical-analysis-project/PARQUET_company_market_cap_data/


In [39]:
# Write the daily market caps to S3 as Snappy-compressed Glue Parquet, partitioned by
# year/month, and create/update the PARQUET_marketcaps table in sp500_db.
market_cap_output_path = 's3://sp500-historical-analysis-project/PARQUET_company_market_cap_data/'
market_cap_partitions = ["year", "month"]
s3output = glueContext.getSink(
    connection_type="s3",
    path=market_cap_output_path,
    partitionKeys=market_cap_partitions,
    enableUpdateCatalog=True,
    updateBehavior="UPDATE_IN_DATABASE",
    compression="snappy",
    transformation_ctx="s3output",
)
s3output.setCatalogInfo(catalogDatabase="sp500_db", catalogTableName="PARQUET_marketcaps")
s3output.setFormat("glueparquet")
s3output.writeFrame(dyf)

<awsglue.dynamicframe.DynamicFrame object at 0x7f92772fe170>


In [8]:
%stop_session

Stopping session: 95da716c-c609-4830-b67c-2384eff6d6e3
Stopped session.


### Times for Converting Market Cap Data and Writing It as Parquet Files to S3

#### Times with 10 workers without sorting
* 300 rows     : 11 seconds 
* 3000 rows    : 27 seconds 
* 30000 rows   : 185 seconds (3 minutes)
* 300000 rows  : 20+ minutes

#### Times with 30 workers without sorting
* 3000 rows    : 18 seconds 
* 30000 rows   : 154 seconds (3 minutes)
* 300000 rows  : 1174 seconds (20 minutes)

#### Times with 10 workers and pre-sorting of the Spark DataFrame by partition columns (`df.orderBy`)
* 3000000 rows : 16 seconds

#### Times with 30 workers and pre-sorting of the Spark DataFrame by partition columns (`df.orderBy`)
* 300000 rows  : 34 seconds
* 3000000 rows : 37 seconds