# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


###  Code Setup


In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 2

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, FloatType, TimestampType
from pyspark.sql.functions import *

# Reuse the active SparkContext if one exists, otherwise create it.
sc = SparkContext.getOrCreate()
# GlueContext wraps the SparkContext; the cells below use it to read and write DynamicFrames.
glueContext = GlueContext(sc)
# SparkSession exposed by the GlueContext.
spark = glueContext.spark_session
# Glue Job handle built on the GlueContext; not referenced again in the visible cells.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 2
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 2
Idle Timeout: 2880
Session ID: 9a2e5b7d-b0c4-4661-b978-7e739bf4af21
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
Waiting for session 9a2e5b7d-b0c4-4661-b978-7e739bf4af21 to get into ready status...
Session 9a2e5b7d-b0c4-4661-b978-7e739bf4af21 ha

## Customer csv -> Parquet


In [2]:
# Raw customers extract in S3.
file_path = "s3://sidd-de-on-olist-data/raw/customers.csv"

# Load the CSV as a Glue DynamicFrame; "withHeader" marks the first row as the header.
customer_df = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    format="csv",
    connection_options={"paths": [file_path]},
    format_options={"withHeader": "true"},
)

# Destination prefix for the Parquet copy of the customers data.
output_parquet_path = 's3://sidd-de-on-olist-data/processed/customers/'

# Write the frame unchanged (no column mapping) as snappy-compressed Parquet, unpartitioned.
# NOTE(review): unlike the other cells, this write does not request "overwrite" —
# confirm whether re-running the cell should replace the previous output.
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
    frame=customer_df,
    connection_type="s3",
    connection_options={"path": output_parquet_path, "partitionKeys": []},
    format="glueparquet",
    format_options={"compression": "snappy"},
    transformation_ctx="S3bucket_node3",
)




### Olist Order Items csv -> Parquet

In [3]:
# Raw order-items extract in S3.
file_path = "s3://sidd-de-on-olist-data/raw/order_items.csv"

# Load the CSV as a Glue DynamicFrame; "withHeader" marks the first row as the header.
order_items_df = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    format="csv",
    connection_options={"paths": [file_path]},
    format_options={"withHeader": "true"},
)

# Target type per column. Each column is declared as a string on input and keeps its name.
order_items_target_types = {
    "order_id": "string",
    "order_item_id": "integer",
    "product_id": "string",
    "seller_id": "string",
    "shipping_limit_date": "timestamp",
    "price": "float",
    "freight_value": "float",
}
order_items_mapped = ApplyMapping.apply(
    frame=order_items_df,
    mappings=[
        (column, "string", column, target_type)
        for column, target_type in order_items_target_types.items()
    ],
    transformation_ctx="ApplyMapping_Order_Items",
)

# total_price = price * order_item_id + freight_value * order_item_id, rounded to 2 decimals.
# `round` resolves to pyspark.sql.functions.round through the star import in the setup cell.
# NOTE(review): order_item_id is used as a multiplier here. If it is a line sequence number
# within the order rather than a quantity, total_price is inflated — confirm intent.
item_number = col("order_item_id")
total_price_expr = round(col("price") * item_number + col("freight_value") * item_number, 2)
ApplyMapping_Order_Items = order_items_mapped.toDF().withColumn("total_price", total_price_expr)

# Replace any previous output under the processed prefix.
output_parquet_path = 's3://sidd-de-on-olist-data/processed/order_items/'
ApplyMapping_Order_Items.write.parquet(output_parquet_path, mode="overwrite")



### Order_payments csv -> Parquet

In [4]:
# Raw order-payments extract in S3.
file_path = "s3://sidd-de-on-olist-data/raw/order_payments.csv"

# Load the CSV as a Glue DynamicFrame; "withHeader" marks the first row as the header.
order_payments_df = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    format="csv",
    connection_options={"paths": [file_path]},
    format_options={"withHeader": "true"},
)

# Show the inferred schema so the column names can be checked before mapping.
order_payments_df.printSchema()

root
|-- order_id: string
|-- payment_sequential: string
|-- payment_type: string
|-- payment_installments: string
|-- payment_value: string


In [5]:

# Target type per column. Each column is declared as a string on input and keeps its name.
order_payments_target_types = {
    "order_id": "string",
    "payment_sequential": "integer",
    "payment_type": "string",
    "payment_installments": "integer",
    "payment_value": "float",
}
order_payments_mapped = ApplyMapping.apply(
    frame=order_payments_df,
    mappings=[
        (column, "string", column, target_type)
        for column, target_type in order_payments_target_types.items()
    ],
    transformation_ctx="ApplyMapping_Order_Payments",
)

# Convert to a Spark DataFrame for the write.
ApplyMapping_Order_Payments = order_payments_mapped.toDF()

# Replace any previous output under the processed prefix.
output_parquet_path = 's3://sidd-de-on-olist-data/processed/order_payments/'
ApplyMapping_Order_Payments.write.parquet(output_parquet_path, mode="overwrite")




In [6]:
# Raw order-reviews extract in S3.
file_path = "s3://sidd-de-on-olist-data/raw/order_reviews.csv"

# Load the CSV as a Glue DynamicFrame; "withHeader" marks the first row as the header.
order_reviews_df = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    format="csv",
    connection_options={"paths": [file_path]},
    format_options={"withHeader": "true"},
)

# Show the inferred schema so the column names can be checked before mapping.
order_reviews_df.printSchema()

root
|-- review_id: string
|-- order_id: string
|-- review_score: string
|-- review_comment_title: string
|-- review_comment_message: string
|-- review_creation_date: string
|-- review_answer_timestamp: string


In [7]:
# Target type per column. Each column is declared as a string on input and keeps its name.
order_reviews_target_types = {
    "review_id": "string",
    "order_id": "string",
    "review_score": "integer",
    "review_comment_title": "string",
    "review_comment_message": "string",
    "review_creation_date": "timestamp",
    "review_answer_timestamp": "timestamp",
}
order_reviews_mapped = ApplyMapping.apply(
    frame=order_reviews_df,
    mappings=[
        (column, "string", column, target_type)
        for column, target_type in order_reviews_target_types.items()
    ],
    transformation_ctx="ApplyMapping_Order_Reviews",
)

# Convert to a Spark DataFrame and leave the review message text out of the processed data.
ApplyMapping_Order_Reviews = order_reviews_mapped.toDF().drop(col('review_comment_message'))

# Replace any previous output under the processed prefix.
output_parquet_path = 's3://sidd-de-on-olist-data/processed/order_reviews/'
ApplyMapping_Order_Reviews.write.parquet(output_parquet_path, mode="overwrite")




In [9]:
# Raw orders extract in S3.
file_path = "s3://sidd-de-on-olist-data/raw/orders.csv"

# Load the CSV as a Glue DynamicFrame; "withHeader" marks the first row as the header.
orders_df = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [file_path]},
    format="csv",
    format_options={"withHeader": "true"},
)

# Cast the date/time columns from string to timestamp; the other columns stay strings.
# NOTE(review): the schema of orders.csv is not printed in this notebook — verify that
# every source column name below exists in the file.
ApplyMapping_Orders = ApplyMapping.apply(
    frame=orders_df,
    mappings=[
        ('order_id', 'string', 'order_id', 'string'),
        ('customer_id', 'string', 'customer_id', 'string'),
        ('order_status', 'string', 'order_status', 'string'),
        ('order_purchase_timestamp', 'string', 'order_purchase_timestamp', 'timestamp'),
        ('order_approved_timestamp', 'string', 'order_approved_timestamp', 'timestamp'),
        ('order_delivered_carrier_date', 'string', 'order_delivered_carrier_date', 'timestamp'),
        ('order_delivered_customer_date', 'string', 'order_delivered_customer_date', 'timestamp'),
        ('order_estimated_delivery_date', 'string', 'order_estimated_delivery_date', 'timestamp')
    ],
    transformation_ctx="ApplyMapping_Orders",
)

ApplyMapping_Orders = ApplyMapping_Orders.toDF()

# Keep only delivered orders.
ApplyMapping_Orders = ApplyMapping_Orders.filter("order_status = 'delivered'")

# Drop the carrier hand-over date. The name must match the mapped column exactly
# ("carrier", not "carried"); otherwise the column stays in the output.
ApplyMapping_Orders = ApplyMapping_Orders.drop(col('order_delivered_carrier_date'))

# Replace any previous output under the processed prefix.
output_parquet_path = 's3://sidd-de-on-olist-data/processed/orders/'

ApplyMapping_Orders.write.mode("overwrite").parquet(output_parquet_path)




In [7]:
# Raw products extract in S3.
file_path = "s3://sidd-de-on-olist-data/raw/products.csv"

# Load the CSV as a Glue DynamicFrame; "withHeader" marks the first row as the header.
products_df = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    format="csv",
    connection_options={"paths": [file_path]},
    format_options={"withHeader": "true"},
)

# Show the inferred schema so the column names can be checked before mapping.
products_df.printSchema()

root
|-- product_id: string
|-- product_category: string
|-- product_name_length: string
|-- product_description_length: string
|-- product_photos_qty: string
|-- product_weight_g: string
|-- product_length_cm: string
|-- product_height_cm: string
|-- product_width_cm: string


In [8]:

# Cast the numeric product attributes from string to integer.
# The source names follow the schema printed for products_df, which spells the columns
# "product_name_length" and "product_description_length". The misspelled "...lenght"
# names matched no input column, so the two length columns did not reach the output.
ApplyMapping_products = ApplyMapping.apply(
    frame=products_df,
    mappings=[
        ('product_id', 'string', 'product_id', 'string'),
        ('product_category', 'string', 'product_category', 'string'),
        ('product_name_length', 'string', 'product_name_length', 'integer'),
        ('product_description_length', 'string', 'product_description_length', 'integer'),
        ('product_photos_qty', 'string', 'product_photos_qty', 'integer'),
        ('product_weight_g', 'string', 'product_weight_g', 'integer'),
        ('product_length_cm', 'string', 'product_length_cm', 'integer'),
        ('product_height_cm', 'string', 'product_height_cm', 'integer'),
        ('product_width_cm', 'string', 'product_width_cm', 'integer')
    ],
    transformation_ctx="ApplyMapping_products",
)

# Photo count and physical dimensions are not kept in the processed data.
columns_to_drop = ['product_photos_qty', 'product_length_cm', 'product_height_cm', 'product_width_cm']  # List of column names to drop
ApplyMapping_products = ApplyMapping_products.drop_fields(columns_to_drop)

ApplyMapping_products = ApplyMapping_products.toDF()

# Load the category-name translation table (CSV with a header row).
products_name_translation_df = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ['s3://sidd-de-on-olist-data/raw/product_category_name_translation.csv']},
    format="csv",
    format_options={"withHeader": "true"}  # If the CSV file has a header
)

products_name_translation_df = products_name_translation_df.toDF()

# Attach the translation row whose product_category_name equals the product's category.
# NOTE(review): no join type is given, so this is Spark's default (inner) join — products
# with no matching translation row are dropped; confirm that is intended.
products_transformed_df = ApplyMapping_products.join(
    products_name_translation_df,
    ApplyMapping_products.product_category == products_name_translation_df.product_category_name,
)

# Replace any previous output under the processed prefix.
output_parquet_path = 's3://sidd-de-on-olist-data/processed/products/'

products_transformed_df.write.mode("overwrite").parquet(output_parquet_path)




In [21]:
# Raw sellers extract in S3.
file_path = "s3://sidd-de-on-olist-data/raw/sellers.csv"

# Load the CSV as a Glue DynamicFrame; "withHeader" marks the first row as the header.
sellers_df = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    format="csv",
    connection_options={"paths": [file_path]},
    format_options={"withHeader": "true"},
)

# Every seller column stays a string with its name unchanged; the mapping only fixes
# which columns are selected.
seller_columns = ["seller_id", "seller_zip_code_prefix", "seller_city", "seller_state"]
sellers_mapped = ApplyMapping.apply(
    frame=sellers_df,
    mappings=[(column, "string", column, "string") for column in seller_columns],
    transformation_ctx="ApplyMapping_sellers",
)

# Convert to a Spark DataFrame for the write.
ApplyMapping_sellers = sellers_mapped.toDF()

# Replace any previous output under the processed prefix.
output_parquet_path = 's3://sidd-de-on-olist-data/processed/sellers/'
ApplyMapping_sellers.write.parquet(output_parquet_path, mode="overwrite")



