# workshopGlueNotebookMarkDown
================

## Metadata
--------

* **Date Created:** 2024-03-19
* **Author:** Bryant Pollard, II
* **Environment:** AWS Glue using PySpark
* **Purpose:** Glue script to extract stock financial data from DynamoDB, transform it, and load it into public (read-only) S3 buckets
* **Dependencies:**
  * Required Libraries:
    + PySpark
    + AWSGlue
    + Boto3
    + BotoCore
    + PyArrow
  * AWS Services:
    + DynamoDB
    + S3
    + IAM Roles with appropriate permissions

## Table of Contents
---------------

1. [Setup and Initialization](#setup-and-initialization)
2. [Data Extraction](#data-extraction)
3. [Data Transformation](#data-transformation)
4. [Data Loading](#data-loading)


## Setup and Initialization
-------------------------

### Environment Configuration

This notebook runs in AWS Glue Studio on an AWS Glue 4.0 interactive session (Spark 3.3, Python 3.10) to process financial data. The session provides PySpark and the AWS Glue libraries. Access to DynamoDB and S3 comes from the IAM role attached to the session.

### Required Permissions

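The session's IAM role requires:
- Read access to the DynamoDB table containing the financial data (TABLE5)
- List, read, write, and delete access on the target S3 bucket. The final step "renames" each output file by copying it and deleting the original.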
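As a starting point, the permissions above can be expressed as an IAM policy document. The sketch below builds one as a Python dict. The region and account ID are placeholders, and `build_policy` is a hypothetical helper. The action list is an assumption: it supposes the Glue DynamoDB reader needs only `dynamodb:DescribeTable` and `dynamodb:Scan`. A Glue role also typically needs the AWS-managed Glue service permissions (for example, the `AWSGlueServiceRole` managed policy), which are not shown here.

```python
import json

# Hypothetical placeholders: substitute your own region and account ID.
REGION = "us-east-1"
ACCOUNT_ID = "123456789012"
TABLE_NAME = "TABLE5"
BUCKET = "combined-stock-financial-data"


def build_policy(region: str, account_id: str, table: str, bucket: str) -> dict:
    """Return an IAM policy document (as a dict) covering this notebook's reads and writes."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Assumed minimum for the Glue DynamoDB reader, which scans the table.
                "Effect": "Allow",
                "Action": ["dynamodb:DescribeTable", "dynamodb:Scan"],
                "Resource": f"arn:aws:dynamodb:{region}:{account_id}:table/{table}",
            },
            {
                # Listing objects under a prefix is a bucket-level permission.
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": f"arn:aws:s3:::{bucket}",
            },
            {
                # Writing the CSVs and the copy-then-delete rename act on objects.
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
        ],
    }


policy = build_policy(REGION, ACCOUNT_ID, TABLE_NAME, BUCKET)
print(json.dumps(policy, indent=2))
```

Splitting bucket-level and object-level statements keeps each action paired with the resource type it applies to.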


In [2]:
# Configuration settings
%idle_timeout 180
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

# Third-party library imports
import boto3
from botocore.exceptions import ClientError

# PySpark imports
import pyspark.sql.functions as F
from pyspark.context import SparkContext
from pyspark.sql.functions import col, coalesce, lit, nanvl, regexp_replace, when

# AWSGlue imports
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job

You are already connected to a glueetl session 5c6d933d-712e-40a2-856f-9ced0da58b64.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is None minutes.
idle_timeout has been set to 180 minutes.




Setting Glue version to: 4.0




Previous worker type: None
Setting new worker type to: G.1X




Previous number of workers: None
Setting new number of workers to: 5



In [3]:
# Create the SparkContext, GlueContext, SparkSession, and Glue Job for this session.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)




## Data Extraction
--------------

### Source System Details

* DynamoDB Table Name: TABLE5
* Primary Key: partition key PK and sort key SK (single-table design)
* Attributes:
  + PK (string): Stock ticker symbol
  + SK (number): Year
  + Many other financial metrics (e.g., Revenue, EPS, Net Income)

In [4]:
# Read the DynamoDB table into a DynamicFrame for the ETL steps below.
# dynamodb.throughput.read.percent: fraction of the table's read capacity the scan may use ("1.0" = 100%).
# dynamodb.splits: number of segments the table is split into for a parallel scan.
source_dynamic_frame = glueContext.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.input.tableName": "TABLE5",
        "dynamodb.throughput.read.percent": "1.0",
        "dynamodb.splits": "15"
    }
)
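The connection options above are plain strings, so a typo or an out-of-range value only surfaces when the job runs. The following sketch shows a small validating builder. `dynamodb_read_options` is a hypothetical helper. The ranges it enforces are taken from our reading of the AWS Glue DynamoDB connection documentation: a read percent of 0.1 to 1.5 (default 0.5) and 1 to 1,000,000 splits (default 1). Check them against the current docs. Note that a read percent of 1.0, as used above, lets the scan consume the table's full read capacity, which can throttle other readers of the table.

```python
def dynamodb_read_options(table_name: str, read_percent: float = 0.5, splits: int = 1) -> dict:
    """Build Glue DynamoDB connection_options, validating the assumed documented ranges."""
    # Assumed ranges: read percent 0.1-1.5, splits 1-1,000,000.
    if not 0.1 <= read_percent <= 1.5:
        raise ValueError(f"read_percent must be between 0.1 and 1.5, got {read_percent}")
    if not 1 <= splits <= 1_000_000:
        raise ValueError(f"splits must be between 1 and 1,000,000, got {splits}")
    # Pass values as strings, as the extraction cell above does.
    return {
        "dynamodb.input.tableName": table_name,
        "dynamodb.throughput.read.percent": str(read_percent),
        "dynamodb.splits": str(splits),
    }


options = dynamodb_read_options("TABLE5", read_percent=1.0, splits=15)
print(options)
```

The resulting dict matches the one passed to `create_dynamic_frame.from_options` in the cell above, so it could be dropped in as `connection_options=options`.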





## Data Transformation
-------------------

### Transformation Rules

1. Data Standardization
   - Rename PK to stock_symbol and SK to year
   - Prepend "20" so that two-digit years in the year column become four-digit years
   - Remove any duplicate rows
   - Remove % from the yearly PRICE VAR columns
   - Combine the yearly PRICE VAR columns into a single price_var column (first non-null value), then drop the originals
   - Convert values in parentheses, e.g. (0.5), to negative numbers in the Book Value per Share Growth column

In [5]:

# Convert the DynamicFrame to a Spark DataFrame
df = source_dynamic_frame.toDF()

# Rename the key columns
df = df.withColumnRenamed("PK", "stock_symbol")
df = df.withColumnRenamed("SK", "year")





In [6]:
# The data contains no years from the 1900s, so this transformation is safe.

# Prepend "20" to the year string only when it is two characters long (e.g. "15" -> "2015").
# Otherwise leave the value unchanged.

df = df.withColumn(
    "year",
    F.when(F.length(F.col("year")) == 2, F.concat(F.lit("20"), F.col("year")))
     .otherwise(F.col("year"))
)
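The year rule can be checked outside Spark with a plain-Python mirror. `normalize_year` is a hypothetical helper, not part of the notebook. It works on the string form of the value, as `F.length` does, and passes nulls through unchanged, as the `when`/`otherwise` expression does.

```python
def normalize_year(value):
    """Prepend "20" to two-character years; leave everything else (including None) unchanged."""
    if value is None:
        return None  # Spark: length(null) is null, so otherwise() keeps the null
    text = str(value)
    return "20" + text if len(text) == 2 else text


for raw in ["15", "2016", 18]:
    print(raw, "->", normalize_year(raw))
# 15 -> 2015
# 2016 -> 2016
# 18 -> 2018
```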





In [7]:
# Remove all duplicate rows. DataFrames are immutable, so the result must be reassigned.
df = df.dropDuplicates()
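Because Spark DataFrames are immutable, `dropDuplicates()` returns a new DataFrame rather than changing `df` in place, so its result has to be reassigned. The position of this step after the year fix also matters: a record stored under both "15" and "2015" only becomes an exact duplicate once the year is normalized. The plain-Python sketch below illustrates that ordering with hypothetical rows. The tickers and values are made up, and `drop_duplicates` and `fix_year` are illustrative helpers.

```python
def fix_year(year: str) -> str:
    # Same rule as the Spark cell: prepend "20" to two-character years.
    return "20" + year if len(year) == 2 else year


def drop_duplicates(rows):
    """Return rows with exact duplicates removed, keeping first-seen order."""
    seen = set()
    unique = []
    for row in rows:
        if row not in seen:
            seen.add(row)
            unique.append(row)
    return unique


# Hypothetical (stock_symbol, year, revenue) rows, not taken from TABLE5.
rows = [("AAA", "15", "100"), ("AAA", "2015", "100"), ("BBB", "2016", "90")]

before = drop_duplicates(rows)  # dedupe first: nothing is removed
after = drop_duplicates([(s, fix_year(y), r) for s, y, r in rows])  # normalize first
print(len(before), len(after))  # 3 2
```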

In [8]:
# List of columns to modify
columns_to_modify = [
    "2015 PRICE VAR [%]",
    "2016 PRICE VAR [%]",
    "2017 PRICE VAR [%]",
    "2018 PRICE VAR [%]",
    "2019 PRICE VAR [%]"
]

# Loop through the columns_to_modify and remove the % sign.
for column in columns_to_modify:
    df = df.withColumn(column, regexp_replace(col(column), "%", ""))




In [9]:
# For simplicity, no extra null handling is done: a row whose price variance columns are all null gets a null price_var.

# Price variance columns to combine

price_var_columns = [
    "2015 PRICE VAR [%]",
    "2016 PRICE VAR [%]",
    "2017 PRICE VAR [%]",
    "2018 PRICE VAR [%]",
    "2019 PRICE VAR [%]"
]

# Convert columns to double (nullifying non-convertible values)
for column in price_var_columns:
    df = df.withColumn(column, col(column).cast("double"))

# Use nanvl to replace NaN with null, and coalesce to get the first non-null value
df = df.withColumn(
    "price_var", 
    coalesce(*[nanvl(col(c), lit(None)) for c in price_var_columns])
)

# Preview the combined price_var column (the original columns are dropped in the next cell)
df.select("price_var").show()



+-------------------+
|          price_var|
+-------------------+
| -4.950494862528325|
| 23.927435714883966|
| -32.84550178011025|
|  -4.43052670613731|
|-22.142853432772046|
|  60.91953660745985|
|  53.53938838025913|
| -27.52227994058048|
|  80.34036321109737|
| -66.61308983517486|
| -1.401875544864496|
| 24.988463274349076|
|  6.998880080021027|
|  31.91400622758113|
| -38.73831437694461|
| 11.438928709783278|
|  29.33753331091046|
| -62.33296293845746|
| 121.29669023955697|
| -40.05208028738028|
+-------------------+
only showing top 20 rows
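The two price-variance cells strip `%`, cast to double, turn NaN into null, and take the first non-null value. That logic is easy to check row by row in plain Python. In the sketch below, `parse_percent` and `combine_price_var` are hypothetical helpers and the sample row is made up. Unparsable strings become `None`, mirroring Spark's non-ANSI cast returning null.

```python
import math


def parse_percent(value):
    """Strip "%" and parse as float; return None for missing, unparsable, or NaN values."""
    if value is None:
        return None
    try:
        number = float(str(value).replace("%", ""))
    except ValueError:
        return None  # like cast("double") yielding null for bad strings
    return None if math.isnan(number) else number  # like nanvl(col, null)


def combine_price_var(row, columns):
    """Return the first non-null parsed value across the columns, like coalesce()."""
    for column in columns:
        number = parse_percent(row.get(column))
        if number is not None:
            return number
    return None


PRICE_VAR_COLUMNS = [f"{year} PRICE VAR [%]" for year in range(2015, 2020)]

# Hypothetical row: the 2015 column holds NaN, the 2016 column holds the value.
row = {"2015 PRICE VAR [%]": "nan", "2016 PRICE VAR [%]": "12.5%"}
print(combine_price_var(row, PRICE_VAR_COLUMNS))  # 12.5
```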


In [10]:
# Drop the original yearly price variance columns; price_var now holds their values.
df = df.drop(*price_var_columns)





In [11]:

#Column to modify
column_name = "Book Value per Share Growth"

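# Accounting notation: a value in parentheses, e.g. (0.25), is negative.
# Strip the parentheses, cast to float (needed for arithmetic), and multiply by -1.
# Values without parentheses are simply cast to float.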

df = df.withColumn(
    column_name,
    when(
        col(column_name).rlike(r"^\(.*\)$"),  # Check if value is enclosed in parentheses
        regexp_replace(col(column_name), r"[()]", "").cast("float") * -1 
    ).otherwise(
        col(column_name).cast("float")  # Convert to float for non-parenthesized values
    )
)
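The parentheses rule can be mirrored in plain Python to check edge cases. `parse_accounting_number` is a hypothetical helper. Like the Spark expression, it removes every `(` and `)` once the value is wrapped in parentheses, and returns `None` for values that do not parse. One difference: Python floats are 64-bit, so the sketch does not reproduce the 32-bit rounding of `cast("float")`.

```python
import re

WRAPPED_IN_PARENS = re.compile(r"^\(.*\)$")  # same pattern as the Spark rlike check


def parse_accounting_number(value):
    """Convert "(0.25)" to -0.25 and "1.5" to 1.5; missing or unparsable values become None."""
    if value is None:
        return None
    text = str(value)
    try:
        if WRAPPED_IN_PARENS.match(text):
            # Like regexp_replace(col, "[()]", "").cast(...) * -1
            return -float(re.sub(r"[()]", "", text))
        return float(text)
    except ValueError:
        return None  # mirrors Spark's non-ANSI cast returning null


print(parse_accounting_number("(0.25)"), parse_accounting_number("1.5"))  # -0.25 1.5
```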








In [12]:
# Convert back to a DynamicFrame for the Glue S3 writer.
dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")




## Data Loading
-------------

### Target System Details

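* S3 Bucket: s3://combined-stock-financial-data/
* Format: CSV
* Output Layout (one file per year, no partition keys):
  + Spark first writes each year to year_<year>_data/ (the part-file name is chosen by Spark)
  + The file is then renamed to out/<year>_Financial_Data_DeDup.csv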

In [13]:
def write_filtered_year(dynamic_frame, year):
    # Convert to a DataFrame and cast 'year' to string so it compares reliably with str(year).
    df = dynamic_frame.toDF().withColumn("year", col("year").cast("string"))

    # Keep only the rows for the specified year.
    df_filtered = df.filter(col("year") == str(year))

    # If there is no data for the year, skip the write.
    if df_filtered.count() == 0:
        print(f"No data found for year {year}, skipping write.")
        return

    # Repartition to a single partition so only one file is written to S3.
    filtered_dynamic_frame = DynamicFrame.fromDF(
        df_filtered.repartition(1), glueContext, "filtered_dynamic_frame"
    )

    # Build this year's output prefix (Spark chooses the part-file name inside it).
    output_prefix = f"year_{year}_data"
    full_s3_path = f"s3://combined-stock-financial-data/{output_prefix}"

    # Write to S3 as CSV
    glueContext.write_dynamic_frame.from_options(
        frame=filtered_dynamic_frame,
        connection_type="s3",
        connection_options={"path": full_s3_path},
        format="csv",
        format_options={"header": "true"}
    )

years = (2014, 2015, 2016, 2017, 2018)

# Process all years
for year in years:
    write_filtered_year(dynamic_frame, year)
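The cell above produces one CSV per year by filtering and collapsing each year to a single partition. The same per-year split can be sketched in plain Python with the standard `csv` module, which is handy for checking the grouping and skip logic without a Glue session. `split_by_year` and the sample rows are hypothetical, and the CSV text is kept in memory instead of being written to S3.

```python
import csv
import io
from collections import defaultdict


def split_by_year(rows, years):
    """Group dict rows by 'year' and render one CSV string (with header) per requested year."""
    groups = defaultdict(list)
    for row in rows:
        groups[str(row["year"])].append(row)  # compare as strings, like the cast in write_filtered_year

    outputs = {}
    for year in years:
        year_rows = groups.get(str(year), [])
        if not year_rows:
            print(f"No data found for year {year}, skipping write.")
            continue
        buffer = io.StringIO()
        writer = csv.DictWriter(buffer, fieldnames=list(year_rows[0].keys()))
        writer.writeheader()
        writer.writerows(year_rows)
        outputs[f"year_{year}_data"] = buffer.getvalue()
    return outputs


# Hypothetical rows for illustration only.
rows = [
    {"stock_symbol": "AAA", "year": "2014", "price_var": 1.5},
    {"stock_symbol": "BBB", "year": 2014, "price_var": -2.0},
    {"stock_symbol": "AAA", "year": "2015", "price_var": 3.0},
]
files = split_by_year(rows, (2014, 2015, 2016))  # prints a skip message for 2016
print(sorted(files))  # ['year_2014_data', 'year_2015_data']
```

Note that `write_filtered_year` rescans the data once per year. In Spark, a single `df.write.partitionBy("year")` write is a possible alternative. It writes to `year=<value>/` prefixes, though, and omits the year column from the files, so the rename step would need adjusting.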






In [14]:
def rename_single_s3_file(bucket_name, folder_prefix, target_key):
    try:
        # Initialize the S3 client
        s3_client = boto3.client('s3')

        # List the objects under the folder prefix
        response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=folder_prefix)

        # list_objects_v2 omits 'Contents' when nothing matches the prefix
        if 'Contents' not in response:
            raise ValueError(f"No files found under s3://{bucket_name}/{folder_prefix}")

        # Expect exactly one part file, since each year was written from a single partition
        if len(response['Contents']) != 1:
            raise ValueError(f"Expected exactly one file, but found {len(response['Contents'])} files")

        # Get the key of the single file
        source_key = response['Contents'][0]['Key']

        print(f"Found single file: {source_key}")

        # S3 has no rename operation: copy the object to the target key...
        copy_source = {'Bucket': bucket_name, 'Key': source_key}
        s3_client.copy_object(CopySource=copy_source, Bucket=bucket_name, Key=target_key)

        # ...then delete the original
        s3_client.delete_object(Bucket=bucket_name, Key=source_key)

        print(f"Successfully renamed {source_key} to {target_key}")

    except ClientError as e:
        print(f"An error occurred: {e}")
    except ValueError as ve:
        print(ve)
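The rename logic can be tested without AWS if the S3 client is passed in rather than created inside the function. The sketch below uses hypothetical names: `rename_single_object` is a refactor of the function above, `FakeS3Client` is an in-memory test double, and "demo-bucket" is a placeholder. The fake implements only the three boto3 S3 client methods the rename uses, with boto3's keyword names. This version raises instead of printing, so the caller decides how to handle a missing or extra file. In the notebook it would be called as `rename_single_object(boto3.client("s3"), ...)`.

```python
class FakeS3Client:
    """In-memory stand-in for list_objects_v2, copy_object, and delete_object (test double)."""

    def __init__(self, objects):
        self.objects = dict(objects)  # {(bucket, key): body}

    def list_objects_v2(self, Bucket, Prefix=""):
        keys = sorted(k for (b, k) in self.objects if b == Bucket and k.startswith(Prefix))
        # Like the real API, omit 'Contents' when nothing matches.
        return {"Contents": [{"Key": k} for k in keys]} if keys else {"KeyCount": 0}

    def copy_object(self, CopySource, Bucket, Key):
        self.objects[(Bucket, Key)] = self.objects[(CopySource["Bucket"], CopySource["Key"])]

    def delete_object(self, Bucket, Key):
        self.objects.pop((Bucket, Key), None)


def rename_single_object(s3_client, bucket_name, folder_prefix, target_key):
    """Copy the only object under folder_prefix to target_key, delete the original, return its old key."""
    response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=folder_prefix)
    contents = response.get("Contents", [])
    if len(contents) != 1:
        raise ValueError(f"Expected exactly one file under {folder_prefix}, found {len(contents)}")
    source_key = contents[0]["Key"]
    # S3 has no rename: copy to the new key, then delete the old one.
    s3_client.copy_object(CopySource={"Bucket": bucket_name, "Key": source_key},
                          Bucket=bucket_name, Key=target_key)
    s3_client.delete_object(Bucket=bucket_name, Key=source_key)
    return source_key


client = FakeS3Client({("demo-bucket", "year_2014_data/run-1-part-r-00000"): "csv body"})
moved = rename_single_object(client, "demo-bucket", "year_2014_data/",
                             "out/2014_Financial_Data_DeDup.csv")
print(moved, sorted(k for _, k in client.objects))
# year_2014_data/run-1-part-r-00000 ['out/2014_Financial_Data_DeDup.csv']
```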




In [15]:
# Years to process
years = (2014, 2015, 2016, 2017, 2018)

# S3 locations for each year: the folder Spark wrote to and the final object key
year_configurations = {
    year: {
        'bucket_name': 'combined-stock-financial-data',
        'folder_prefix': f'year_{year}_data/',
        'target_key': f'out/{year}_Financial_Data_DeDup.csv'
    } for year in years
}

def process_year(year):
    # Look up this year's configuration
    config = year_configurations[year]

    # Rename the single part file to its final key
    rename_single_s3_file(config['bucket_name'], config['folder_prefix'], config['target_key'])

# Process all years
for year in years:
    process_year(year)

Found single file: year_2014_data/run-1742419372769-part-r-00000
Successfully renamed year_2014_data/run-1742419372769-part-r-00000 to out/2014_Financial_Data_DeDup.csv
Found single file: year_2015_data/run-1742419395243-part-r-00000
Successfully renamed year_2015_data/run-1742419395243-part-r-00000 to out/2015_Financial_Data_DeDup.csv
Found single file: year_2016_data/run-1742419411811-part-r-00000
Successfully renamed year_2016_data/run-1742419411811-part-r-00000 to out/2016_Financial_Data_DeDup.csv
Found single file: year_2017_data/run-1742419426292-part-r-00000
Successfully renamed year_2017_data/run-1742419426292-part-r-00000 to out/2017_Financial_Data_DeDup.csv
Found single file: year_2018_data/run-1742419441446-part-r-00000
Successfully renamed year_2018_data/run-1742419441446-part-r-00000 to out/2018_Financial_Data_DeDup.csv


In [16]:
job.commit()



