# AWS Glue Studio Notebook

In [4]:
# Glue interactive-session configuration magics.
# These only affect sessions created after they run; when a session is already
# active they are accepted but leave the current session unchanged.
# idle_timeout is expressed in minutes.
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Reuse the already-running SparkContext if there is one, otherwise create it.
sc = SparkContext.getOrCreate()
# GlueContext wraps the SparkContext and is the entry point for DynamicFrame reads/writes below.
glueContext = GlueContext(sc)
# Plain SparkSession handle obtained from the GlueContext.
spark = glueContext.spark_session
# NOTE(review): `job` is created but never referenced again in the visible cells
# (no job.init() / job.commit()) -- confirm whether it is still needed.
job = Job(glueContext)

You are already connected to a glueetl session aad93249-523d-4cf6-978f-8f06e53eaa60.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.


You are already connected to a glueetl session aad93249-523d-4cf6-978f-8f06e53eaa60.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Setting Glue version to: 5.0


You are already connected to a glueetl session aad93249-523d-4cf6-978f-8f06e53eaa60.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous worker type: G.1X
Setting new worker type to: G.1X


You are already connected to a glueetl session aad93249-523d-4cf6-978f-8f06e53eaa60.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous number of workers: 5
Setting new number of workers to: 5



In [2]:
from pyspark.sql.functions import *
from datetime import datetime
from awsglue.dynamicframe import DynamicFrame




In [3]:
# S3 prefix that is read as the JSON input of this notebook.
# NOTE: `write_to_s3` has a parameter that is also called `s3_path`, but there it
# means the output sub-folder name, not this input prefix.
s3_path = "s3://real-time-events-search-amar/raw_data/to_process/"




In [5]:
# Load every JSON document found under `s3_path` into a Glue DynamicFrame.
# No `format_options` are passed: `withHeader` is a CSV reader option and has no
# meaning when `format="json"`, so it was removed to avoid implying otherwise.
source_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [s3_path]},
    format="json",
    transformation_ctx="source_dyf",
)




In [6]:
# Convert the Glue DynamicFrame into a Spark DataFrame; the cells below use the
# DataFrame API (withColumn / select / show) on it.
real_time_events_df = source_dyf.toDF()



In [7]:
# Peek at the raw records (at most two rows, default column truncation).
real_time_events_df.show(n=2)

+------+--------------------+--------------------+--------------------+
|status|          request_id|          parameters|                data|
+------+--------------------+--------------------+--------------------+
|    OK|fb21b62f-a4fa-43b...|{concerts in indi...|[{L2F1dGhvcml0eS9...|
+------+--------------------+--------------------+--------------------+


In [8]:
# Second name for the same DataFrame object (plain assignment, no copy is made);
# the exploratory cells below operate on this name.
test_df = real_time_events_df




In [8]:
# Preview the effect of exploding `data`: one output row per element,
# printed without truncating column values.
(
    test_df
    .withColumn("data", explode("data"))
    .show(n=5, truncate=False)
)

+------+------------------------------------+----------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [11]:
# Prototype of the flattening projection: explode `data`, pull the nested event
# and venue fields up into flat, renamed columns, and print a single row.
(
    test_df
    .withColumn("data", explode("data"))
    .select(
        col("data.event_id").alias("event_id"),
        col("data.name").alias("artist_name"),
        col("data.description").alias("description"),
        col("data.start_time").cast("timestamp").alias("event_start_time"),
        # Commas are stripped from the address before it is shown.
        regexp_replace(col("data.venue.full_address"), ",", "").alias("venue_address"),
        col("data.venue.name").alias("venue_name"),
        col("data.venue.phone_number").alias("venue_phone_number"),
        col("data.link").alias("link"),
    )
    .show(n=1, truncate=False)
)

+----------------------------------------------------------------------------------------+-----------------------------------------+--------------------------------------------------------------------------------------------+-------------------+----------------------------------------------------------------------------+------------------------+------------------+-------------------------------------------------------+
|event_id                                                                                |artist_name                              |description                                                                                 |event_start_time   |venue_address                                                               |venue_name              |venue_phone_number|link                                                   |
+----------------------------------------------------------------------------------------+-----------------------------------------+----------------------

In [12]:
def real_time_events_data(df):
    """Flatten raw event records into one row per event.

    The `data` column of `df` is exploded so that every element becomes its
    own row, then the nested event/venue fields are projected into flat,
    renamed columns.

    Args:
        df: Spark DataFrame with a `data` column that `explode` can expand and
            whose elements expose `event_id`, `name`, `description`,
            `start_time`, `link` and a nested `venue` with `full_address`,
            `name` and `phone_number`.

    Returns:
        A new Spark DataFrame with the columns `event_id`, `artist_name`,
        `description`, `event_start_time` (cast to timestamp),
        `venue_address` (commas removed), `venue_name`,
        `venue_phone_number` and `link`.
    """
    exploded = df.withColumn("data", explode("data"))

    # Derived expressions are named first so the projection list stays readable.
    start_time = col("data.start_time").cast("timestamp")
    address_without_commas = regexp_replace(col("data.venue.full_address"), ",", "")

    projection = [
        col("data.event_id").alias("event_id"),
        col("data.name").alias("artist_name"),
        col("data.description").alias("description"),
        start_time.alias("event_start_time"),
        address_without_commas.alias("venue_address"),
        col("data.venue.name").alias("venue_name"),
        col("data.venue.phone_number").alias("venue_phone_number"),
        col("data.link").alias("link"),
    ]
    return exploded.select(*projection)




In [13]:
# Apply the flattening transform to get one row per event with flat columns.
real_time_events_processed_df = real_time_events_data(test_df)




In [14]:
def write_to_s3(df, s3_path, format_type="csv",
                base_path="s3://real-time-events-search-amar/transformed_data"):
    """Write a Spark DataFrame to S3 through the notebook's GlueContext.

    The destination is ``<base_path>/<s3_path>/``. Leading/trailing slashes on
    either part are normalised so the resulting key never contains an empty
    ``//`` segment.

    Args:
        df: Spark DataFrame to write.
        s3_path: Name of the sub-folder (relative to `base_path`) that receives
            the output. Despite the name, this is not a full ``s3://`` URI.
        format_type: Output format handed to the Glue writer (default "csv").
        base_path: S3 prefix under which `s3_path` is created. Defaults to the
            location this notebook has always written to, so existing calls
            behave exactly as before.

    Returns:
        None. The data is written as a side effect.
    """
    # Join only the non-empty parts so an empty `s3_path` does not yield "//".
    parts = (base_path.rstrip("/"), s3_path.strip("/"))
    target_path = "/".join(part for part in parts if part) + "/"

    # Uses the module-level `glueContext` created in the session-setup cell.
    dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")

    glueContext.write_dynamic_frame.from_options(
        frame=dynamic_frame,
        connection_type="s3",
        connection_options={"path": target_path},
        format=format_type,
    )




In [15]:
# Persist the processed events as CSV into a folder named after the current
# time, so each run writes to its own location. `datetime.now()` is the
# driver's local clock with no timezone attached.
write_to_s3(
    real_time_events_processed_df,
    "transformed_data_{}".format(datetime.now().strftime("%Y-%m-%d_%H-%M-%S")),
    format_type="csv",
)


