In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import explode, col, to_date
from datetime import datetime
from awsglue.dynamicframe import DynamicFrame

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 
Trying to create a Glue session for the kernel.
Session Type: glueetl
Session ID: 23410505-3d63-4b94-92a9-8abd1c490df3
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
Waiting for session 23410505-3d63-4b94-92a9-8abd1c490df3 to get into ready status...
Session 23410505-3d63-4b94-92a9-8abd1c490df3 has been created.



In [2]:
# Reuse the active SparkContext if one exists, otherwise create it.
sc = SparkContext.getOrCreate()
# Wrap the SparkContext in a GlueContext; later cells use it to read and write DynamicFrames.
glueContext = GlueContext(sc)
# SparkSession handle exposed by the GlueContext.
spark = glueContext.spark_session
# Glue Job handle built on the same GlueContext.
job = Job(glueContext)




In [3]:
# Read every JSON object under the "to_be_processed" prefix into one DynamicFrame.
s3_path = "s3://unified-uw-etl-project/raw_data/to_be_processed/"
source_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [s3_path]},
    format="json",
)




In [4]:
# Convert the Glue DynamicFrame to a Spark DataFrame for the process_* functions below.
uw_df = source_dyf.toDF()



In [5]:
def process_policy(df):
    """Flatten the policy records out of the raw underwriting DataFrame.

    Explodes the ``items`` array so each element becomes its own row, projects
    the nested ``items.policy`` fields into flat columns, keeps one row per
    ``policy_id`` and converts ``issue_date`` to a date column.

    Args:
        df: Spark DataFrame with an array column named ``items`` whose elements
            contain a ``policy`` struct.

    Returns:
        Spark DataFrame with the columns policy_id, policy_name, issue_date,
        sum_assured, policy_status, uw_comment, error_queue and url.
    """
    policy_df = (
        df.withColumn("items", explode("items"))
        .select(
            col("items.policy.id").alias("policy_id"),
            col("items.policy.name").alias("policy_name"),
            col("items.policy.issue_date").alias("issue_date"),
            col("items.policy.sum_assured").alias("sum_assured"),
            col("items.policy.policy_status").alias("policy_status"),
            col("items.policy.uw_comment").alias("uw_comment"),
            col("items.policy.error_queue").alias("error_queue"),
            col("items.policy.external_urls.uw").alias("url"),
        )
        .drop_duplicates(["policy_id"])
    )

    # Convert the string 'issue_date' to an actual date type. This must happen
    # before the return; placed after it, the cast would never execute.
    return policy_df.withColumn("issue_date", to_date(col("issue_date")))




In [6]:
def process_customer(df):
    """Flatten the customer records out of the raw underwriting DataFrame.

    Explodes the ``items`` array so each element becomes its own row, projects
    the customer fields into flat columns and keeps one row per ``customer_id``.

    Args:
        df: Spark DataFrame with an array column named ``items``.

    Returns:
        Spark DataFrame with the columns customer_id, customer_name and url.
    """
    # Read the customer struct from the exploded element, the same way the
    # policy and agent fields are read. Without the "items." prefix the
    # exploded column is never used by the select.
    # NOTE(review): assumes ``customer`` is nested inside each ``items``
    # element, like ``policy`` and ``agent`` -- confirm against the raw schema.
    df_customer = df.withColumn("items", explode("items")).select(
        col("items.customer.id").alias("customer_id"),
        col("items.customer.name").alias("customer_name"),
        col("items.customer.external_urls.uw").alias("url")
    ).drop_duplicates(["customer_id"])

    return df_customer




In [7]:
def process_agent(df):
    """Flatten the agent records out of the raw underwriting DataFrame.

    Explodes the ``items`` array into a single ``item`` column, projects the
    nested agent fields (plus the related policy and customer ids) into flat
    columns, keeps one row per ``agent_id`` and converts ``joining_date`` to a
    date column.

    Args:
        df: Spark DataFrame with an array column named ``items`` whose elements
            contain ``agent`` and ``policy`` structs.

    Returns:
        Spark DataFrame with the columns agent_id, agent_name, joining_date,
        url, branch, manager, policy_id and customer_id.
    """
    # Explode the items array to create one row per item
    df_exploded = df.select(explode(col("items")).alias("item"))

    # Extract agent information from the exploded DataFrame. Every path must go
    # through "item", because it is the only column df_exploded has.
    # NOTE(review): de-duplicating on agent_id alone keeps a single
    # policy_id/customer_id pair per agent -- confirm that is intended.
    df_agent = df_exploded.select(
        col("item.agent.id").alias("agent_id"),
        col("item.agent.name").alias("agent_name"),
        col("item.agent.joining_date").alias("joining_date"),
        col("item.agent.external_urls.profile").alias("url"),
        col("item.agent.branch").alias("branch"),
        col("item.agent.manager").alias("manager"),
        col("item.policy.id").alias("policy_id"),
        col("item.customer.id").alias("customer_id")
    ).drop_duplicates(["agent_id"])

    # Convert string dates in 'joining_date' to actual date types
    df_agent = df_agent.withColumn("joining_date", to_date(col("joining_date")))

    return df_agent




In [8]:
# Build the policy table from the raw underwriting DataFrame.
policy_df = process_policy(uw_df)




In [9]:
# Build the customer table from the raw underwriting DataFrame.
customer_df = process_customer(uw_df)




In [10]:
# Build the agent table from the raw underwriting DataFrame.
agent_df = process_agent(uw_df)




In [11]:
def write_to_s3(df, path_suffix, format_type="csv"):
    """Write a Spark DataFrame under the project's ``transformed_data`` prefix.

    Args:
        df: Spark DataFrame to persist.
        path_suffix: Path below ``transformed_data/`` to write into.
        format_type: Output format handed to the Glue writer (default "csv").
    """
    # The Glue writer takes a DynamicFrame, so wrap the DataFrame first.
    as_dynamic = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")
    destination = f"s3://unified-uw-etl-project/transformed_data/{path_suffix}/"

    writer = glueContext.write_dynamic_frame
    writer.from_options(
        frame=as_dynamic,
        connection_type="s3",
        connection_options={"path": destination},
        format=format_type,
    )




In [None]:
# Write each transformed table to S3 under a date-stamped prefix.
# The stamp is computed once so all three outputs carry the same date even if
# the run crosses midnight between writes.
run_date = datetime.now().strftime("%Y-%m-%d")

write_to_s3(policy_df, "policy_data/policy_transformed_{}".format(run_date), "csv")

write_to_s3(customer_df, "customer_data/customer_transformed_{}".format(run_date), "csv")

write_to_s3(agent_df, "agent_data/agent_transformed_{}".format(run_date), "csv")

In [1]:
import boto3

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 
Trying to create a Glue session for the kernel.
Session Type: glueetl
Session ID: 437a8ae3-8822-4fbc-be72-e6176db1e02d
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
Waiting for session 437a8ae3-8822-4fbc-be72-e6176db1e02d to get into ready status...
Session 437a8ae3-8822-4fbc-be72-e6176db1e02d has been created.



In [3]:
def list_s3_objects(bucket, prefix):
    """Return the keys of all ``.json`` objects under a bucket prefix.

    Args:
        bucket: Name of the S3 bucket to list.
        prefix: Key prefix to restrict the listing to.

    Returns:
        List of object keys (str) ending in ``.json``; empty if none match.
    """
    s3_client = boto3.client('s3')
    # One list_objects_v2 call returns a single page (at most 1000 keys), so
    # iterate every page to avoid silently missing files in large prefixes.
    paginator = s3_client.get_paginator('list_objects_v2')

    keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # 'Contents' is absent when a page has no objects.
        keys.extend(
            content['Key']
            for content in page.get('Contents', [])
            if content['Key'].endswith('.json')
        )
    return keys

# Bucket and prefix holding the raw JSON files awaiting processing.
bucket_name = "unified-uw-etl-project"
prefix = "raw_data/to_be_processed/"
# Keys of the .json files currently under that prefix.
# NOTE(review): this listing is taken independently of the earlier read, so a file
# that lands in between would be listed here without having been transformed -- confirm.
Unified_UW_keys = list_s3_objects(bucket_name, prefix)




In [None]:
def move_and_delete_files(Unified_UW_keys, Bucket):
    """Move each listed object into ``raw_data/processed/`` in the same bucket.

    Every key is copied to ``raw_data/processed/<file name>`` and the source
    object is then deleted.

    Args:
        Unified_UW_keys: Iterable of source object keys to move.
        Bucket: Name of the bucket that holds both source and destination.
    """
    s3_resource = boto3.resource('s3')

    for source_key in Unified_UW_keys:
        # Keep only the file name; the source "directory" part is dropped.
        file_name = source_key.split('/')[-1]
        archived_key = 'raw_data/processed/' + file_name

        # Copy first, then remove the original.
        s3_resource.meta.client.copy(
            {'Bucket': Bucket, 'Key': source_key},
            Bucket,
            archived_key,
        )
        s3_resource.Object(Bucket, source_key).delete()

# Move the listed raw files to raw_data/processed/ and delete the originals
move_and_delete_files(Unified_UW_keys, bucket_name)

# Commit the job after the file move/delete operation
# NOTE(review): no visible cell in this session defines `job` (it is created in the
# earlier notebook's setup cell) -- confirm it exists here, otherwise this raises NameError.
job.commit()