In [None]:
"""
Scenario:

You are working on a data integration project where you need to consolidate user, order, and product information from multiple JSON files into a single merged JSON file. Your task involves several steps, including schema inference, data transformation, and writing results to a data catalog table. Identify any differences between the expected schema and the schema of the final DataFrame, then generate the output.

Requirements:

Schema Inference:

You have a CSV file named schema.csv that describes the schema of the final DataFrame. The CSV file is stored in an S3 bucket at s3://your-bucket/schema.csv. Using AWS Glue Crawler, infer the schema from the schema.csv file and create a table in the Glue Data Catalog.

Data Transformation:

You have three JSON files stored in S3: users.json (located at s3://your-bucket/users.json), orders.json (located at s3://your-bucket/orders.json), and products.json (located at s3://your-bucket/products.json). These JSON files contain user, order, and product information, respectively.

The goal is to:

Flatten the JSON structures. Join the data based on appropriate keys. Produce a single merged JSON file.

Write to Glue Table:

Write the resulting merged JSON data to a table in the Glue Data Catalog. The table should be named merged_data_table and should be stored in the S3 location s3://your-bucket/merged_data/.
"""

In [None]:
#imports
import boto3
import pandas as pd
import json
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode

In [None]:
# Initialize Spark and Glue contexts.
# getOrCreate() reuses an already-running SparkContext instead of raising
# "Cannot run multiple SparkContexts at once", so this cell can be re-run and
# also works in sessions where a context has been created beforehand.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# The builder attaches to the active SparkContext created/reused above.
spark = SparkSession.builder.appName("DataIntegration").getOrCreate()

In [None]:
# Location of the schema description file in S3, plus the client used to fetch it
schema_key = "schema.csv"
schema_bucket = "project_input_data_integration_bucket"
s3_client = boto3.client("s3")

In [None]:
# Fetch schema.csv from S3 and parse the streamed response body with pandas
schema_obj = s3_client.get_object(Key=schema_key, Bucket=schema_bucket)
schema_body = schema_obj["Body"]
schema_df = pd.read_csv(schema_body)

In [None]:
# Map each column of the schema CSV to the dtype pandas inferred for it.
# NOTE(review): this records the dtypes of schema.csv's own columns; if the file
# lists the target columns/types as rows, this is not the target schema - confirm.
schema_inferred = {
    column_name: column_dtype
    for column_name, column_dtype in schema_df.dtypes.items()
}

In [None]:
# S3 locations of the three source JSON files (users, orders, products)
users_json_path = "s3://your-bucket/users.json"
orders_json_path = "s3://your-bucket/orders.json"
products_json_path = "s3://your-bucket/products.json"

In [None]:
# Read each source file into its own Spark DataFrame (users, then orders, then products).
# NOTE(review): spark.read.json is called with default options here; if the files are
# pretty-printed / multi-line JSON rather than one record per line, confirm whether
# the multiLine option is needed.
users_df, orders_df, products_df = (
    spark.read.json(source_path)
    for source_path in (users_json_path, orders_json_path, products_json_path)
)

In [None]:
# Flatten users: lift every address.<location>.<part> leaf into a top-level
# "<location>_<part>" column (home first, then office), then drop the nested struct.
users_flat_df = users_df
for location in ("home", "office"):
    for part in ("street", "city", "zipcode"):
        users_flat_df = users_flat_df.withColumn(
            f"{location}_{part}", col(f"address.{location}.{part}")
        )
users_flat_df = users_flat_df.drop("address")

In [None]:
# Flatten orders: emit one row per element of `items`, promote the item's fields
# to top-level columns, then discard the array and the temporary struct column.
orders_exploded_df = orders_df.withColumn("item", explode(col("items")))
orders_flat_df = (
    orders_exploded_df
    .withColumn("product_name", col("item.product_name"))
    .withColumn("quantity", col("item.quantity"))
    .withColumn("price", col("item.price"))
    .drop("items", "item")
)

In [None]:
# Join orders to users (customer_id -> user_id) and to products (product_name).
#
# The products join is done by column NAME against products_df reduced to its key.
# Joining with an expression (orders.product_name == products.product_name) keeps
# both sides' `product_name` columns, which makes the later select("product_name")
# fail with an ambiguous-reference error. A name-based join yields a single
# `product_name` column, and projecting products_df to the key prevents any other
# same-named product column (e.g. a price) from clashing with the order columns.
# Row multiplicity of the inner join is unchanged.
# NOTE(review): assumes none of the selected output columns originate from
# products_df; widen the products projection if product attributes are required.
merged_df = (
    orders_flat_df
    .join(users_flat_df, orders_flat_df.customer_id == users_flat_df.user_id, "inner")
    .join(products_df.select("product_name"), on="product_name", how="inner")
    .select(
        "order_id", "customer_id", "first_name", "last_name",
        "product_name", "quantity", "price", "total_amount",
        "home_street", "home_city", "home_zipcode",
        "office_street", "office_city", "office_zipcode",
    )
)

In [None]:
# Preview the merged result: first 20 rows, long cell values truncated
merged_df.show(n=20, truncate=True)

In [None]:
# Write the merged data as JSON to the S3 location that backs merged_data_table.
# The writer's default save mode raises if the target path already exists, which
# breaks every re-run of the notebook; "overwrite" replaces the previous output
# so the cell is repeatable.
output_path = 's3://your-bucket/merged_data/'
merged_df.write.mode("overwrite").json(output_path)

In [None]:
# Register the merged JSON output as a table in the Glue Data Catalog.
# create_dynamic_frame.from_catalog only READS an existing table, so on its own it
# never creates merged_data_table. The table is therefore defined explicitly here,
# pointing at the JSON files already written to output_path.
# NOTE(review): assumes the `final_database` catalog database already exists.
catalog_database = "final_database"
catalog_table = "merged_data_table"
table_input = {
    "Name": catalog_table,
    "TableType": "EXTERNAL_TABLE",
    "Parameters": {"classification": "json"},
    "StorageDescriptor": {
        # Spark reports each column type as a simple type string (e.g. "string", "bigint").
        "Columns": [
            {"Name": column_name, "Type": column_type}
            for column_name, column_type in merged_df.dtypes
        ],
        "Location": output_path,
        "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
        "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
        "SerdeInfo": {"SerializationLibrary": "org.openx.data.jsonserde.JsonSerDe"},
    },
}
glue_client = boto3.client("glue")
try:
    glue_client.create_table(DatabaseName=catalog_database, TableInput=table_input)
except glue_client.exceptions.AlreadyExistsException:
    # Re-run: refresh the existing definition instead of failing.
    glue_client.update_table(DatabaseName=catalog_database, TableInput=table_input)

# Read the table back through the catalog to confirm it resolves.
merged_catalog_dyf = glueContext.create_dynamic_frame.from_catalog(
    database=catalog_database, table_name=catalog_table
)

In [None]:
# Clean up resources: stop the SparkContext (`sc`) initialized at the top of the
# notebook. Cells that use `spark` or `glueContext` need the initialization cell
# to be run again after this.
sc.stop()