# AWS Glue Job - Transform FOREX data from Bronze to Silver

## Set the Glue session parameters

In [8]:
%iam_role arn:aws:iam::212430227630:role/LabRole
%region us-east-1
%number_of_workers 2

%idle_timeout 30
%glue_version 4.0
%worker_type G.1X

%%configure 
{
  "--enable-metrics": "true",
  "--enable-observability-metrics": "true"
}

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 
Current iam_role is arn:aws:iam::212430227630:role/LabRole
iam_role has been set to arn:aws:iam::212430227630:role/LabRole.
Previous region: us-east-1
Setting new region to: us-east-1
Region is set to: us-east-1
Previous number of workers: None
Setting new number of workers to: 2
Current idle_timeout is None minutes.
idle_timeout has been set to 30 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
The following configurations have been updated: {'--enable-metrics': 'true', '--enable-observability-metrics': 'true'}


## Set up and start your interactive session


In [11]:
%load_ext autoreload
%autoreload 2

In [1]:
import sys
import boto3

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Reuse the running SparkContext if there is one, otherwise create it.
sc = SparkContext.getOrCreate()
# Wrap the SparkContext in a GlueContext and take its SparkSession for DataFrame work.
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Job handle bound to this GlueContext (it is not initialised or committed in the visible cells).
job = Job(glueContext)

Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 2
Idle Timeout: 30
Session ID: a0e20fd9-377c-416f-8ab0-d9c088dc689b
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
--enable-metrics true
--enable-observability-metrics true
Waiting for session a0e20fd9-377c-416f-8ab0-d9c088dc689b to get into ready status...
Session a0e20fd9-377c-416f-8ab0-d9c088dc689b has been created.



## Transform Bronze data to Silver - FOREX


In [2]:
from datetime import datetime, timedelta, timezone
from functools import reduce

import pyspark.sql.functions as F




### Set AWS Storage parameters


In [3]:
# S3 bucket and key prefix used to build the bronze FOREX path.
BUCKET_NAME = "cryptoengineer"
PREFIX_BRONZE = "datalake/bronze/forex"




### Load job parameters

In [4]:
glue_client = boto3.client("glue")

# Defaults used when the notebook runs as an interactive session, i.e. when the
# job arguments cannot be resolved from sys.argv. The time frame is in hours
# (it is fed to timedelta(hours=...) further down).
DEFAULT_TIME_FRAME_HOURS = 384
DEFAULT_SYMBOLS = "USDEUR"

# Check if the parameters come from a Glue workflow
if '--WORKFLOW_NAME' in sys.argv and '--WORKFLOW_RUN_ID' in sys.argv:
    print("Running in Glue Workflow")

    glue_args = getResolvedOptions(
        sys.argv, ['WORKFLOW_NAME', 'WORKFLOW_RUN_ID']
    )

    print("Reading the workflow parameters")
    workflow_args = glue_client.get_workflow_run_properties(
        Name=glue_args['WORKFLOW_NAME'], RunId=glue_args['WORKFLOW_RUN_ID']
    )["RunProperties"]

    time_frame = int(workflow_args['time_frame'])
    symbols = workflow_args['symbols']

else:
    # Check if the parameters come from a Glue job
    try:
        args = getResolvedOptions(sys.argv,
                                  ['JOB_NAME',
                                   'time_frame',
                                   'symbols'
                                   ])
    except Exception:
        # The job arguments could not be resolved: treat the run as an
        # interactive session and fall back to the defaults.
        print("Running as an interactive session")
        time_frame = DEFAULT_TIME_FRAME_HOURS
        symbols = DEFAULT_SYMBOLS
    else:
        # Parse outside the try block so that a malformed time_frame passed to
        # a real job raises instead of silently switching to the defaults.
        time_frame = int(args['time_frame'])
        symbols = args['symbols']
        print("Running as Job")


Running as an interactive session


In [5]:
# Echo the resolved job parameters so they show up in the run output.
for label, value in (("Time Frame: ", time_frame), ("Symbols: ", symbols)):
    print(label, value)

Time Frame:  384
Symbols:  USDEUR


#### Set the start and end dates for the data you want to load

In [6]:
# Start date
# Take a single timezone-aware UTC "now" so both ends of the window are derived
# from the same instant (two separate utcnow() calls could straddle midnight),
# and to avoid the naive datetime.utcnow(), which is deprecated in newer Python.
now_utc = datetime.now(timezone.utc)

# Start date: time_frame hours before now; end date: today. Both as YYYY-MM-DD strings.
start_date = (now_utc - timedelta(hours=time_frame)).strftime("%Y-%m-%d")
end_date = now_utc.strftime("%Y-%m-%d")

print("Start date; ",start_date," End date: ",end_date)

Start date;  2024-09-07  End date:  2024-09-23


## Read max date loaded to Silver from the INFO table

In [7]:
# The INFO table is read from the gold FOREX prefix.
# NOTE: `path` is reassigned further down for the bronze and silver locations.
PREFIX_INFO_TABLE='datalake/gold/forex'
path=f"s3://{BUCKET_NAME}/{PREFIX_INFO_TABLE}"
print("Path:", path)

Path: s3://cryptoengineer/datalake/gold/forex


In [8]:
# Read the INFO table, keep the rows recorded for the silver stage and collect
# the symbol / base currency / frequency / end_datetime columns into pandas.
df_max_dates_silver = (
    spark.read.parquet(path)
    .where(F.col("STAGE") == 'silver')
    .select(['symbol', 'base_currency', 'frequencies', 'end_datetime'])
    .toPandas()
)

  series = series.astype(t, copy=False)


In [9]:
# Preview the silver end_datetime rows (at most 10).
df_max_dates_silver.head(10)

   symbol base_currency frequencies        end_datetime
0  USDJPY           USD       15min 2024-09-06 16:45:00
1  USDEUR           USD       15min 2024-09-06 16:45:00
2  USDGBP           USD       15min 2024-09-06 16:45:00
3  USDCHF           USD       15min 2024-09-06 16:45:00


In [10]:
# Show column dtypes and non-null counts of the collected pandas frame.
df_max_dates_silver.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4 entries, 0 to 3
Data columns (total 4 columns):
 #   Column         Non-Null Count  Dtype         
---  ------         --------------  -----         
 0   symbol         4 non-null      object        
 1   base_currency  4 non-null      object        
 2   frequencies    4 non-null      object        
 3   end_datetime   4 non-null      datetime64[ns]
dtypes: datetime64[ns](1), object(3)
memory usage: 256.0+ bytes


## Load the Bronze/Raw data for the time frame and symbol

In [11]:
# Point `path` at the bronze FOREX prefix (replaces the INFO-table path set earlier).
path=f"s3://{BUCKET_NAME}/{PREFIX_BRONZE}"
print("Path:",path)

Path: s3://cryptoengineer/datalake/bronze/forex


In [12]:
# Read the bronze parquet data and keep only the rows whose load_date lies in
# [start_date, end_date]; between() includes both bounds.
df = spark.read.parquet(path).where(
    F.col("load_date").between(start_date, end_date)
)




In [13]:
# Number of bronze rows in the selected load_date window (count() runs a Spark job).
print("Records: ", df.count())

Records:  1269


In [14]:
# Print the first 5 bronze rows.
df.show(5)

+-------------------+-------+------+-------+-------+------+----+-----+---+--------+----------+-------------+------+---------+------+--------------------+-----+----------+
|           datetime|   open|   low|   high|  close|volume|year|month|day|    time|      date|base_currency|source|frequency|symbol|          audit_time| type| load_date|
+-------------------+-------+------+-------+-------+------+----+-----+---+--------+----------+-------------+------+---------+------+--------------------+-----+----------+
|2024-09-23 05:00:00|0.90147|0.9009|0.90149|0.90124|   334|2024|   09| 23|05:00:00|2024-09-23|          USD|   FMP|    15min|USDEUR|2024-09-23 09:06:...|FOREX|2024-09-23|
|2024-09-23 04:45:00|0.90115|0.9008| 0.9015|0.90149|  1130|2024|   09| 23|04:45:00|2024-09-23|          USD|   FMP|    15min|USDEUR|2024-09-23 09:06:...|FOREX|2024-09-23|
|2024-09-23 04:30:00|0.90157|0.9011|0.90199|0.90119|  1461|2024|   09| 23|04:30:00|2024-09-23|          USD|   FMP|    15min|USDEUR|2024-09-23 09

In [15]:
# Print the bronze schema as read from parquet.
df.printSchema()

root
 |-- datetime: string (nullable = true)
 |-- open: double (nullable = true)
 |-- low: double (nullable = true)
 |-- high: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: long (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- date: string (nullable = true)
 |-- base_currency: string (nullable = true)
 |-- source: string (nullable = true)
 |-- frequency: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- audit_time: timestamp (nullable = true)
 |-- type: string (nullable = true)
 |-- load_date: date (nullable = true)


Get the symbol, base currency and frequency combinations to load

In [16]:
# Collect to pandas the (symbol, base currency, frequency) combinations present
# in the bronze window, together with their row counts.
df_symbols = (
    df.groupBy(['symbol', 'base_currency', 'frequency'])
    .count()
    .toPandas()
)




In [17]:
# Preview the combinations found in bronze (at most 10).
df_symbols.head(10)

   symbol base_currency frequency  count
0  USDEUR           USD     15min   1065
1  USDJPY           USD     15min     68
2  USDGBP           USD     15min     68
3  USDCHF           USD     15min     68


## Extract only data not loaded into Silver

In [18]:
# Build one Spark DataFrame per (symbol, base currency, frequency) combination,
# holding only the bronze rows that are newer than what silver already has.
df_list = []
combinations = df_symbols[['symbol', 'base_currency', 'frequency']]
for combo in combinations.itertuples(index=False):
    print("Symbol: ", combo.symbol, " Base: ", combo.base_currency, " Frequency: ", combo.frequency)

    # end_datetime values recorded for this combination in the INFO table;
    # missing (NaT) entries are dropped so they cannot become the watermark.
    watermarks = df_max_dates_silver.loc[
        (df_max_dates_silver['symbol'] == combo.symbol)
        & (df_max_dates_silver['base_currency'] == combo.base_currency)
        & (df_max_dates_silver['frequencies'] == combo.frequency),
        'end_datetime',
    ].dropna()

    # Bronze rows belonging to this combination
    df_new_data = df.filter(
        (F.col('symbol') == combo.symbol)
        & (F.col('base_currency') == combo.base_currency)
        & (F.col('frequency') == combo.frequency)
    )

    if watermarks.empty:
        # Nothing recorded for silver yet (e.g. a newly added symbol): keep the
        # whole bronze window instead of comparing against a NaT watermark.
        print("No silver watermark found; taking all bronze rows in the window")
    else:
        # Keep only the rows strictly newer than the max datetime loaded to silver
        max_datetime = watermarks.max()
        df_new_data = df_new_data.filter(F.col('datetime') > max_datetime)

    # count() runs a Spark job per combination; it is only used for this log line.
    print("Records: ", df_new_data.count())
    df_list.append(df_new_data)

Symbol:  USDEUR  Base:  USD  Frequency:  15min
Records:  997
Symbol:  USDJPY  Base:  USD  Frequency:  15min
Records:  0
Symbol:  USDGBP  Base:  USD  Frequency:  15min
Records:  0
Symbol:  USDCHF  Base:  USD  Frequency:  15min
Records:  0


In [19]:
# Reduce and union all DataFrames into one. union() matches columns by position,
# which is safe here because every element is a filter of the same bronze frame.
# When there is nothing to union (no bronze rows in the window), fall back to an
# empty frame with the bronze schema instead of letting reduce() raise on an
# empty list.
df = reduce(lambda x, y: x.union(y), df_list) if df_list else df.limit(0)




In [20]:
# Total number of new rows across all combinations.
df.count()

997


## Remove duplicate rows

In [None]:
# To check duplicates
# Disabled: the query is wrapped in a string literal, so it is never executed.
# Remove the triple quotes to run it; it counts rows per candidate key,
# ordered by datetime.
"""
(
    df
    .groupBy('symbol','year','datetime','source','frequency','type')
    .count().alias('count')
    #.filter(F.col('count') > 1)
    .orderBy(F.col('datetime'), ascending=True)
    .show(100)
)
"""

In [21]:
# Keep a single row per (symbol, year, datetime, source, frequency, type) key.
df = df.dropDuplicates(
    subset=['symbol', 'year', 'datetime', 'source', 'frequency', 'type']
)




In [22]:
# Row count after de-duplication, to compare with the count before it.
print("After dropDuplicates: ", df.count())

After dropDuplicates:  997


## Checking correct values

In [None]:
# Disabled: the query is wrapped in a string literal, so it is never executed.
# Remove the triple quotes to run it; it counts rows per
# source / type / symbol / year / frequency group.
"""
(
    df
    .groupBy('source','type','symbol','year','frequency')
    .count().alias('count')
    #.filter(F.col('count') > 1)
    .show()
)
"""

In [23]:
# Print the schema before the type changes applied in the next section.
df.printSchema()

root
 |-- datetime: string (nullable = true)
 |-- open: double (nullable = true)
 |-- low: double (nullable = true)
 |-- high: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: long (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- date: string (nullable = true)
 |-- base_currency: string (nullable = true)
 |-- source: string (nullable = true)
 |-- frequency: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- audit_time: timestamp (nullable = true)
 |-- type: string (nullable = true)
 |-- load_date: date (nullable = true)


## Change Schema and uppercase column names

In [24]:
# Normalise the column types for the silver table:
#   - datetime: string -> timestamp; date-only values (YYYY-MM-DD) get
#     " 00:00:00" appended before parsing
#   - volume: cast to double
#   - month, day: cast to int
df = (
    df
    .withColumn(
        "datetime",
        F.when(
            # Raw string: keeps the regex backslashes from being read as
            # (invalid) Python escape sequences.
            F.col("datetime").rlike(r"^\d{4}-\d{2}-\d{2}$"),
            F.to_timestamp(F.concat(F.col("datetime"), F.lit(" 00:00:00"))),
        ).otherwise(F.to_timestamp(F.col("datetime"))),
    )
    .withColumn('volume', F.col('volume').cast('double'))
    .withColumn('month', F.col('month').cast('int'))
    .withColumn('day', F.col('day').cast('int'))
)




In [25]:
# Rename every column to its upper-case form, keeping the column order.
df = df.toDF(*map(str.upper, df.columns))




In [26]:
# Inspect the first 20 rows and the schema after the type casts and column renaming.
df.show(20)
df.printSchema()

+-------------------+------+------+------+------+------+----+-----+---+--------+----------+-------------+------+---------+------+--------------------+-----+----------+
|           DATETIME|  OPEN|   LOW|  HIGH| CLOSE|VOLUME|YEAR|MONTH|DAY|    TIME|      DATE|BASE_CURRENCY|SOURCE|FREQUENCY|SYMBOL|          AUDIT_TIME| TYPE| LOAD_DATE|
+-------------------+------+------+------+------+------+----+-----+---+--------+----------+-------------+------+---------+------+--------------------+-----+----------+
|2024-09-08 18:00:00|0.9019|0.9016|0.9019|0.9016|  15.0|2024|    9|  8|18:00:00|2024-09-08|          USD|   FMP|    15min|USDEUR|2024-09-23 09:06:...|FOREX|2024-09-23|
|2024-09-08 18:15:00|0.9018|0.9016|0.9018|0.9017|  15.0|2024|    9|  8|18:15:00|2024-09-08|          USD|   FMP|    15min|USDEUR|2024-09-23 09:06:...|FOREX|2024-09-23|
|2024-09-08 18:30:00|0.9016|0.9015|0.9016|0.9016|  15.0|2024|    9|  8|18:30:00|2024-09-08|          USD|   FMP|    15min|USDEUR|2024-09-23 09:06:...|FOREX|2024

## Append the batch data to SILVER table

Set the destination silver table

In [28]:
# Set the destination silver table
# BUCKET_NAME is re-assigned here with the same value as in the storage-parameters cell.
BUCKET_NAME = "cryptoengineer"
PREFIX_SILVER = "datalake/silver/forex"




In [29]:
# Point `path` at the silver FOREX prefix; the write cell below saves to it.
path=f"s3://{BUCKET_NAME}/{PREFIX_SILVER}"
print("Path:",path)

Path: s3://cryptoengineer/datalake/silver/forex


In [32]:
# Append the batch to the silver table as parquet, laid out in
# SYMBOL=<...>/YEAR=<...> directories.
# The frame is repartitioned on the same columns that partitionBy uses: all rows
# of one output directory still land in a single task, but different symbols can
# be written in parallel (repartitioning on YEAR alone sends a whole year
# through one task).
# NOTE(review): mode("append") is not idempotent - re-running this cell for the
# same batch writes the rows again.
partition_cols = ['SYMBOL', 'YEAR']
(
    df
    .repartition(*partition_cols)
    .write
    .format("parquet")
    .mode("append")
    .partitionBy(*partition_cols)
    .save(path)
)


