# In [None]:  (notebook cell marker left over from the export; commented out so the file parses as Python)
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import functions as F
from awsglue.dynamicframe import DynamicFrame

# Initialise the Glue job: resolve the required job arguments, then build the
# Spark and Glue contexts used by the rest of the script.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# getOrCreate() reuses a SparkContext that is already running (for example
# when this code is re-run inside a notebook session) instead of raising
# because a second context cannot be constructed.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Start the Glue job run; the script calls job.commit() after all writes.
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Step 1: Load the raw sales CSV from S3; the first row is treated as the header
datasource = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://bucket-name/sales_data.csv"]},
    format="csv",
    format_options={"withHeader": True}
)

# Step 2: Convert the Glue DynamicFrame to a Spark DataFrame
df = datasource.toDF()

# Step 3: Remove duplicates.
# No column subset is given, so only rows that are identical in every column
# are dropped. Pass a list of key columns here if key-based de-duplication is
# what is actually wanted.
df_cleaned = df.dropDuplicates()

# Step 4: Handle null values by dropping every row that has a null in any column
df_cleaned = df_cleaned.dropna()

# Step 5: Write the cleaned data back to S3 as CSV.
# header=True keeps the column names in the output files; without it the names
# are lost, and later steps that select columns by name from the re-read data
# have nothing to match against.
df_cleaned.write.mode("overwrite").csv("s3://bucket-name/Cleaned_data", header=True)

# Prerequisite (manual step): create a Glue crawler whose data source is the
# S3 location of the cleaned CSV output, and run it so the table exists in the
# Glue Data Catalog.
# NOTE(review): this comment originally named s3://crawlablebucketshell/Cleaned_data,
# while the script writes to s3://bucket-name/Cleaned_data - confirm which
# bucket the crawler should point at.
#
# Load the crawled table from the Glue Data Catalog
# (replace "<crawler-db>" and "crawler-table" with the actual database/table).
data_source = glueContext.create_dynamic_frame.from_catalog(
    database="<crawler-db>",
    table_name="crawler-table",
)

# Work with a Spark DataFrame from here on
df = data_source.toDF()

# Show the schema so the column names used below can be checked
df.printSchema()

# PostgreSQL JDBC connection settings used by every DataFrame.write.jdbc call.
# Replace <db-endpoint>, <port> and your_database_name with the real values.
jdbc_url = "jdbc:postgresql://<db-endpoint>:<port>/your_database_name"

connection_properties = dict(
    user="",      # input postgres username
    password="",  # input password
    driver="org.postgresql.Driver",
)

# Project the catalog DataFrame into one DataFrame per target table.
# The four lookup-style projections are de-duplicated with distinct(); the
# sales projection keeps every row.

# Distributor table
distributor_columns = ["DistributorID", "DistributorName"]
distributor_df = df.select(*distributor_columns).distinct()

# DC table
dc_columns = ["DCNumber", "DCName", "DistributorName", "DistributorID"]
dc_df = df.select(*dc_columns).distinct()

# Sales date table
sales_date_columns = ["SalesMonth", "SalesDay", "SalesYear", "DateID"]
sales_date_df = df.select(*sales_date_columns).distinct()

# SKU table
sku_columns = ["DistributorSKU", "ShellSKUNumber", "DistributorSKUDescription", "SKU_ID"]
sku_df = df.select(*sku_columns).distinct()

# Sales table. Column names must match what df.printSchema() reports.
# NOTE(review): these columns do not line up one-to-one with the commented
# sales_table DDL further down in this file (e.g. DCNumber, SalesMonth and
# DistributorSKU are not in that DDL) - confirm against the real table.
sales_columns = [
    "DCNumber",
    "SalesMonth",
    "DistributorSKU",
    "DFOAQuantity",
    "non-dfoaquantity",  # spelled differently from the other quantity column
    "UnitOfMeasure",
    "DCID",
    "DateID",
    "SKU_ID",
]
sales_df = df.select(*sales_columns)

# Write each DataFrame to its PostgreSQL table over JDBC, appending rows.
# The list order is the write order. The commented schema further down in this
# file declares foreign keys from dc_table and sales_table to the other tables,
# so the referenced tables are written first and sales_table last.
jdbc_writes = [
    (distributor_df, "distributor_table"),  # 1. distributor_table
    (dc_df, "dc_table"),                    # 2. dc_table
    (sales_date_df, "sales_date_table"),    # 3. sales_date_table
    (sku_df, "sku"),                        # 4. SKU table
    (sales_df, "sales_table"),              # 5. sales_table
]

for frame, table_name in jdbc_writes:
    frame.write.jdbc(
        url=jdbc_url,
        table=table_name,
        mode="append",
        properties=connection_properties,
    )

# Commit the Glue job once, after every write has completed.
# (The script previously called job.commit() twice in a row.)
job.commit()


#### 0. SQL Queries to create Schema in postgres 
# CREATE TABLE distributor_table (
#     DistributorID SERIAL PRIMARY KEY,
#     DistributorName VARCHAR(255) NOT NULL
# );

# CREATE TABLE dc_table (
#     DCNumber VARCHAR(100) NOT NULL,
#     DCName VARCHAR(255) NOT NULL,
#     DCID SERIAL PRIMARY KEY,
#     DistributorID INT REFERENCES distributor_table(DistributorID)
# );

# CREATE TABLE sales_date_table (
#     SalesMonth INT NOT NULL,
#     SalesDay INT NOT NULL,
#     SalesYear INT NOT NULL,
#     DateID SERIAL PRIMARY KEY
# );

# CREATE TABLE SKU (
#     DistributorSKU VARCHAR(100) NOT NULL,
#     ShellSKUNumber INT NOT NULL,
#     DistributorSKUDescription VARCHAR(255) NOT NULL,
#     SKU_ID SERIAL PRIMARY KEY
# );

# CREATE TABLE sales_table (
#     SaleID SERIAL PRIMARY KEY,
#     DCID INT REFERENCES dc_table(DCID),
#     DateID INT REFERENCES sales_date_table(DateID),
#     SKU_ID INT REFERENCES SKU(SKU_ID),
#     DFOAQuantity INT NOT NULL,
#     NonDFOAQuantity INT NOT NULL,
#     UnitOfMeasure VARCHAR(50) NOT NULL
# );

### 1. Below are the SQL queries to execute to create indexes and materialized views for performance enhancement
#  Index creation with correct case sensitivity
# CREATE INDEX idx_dc_distributor_id ON dc_table ("DistributorID");
# CREATE INDEX idx_sales_dcid ON sales_table ("DCID");
# CREATE INDEX idx_sales_dateid ON sales_table ("DateID");
# CREATE INDEX idx_sales_skuid ON sales_table ("SKU_ID");
# CREATE INDEX idx_sales_date ON sales_date_table ("SalesMonth", "SalesDay", "SalesYear");

#  Create a materialized view for total sales by DCID and SKU_ID
# CREATE MATERIALIZED VIEW sales_summary AS
# SELECT 
#     "DCID",
#     "SKU_ID",
#     SUM("DFOAQuantity") AS total_dfoa_quantity,
#     SUM("non-dfoaquantity") AS total_non_dfoa_quantity,
#     COUNT(*) AS total_sales
# FROM 
#     sales_table
# GROUP BY 
#     "DCID", "SKU_ID";


#  Create an index on the materialized view to speed up queries
# CREATE INDEX idx_sales_summary_dcid_skuid ON sales_summary ("DCID", "SKU_ID");
#  Refresh the materialized view
# REFRESH MATERIALIZED VIEW sales_summary;

# 2. SQL queries to address the questions raised by the team led by Odette.
#      a)  Calculate total sales revenue per product.
#      SELECT 
#     s."SKU_ID",
#     sk."DistributorSKU",
#     SUM((s."DFOAQuantity" + s."non-dfoaquantity") * sk."Price") AS total_revenue
# FROM 
#     sales_table s
# JOIN 
#     SKU sk ON s."SKU_ID" = sk."SKU_ID"
# GROUP BY 
#     s."SKU_ID", sk."DistributorSKU"
# ORDER BY 
#     total_revenue DESC;

#      b) Determine the total quantity sold per customer.
     
#      SELECT 
#     s."DCID",
#     SUM(s."DFOAQuantity" + s."non-dfoaquantity") AS total_quantity_sold
# FROM 
#     sales_table s
# GROUP BY 
#     s."DCID"
# ORDER BY 
#     total_quantity_sold DESC;

#      c)  Identify the top 5 products by revenue.
     
#      SELECT 
#     s."SKU_ID",
#     sk."DistributorSKU",
#     SUM((s."DFOAQuantity" + s."non-dfoaquantity") * sk."Price") AS total_revenue
# FROM 
#     sales_table s
# JOIN 
#     SKU sk ON s."SKU_ID" = sk."SKU_ID"
# GROUP BY 
#     s."SKU_ID", sk."DistributorSKU"
# ORDER BY 
#     total_revenue DESC
# LIMIT 5;


#      d) Generate a monthly sales report showing total revenue and quantity sold.
     
#      SELECT 
#     DATE_TRUNC('month', DATE (s."SalesYear" || '-' || s."SalesMonth" || '-01')) AS sales_month,
#     SUM(s."DFOAQuantity" + s."non-dfoaquantity") AS total_quantity_sold,
#     SUM((s."DFOAQuantity" + s."non-dfoaquantity") * sk."Price") AS total_revenue --make sure to get Price from Odette
# FROM 
#     sales_table s
# JOIN 
#     SKU sk ON s."SKU_ID" = sk."SKU_ID"
# GROUP BY 
#     sales_month
# ORDER BY 
#     sales_month;







