# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, explode_outer, concat, lit
from pyspark.sql.types import StructType, ArrayType

  
# Reuse the SparkContext that is already active in this session, or create one.
sc = SparkContext.getOrCreate()
# GlueContext wraps the SparkContext; the cells below use it for catalog reads and the S3 sink.
glueContext = GlueContext(sc)
# SparkSession exposed by the GlueContext.
spark = glueContext.spark_session
# Job handle bound to the GlueContext.
# NOTE(review): `job` is not referenced again in the visible cells.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: dd335e2a-98d5-421b-af7c-12752a88a6d2
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
Waiting for session dd335e2a-98d5-421b-af7c-12752a88a6d2 to get into ready status...
Session dd335e2a-98d5-421b-af7c-12752a88a6d2 ha

In [6]:
#### Input and output file paths

In [21]:
# Every dataset of this notebook lives under one S3 prefix.
_SHOPPING_ROOT = "s3://aws-glue-vsb-us-east-2/shopping-details"
_INPUT_ROOT = f"{_SHOPPING_ROOT}/input"

# Raw JSON inputs.
# NOTE(review): the loading cell reads these datasets through Data Catalog
# tables, so the three input paths are informational in the visible cells.
customers_path = f"{_INPUT_ROOT}/users.json"
orders_path = f"{_INPUT_ROOT}/orders.json"
products_path = f"{_INPUT_ROOT}/products.json"

# Destination prefix handed to the S3 sink for the merged result.
output_path = f"{_SHOPPING_ROOT}/merged_data/"




In [7]:
### Function to flatten a DataFrame loaded from nested JSON (structs and arrays)

In [13]:
def flatten_df(df):
    """Flatten every struct and array column of ``df`` into top-level columns.

    The schema is re-scanned after each step and the first remaining nested
    column (in schema order) is processed:

    * struct column ``s`` -> one column ``s_<field>`` per struct field, and
      ``s`` itself is dropped;
    * array column ``a`` -> replaced by ``explode_outer(a)``, i.e. one row per
      array element.

    The loop ends once no struct or array column is left.

    Args:
        df: Spark DataFrame, possibly with nested struct/array columns.

    Returns:
        A DataFrame whose schema has no struct or array columns.
    """
    def _nested_columns(frame):
        # Map column name -> data type for the columns that still need flattening.
        return {
            field.name: field.dataType
            for field in frame.schema.fields
            if isinstance(field.dataType, (ArrayType, StructType))
        }

    pending = _nested_columns(df)
    while pending:
        # Always take the first nested column in schema order.
        name, dtype = next(iter(pending.items()))
        if isinstance(dtype, StructType):
            # Promote each struct member to "<parent>_<child>" and drop the parent.
            promoted = [
                col(f"{name}.{child.name}").alias(f"{name}_{child.name}")
                for child in dtype
            ]
            df = df.select("*", *promoted).drop(name)
        elif isinstance(dtype, ArrayType):
            # One row per element; the element may itself be nested and is
            # picked up by the next schema scan.
            df = df.withColumn(name, explode_outer(name))
        pending = _nested_columns(df)
    return df




#### Creating DynamicFrames from the Data Catalog tables for the input files (converted to Spark DataFrames)


In [14]:
# Data Catalog database that holds every table read in this cell.
_CATALOG_DB = 'shopping_db'


def _read_catalog_table(table_name):
    """Return the DynamicFrame for ``table_name`` in the ``_CATALOG_DB`` database."""
    return glueContext.create_dynamic_frame.from_catalog(database=_CATALOG_DB, table_name=table_name)


# Target layout: its schema is applied to the merged data later on.
source_dyf = _read_catalog_table('order_details_schema')
source_dyf.printSchema()

# Input datasets, converted to Spark DataFrames for flattening and joining.
df_customers = _read_catalog_table('customers_users_json').toDF()
df_orders = _read_catalog_table('orders_json').toDF()
df_products = _read_catalog_table('products_json').toDF()

root
|-- user_id: long
|-- full_name: string
|-- email: string
|-- order_id: long
|-- order_date: string
|-- item_id: string
|-- item_product_name: string
|-- quantity: long
|-- price: double
|-- total_amount: double
|-- product_product_name: string
|-- category: string
|-- stock_quantity: long


#### Flatten the dataframes and rename/transform the columns as per the requirement


In [15]:
def _rename_columns(frame, renames):
    """Rename columns of ``frame`` one at a time, following the old -> new pairs of ``renames``."""
    for old_name, new_name in renames.items():
        frame = frame.withColumnRenamed(old_name, new_name)
    return frame


# Step 1: flatten the nested structures of each input.
customers_flat = flatten_df(df_customers)
orders_flat = flatten_df(df_orders)
products_flat = flatten_df(df_products)

# Step 2a: customers - derive full_name, then shorten the id and e-mail column names.
customers_named = customers_flat.withColumn(
    "full_name",
    concat(customers_flat["users_name_first_name"], lit(" "), customers_flat["users_name_last_name"]),
)
df_flatten_customers = _rename_columns(
    customers_named,
    {
        "users_user_id": "user_id",
        "users_contact_email": "email",
    },
)

# Step 2b: orders - strip the "orders_" / "orders_items_" prefixes from the selected columns.
df_flatten_orders = _rename_columns(
    orders_flat,
    {
        "orders_order_id": "order_id",
        "orders_order_date": "order_date",
        "orders_items_item_id": "item_id",
        "orders_items_product_name": "item_product_name",
        "orders_items_quantity": "quantity",
        "orders_items_price": "price",
        "orders_total_amount": "total_amount",
    },
)

# Step 2c: products - shorten category and stock quantity.
df_flatten_products = _rename_columns(
    products_flat,
    {
        "products_category": "category",
        "products_stock_quantity": "stock_quantity",
    },
)

# Step 3: show the resulting layouts.
df_flatten_customers.printSchema()
df_flatten_orders.printSchema()
df_flatten_products.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- users_name_first_name: string (nullable = true)
 |-- users_name_last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- users_contact_phone: string (nullable = true)
 |-- users_address_home_street: string (nullable = true)
 |-- users_address_home_city: string (nullable = true)
 |-- users_address_home_zipcode: string (nullable = true)
 |-- users_address_office_street: string (nullable = true)
 |-- users_address_office_city: string (nullable = true)
 |-- users_address_office_zipcode: string (nullable = true)
 |-- full_name: string (nullable = true)

root
 |-- order_id: string (nullable = true)
 |-- orders_customer_id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- item_id: string (nullable = true)
 |-- item_product_name: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)

root
 |-- products_product_

#### Merge the dataframes and select the required columns


In [17]:
# Join keys: a customer's id against the order's customer id, and the ordered
# item's product name against the product catalogue's product name.
_customer_has_order = df_flatten_customers.user_id == df_flatten_orders.orders_customer_id
_item_is_product = df_flatten_orders.item_product_name == df_flatten_products.products_product_name

merge_df = (
    df_flatten_customers
    .join(df_flatten_orders, _customer_has_order, "inner")
    .join(df_flatten_products, _item_is_product, "inner")
    .select(
        # customer columns
        df_flatten_customers['user_id'],
        df_flatten_customers['full_name'],
        df_flatten_customers['email'],
        # order / order-item columns
        df_flatten_orders['order_id'],
        df_flatten_orders['order_date'],
        df_flatten_orders['item_id'],
        df_flatten_orders['item_product_name'],
        df_flatten_orders['quantity'],
        df_flatten_orders['price'],
        df_flatten_orders['total_amount'],
        # product columns
        df_flatten_products['products_product_name'],
        df_flatten_products['category'],
        df_flatten_products['stock_quantity'],
    )
    .withColumnRenamed('products_product_name', 'product_product_name')
    .orderBy(df_flatten_customers['user_id'])
)

merge_df.show(30)

root
 |-- user_id: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- item_product_name: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- product_product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- stock_quantity: integer (nullable = true)

+-------+-----------------+--------------------+--------+----------+-------+-------------------+--------+------+------------+--------------------+-----------+--------------+
|user_id|        full_name|               email|order_id|order_date|item_id|  item_product_name|quantity| price|total_amount|product_product_name|   category|stock_quantity|
+-------+-----------------+--------------------+--------+----------+-------+-------------------+--

In [None]:
### Applying the source schema to the merged DataFrame

In [23]:
# Convert the catalog-backed DynamicFrame to a Spark DataFrame so its schema
# (a StructType with column names and data types) can be read.
source_df = source_dyf.toDF()
# Target schema that the merged data is conformed to in this cell.
source_schema = source_df.schema

# Function to apply the source schema to the joined DataFrame
def apply_schema(df, schema):
    """Project ``df`` onto ``schema``: same columns, same order, same data types.

    Args:
        df: Spark DataFrame to conform.
        schema: Object with a ``fields`` sequence (e.g. a StructType); each
            field supplies the target column ``name`` and ``dataType``.

    Returns:
        A DataFrame holding exactly the schema's columns, in schema order,
        each cast to the schema's data type.

    Raises:
        ValueError: If one or more schema columns do not exist in ``df``. The
            message lists the missing column names.
    """
    # Existence check is case-insensitive, in line with Spark's default column
    # resolution; a column that exists under a different letter case is still
    # resolved (and cast) by the select below.
    available = {name.lower() for name in df.columns}
    missing = [field.name for field in schema.fields if field.name.lower() not in available]
    if missing:
        # Fail early with the offending names instead of skipping the cast
        # silently and failing later inside select().
        raise ValueError(f"DataFrame is missing columns required by the schema: {missing}")

    # A single select replaces the withColumn loop: every withColumn call adds
    # another projection to the query plan.
    return df.select(*[
        col(field.name).cast(field.dataType).alias(field.name)
        for field in schema.fields
    ])

# Print the merged schema and the target schema for comparison before casting.
merge_df.printSchema()
source_dyf.printSchema()

# Conform the merged data to the target schema (column selection, order and types).
result_df = apply_schema(merge_df, source_schema)

# Check the conformed schema and preview up to 30 rows.
result_df.printSchema()
result_df.show(30)
# Wrap the result in a DynamicFrame; the sink cell passes it to writeFrame.
merge_dyf = DynamicFrame.fromDF(result_df, glueContext, "merge_dyf")
merge_dyf.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- item_product_name: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- product_product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- stock_quantity: integer (nullable = true)

root
|-- user_id: long
|-- full_name: string
|-- email: string
|-- order_id: long
|-- order_date: string
|-- item_id: string
|-- item_product_name: string
|-- quantity: long
|-- price: double
|-- total_amount: double
|-- product_product_name: string
|-- category: string
|-- stock_quantity: long

root
 |-- user_id: long (nullable = true)
 |-- full_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- order_id: long (nullable

#### Write the data in the DynamicFrame to a location in Amazon S3 and create or update a table for it in the AWS Glue Data Catalog


In [25]:
# Sink that writes the merged DynamicFrame under `output_path` on S3.
# enableUpdateCatalog=True makes the write also register the data as a table
# in the Data Catalog; updateBehavior controls how an existing table is updated.
s3output = glueContext.getSink(
  path = output_path,
  connection_type="s3",
  updateBehavior="UPDATE_IN_DATABASE",
  partitionKeys=[],  # unpartitioned output
  compression="snappy",
  enableUpdateCatalog=True,
  transformation_ctx="s3output",
)
# Target catalog table. The name previously ended with a stray space
# ("merged_data_table "), which put an unintended whitespace character in the
# table name.
# NOTE(review): if an earlier run already created a table under the old
# spelling, it may need to be removed from the catalog manually.
s3output.setCatalogInfo(
  catalogDatabase="shopping_db", catalogTableName="merged_data_table"
)
# "glueparquet" is the Glue Parquet writer format name passed to the sink.
s3output.setFormat("glueparquet")
s3output.writeFrame(merge_dyf)

<awsglue.dynamicframe.DynamicFrame object at 0x7f8105275120>
