# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

#### Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Get (or create) the SparkContext and wrap it in a GlueContext. Later cells
# rely on these globals: `glueContext` for Data Catalog reads and `spark` for
# Spark SQL configuration.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# SparkSession backing the GlueContext (used below via spark.conf.set).
spark = glueContext.spark_session
# Glue Job handle; NOTE(review): job.init()/job.commit() are not called in the
# cells visible here, so job bookmarks are not in effect — confirm if intended.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 3a954dea-b6f8-4b1b-bc24-3c77fb418ac8
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
Waiting for session 3a954dea-b6f8-4b1b-bc24-3c77fb418ac8 to get into ready status...
Session 3a954dea-b6f8-4b1b-bc24-3c77fb418ac8 ha

#### Find the `results*` table in the AWS Glue Data Catalog, create a DynamicFrame from it, and display its schema


In [4]:
import boto3

# Glue Data Catalog client, used only to discover which table to read.
client = boto3.client('glue')

# Step 1: Database to search and the table-name prefix to look for.
database_name = 'exchangerawdb'
table_prefix = 'results'

# Step 2: Collect matching table names. get_tables returns results in pages,
# so walk every page instead of reading only the first response.
paginator = client.get_paginator('get_tables')
matching_tables = [
    table['Name']
    for page in paginator.paginate(DatabaseName=database_name)
    for table in page['TableList']
    if table['Name'].startswith(table_prefix)
]

# Step 3: Fail fast with a clear message. Merely printing here would lead to a
# NameError below, or worse, silently reuse a stale `table_name` left in the
# kernel by an earlier run.
if not matching_tables:
    raise ValueError(
        f"No table starting with '{table_prefix}' found in database '{database_name}'."
    )

# Use the first match (in the order the catalog returned them).
table_name = matching_tables[0]
if len(matching_tables) > 1:
    print(f"Warning: {len(matching_tables)} tables match '{table_prefix}*'; using the first.")
print(f"Selected table: {table_name}")

# Step 4: Create a dynamic frame from the selected catalog table.
dyf = glueContext.create_dynamic_frame.from_catalog(
    database=database_name,
    table_name=table_name
)

# Step 5: Show the schema for inspection.
dyf.printSchema()

Selected table: results_cb9cef5a93649a0062b4776bd8e0ca6e
root
|-- statusCode: int
|-- body: string


#### Example: Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [3]:
# Switch from the Glue DynamicFrame to a plain Spark DataFrame so the
# pyspark.sql.functions transforms below can be applied.
df = dyf.toDF()
# Preview with Spark's defaults spelled out: first 20 rows, long cells truncated.
df.show(n=20, truncate=True)

+----------+--------------------+
|statusCode|                body|
+----------+--------------------+
|       200|{"conversion_rate...|
+----------+--------------------+



In [12]:
from pyspark.sql.functions import from_json, col,explode
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, MapType

# JSON layout of the payload stored as a string in the "body" column.
schema = StructType(
    [
        StructField("conversion_rates", MapType(StringType(), DoubleType())),
        StructField("time_last_update_utc", StringType()),
        StructField("time_next_update_utc", StringType()),
        StructField("timestamp", StringType()),
    ]
)

# Decode "body" into a struct column named "parsed_body".
df_parsed = df.withColumn("parsed_body", from_json("body", schema))

# Promote every struct field to a top-level column (same names, same order).
df_flattened = df_parsed.select("parsed_body.*")

# One output row per entry of the conversion_rates map, keeping the
# snapshot's time columns alongside each (currency, rate) pair.
df_exploded = df_flattened.select(
    "time_last_update_utc",
    "time_next_update_utc",
    "timestamp",
    explode("conversion_rates").alias("currency", "rate"),
)

df_exploded.show(truncate=False)

+-------------------------------+-------------------------------+--------------------------+--------+---------+
|time_last_update_utc           |time_next_update_utc           |timestamp                 |currency|rate     |
+-------------------------------+-------------------------------+--------------------------+--------+---------+
|Sat, 30 Nov 2024 00:00:02 +0000|Sun, 01 Dec 2024 00:00:02 +0000|2024-11-30 13:44:09.305899|USD     |1.0      |
|Sat, 30 Nov 2024 00:00:02 +0000|Sun, 01 Dec 2024 00:00:02 +0000|2024-11-30 13:44:09.305899|AED     |3.6725   |
|Sat, 30 Nov 2024 00:00:02 +0000|Sun, 01 Dec 2024 00:00:02 +0000|2024-11-30 13:44:09.305899|AFN     |67.9608  |
|Sat, 30 Nov 2024 00:00:02 +0000|Sun, 01 Dec 2024 00:00:02 +0000|2024-11-30 13:44:09.305899|ALL     |93.2927  |
|Sat, 30 Nov 2024 00:00:02 +0000|Sun, 01 Dec 2024 00:00:02 +0000|2024-11-30 13:44:09.305899|AMD     |394.4143 |
|Sat, 30 Nov 2024 00:00:02 +0000|Sun, 01 Dec 2024 00:00:02 +0000|2024-11-30 13:44:09.305899|ANG     |1.7

In [14]:
from pyspark.sql.functions import row_number, col
from pyspark.sql.window import Window

# Order rows by currency; "timestamp" breaks ties so that IDs stay
# deterministic if the source ever contains more than one snapshot per currency.
# NOTE: a window with no partitionBy moves every row into a single partition.
# That is fine for one snapshot of rates but will not scale to large inputs.
windowSpec = Window.orderBy("currency", "timestamp")

# Attach a 1-based sequential ID and put it first in the column order.
df_exploded_with_id = (
    df_exploded
    .withColumn("ID", row_number().over(windowSpec))
    .select("ID", "currency", "rate", "timestamp",
            "time_last_update_utc", "time_next_update_utc")
)

# Show the result
df_exploded_with_id.show(truncate=False)


+---+--------+---------+--------------------------+-------------------------------+-------------------------------+
|ID |currency|rate     |timestamp                 |time_last_update_utc           |time_next_update_utc           |
+---+--------+---------+--------------------------+-------------------------------+-------------------------------+
|1  |AED     |3.6725   |2024-11-30 13:44:09.305899|Sat, 30 Nov 2024 00:00:02 +0000|Sun, 01 Dec 2024 00:00:02 +0000|
|2  |AFN     |67.9608  |2024-11-30 13:44:09.305899|Sat, 30 Nov 2024 00:00:02 +0000|Sun, 01 Dec 2024 00:00:02 +0000|
|3  |ALL     |93.2927  |2024-11-30 13:44:09.305899|Sat, 30 Nov 2024 00:00:02 +0000|Sun, 01 Dec 2024 00:00:02 +0000|
|4  |AMD     |394.4143 |2024-11-30 13:44:09.305899|Sat, 30 Nov 2024 00:00:02 +0000|Sun, 01 Dec 2024 00:00:02 +0000|
|5  |ANG     |1.79     |2024-11-30 13:44:09.305899|Sat, 30 Nov 2024 00:00:02 +0000|Sun, 01 Dec 2024 00:00:02 +0000|
|6  |AOA     |918.8884 |2024-11-30 13:44:09.305899|Sat, 30 Nov 2024 00:0

In [24]:
# Spark 3's default datetime parser does not allow the day-of-week letter 'E'
# when parsing, so switch to the legacy (SimpleDateFormat) parser for strings
# like "Sat, 30 Nov 2024 00:00:02 +0000". This setting is session-wide.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
from pyspark.sql.functions import to_timestamp, to_date, date_format, substring

# Pattern of the time_last_update_utc / time_next_update_utc strings.
API_TIME_PATTERN = "EEE, dd MMM yyyy HH:mm:ss Z"
# Pattern used for every date column produced here.
# NOTE(review): these columns are strings, not DateType, and dd-MM-yyyy does not
# sort chronologically; consider keeping DateType if downstream consumers allow.
OUTPUT_DATE_PATTERN = "dd-MM-yyyy"

df_converted = df_exploded_with_id.withColumn(
    "last_update_date",
    date_format(to_timestamp(col("time_last_update_utc"), API_TIME_PATTERN), OUTPUT_DATE_PATTERN)
).withColumn(
    "next_update_date",
    date_format(to_timestamp(col("time_next_update_utc"), API_TIME_PATTERN), OUTPUT_DATE_PATTERN)
).withColumn(
    "date",
    # Spark's substring is 1-based: characters 1-10 of "2024-11-30 13:44:09.305899"
    # are exactly the yyyy-MM-dd date part (no trailing space to trim).
    date_format(to_date(substring(col("timestamp"), 1, 10), "yyyy-MM-dd"), OUTPUT_DATE_PATTERN)
).select("ID", "currency", "rate", "date", "last_update_date", "next_update_date")


# Show the result
df_converted.show()


+---+--------+---------+----------+----------------+----------------+
| ID|currency|     rate|      date|last_update_date|next_update_date|
+---+--------+---------+----------+----------------+----------------+
|  1|     AED|   3.6725|30-11-2024|      30-11-2024|      01-12-2024|
|  2|     AFN|  67.9608|30-11-2024|      30-11-2024|      01-12-2024|
|  3|     ALL|  93.2927|30-11-2024|      30-11-2024|      01-12-2024|
|  4|     AMD| 394.4143|30-11-2024|      30-11-2024|      01-12-2024|
|  5|     ANG|     1.79|30-11-2024|      30-11-2024|      01-12-2024|
|  6|     AOA| 918.8884|30-11-2024|      30-11-2024|      01-12-2024|
|  7|     ARS|  1011.75|30-11-2024|      30-11-2024|      01-12-2024|
|  8|     AUD|   1.5363|30-11-2024|      30-11-2024|      01-12-2024|
|  9|     AWG|     1.79|30-11-2024|      30-11-2024|      01-12-2024|
| 10|     AZN|   1.7003|30-11-2024|      30-11-2024|      01-12-2024|
| 11|     BAM|   1.8508|30-11-2024|      30-11-2024|      01-12-2024|
| 12|     BBD|      

In [None]:
# Persist the converted rates as Parquet; "overwrite" replaces any data already
# under this S3 prefix.
intermediate_output_path = "s3://exchangerateapi/intermediateTransformedData/"
df_converted.write.mode("overwrite").parquet(intermediate_output_path)

#### Example: Write the data in the DynamicFrame to a location in Amazon S3 and create a table for it in the AWS Glue Data Catalog


In [None]:
# Disabled template (all lines commented out): writes a DynamicFrame to S3 as
# Parquet and creates/updates a Data Catalog table for it.
# NOTE(review): `DyF` is not defined in the visible cells (they define `dyf`
# and the DataFrame `df_converted`). Before enabling, convert the DataFrame to a
# DynamicFrame (e.g. DynamicFrame.fromDF) and replace the placeholder bucket,
# database and table names.
# s3output = glueContext.getSink(
#   path="s3://bucket_name/folder_name",
#   connection_type="s3",
#   updateBehavior="UPDATE_IN_DATABASE",
#   partitionKeys=[],
#   compression="snappy",
#   enableUpdateCatalog=True,
#   transformation_ctx="s3output",
# )
# s3output.setCatalogInfo(
#   catalogDatabase="demo", catalogTableName="populations"
# )
# s3output.setFormat("glueparquet")
# s3output.writeFrame(DyF)