### Glue Job to Fetch Data from an API and Ingest It into S3 After Transformations
This code performs the following tasks:
1. Importing all necessary packages
2. Fetches Data from api
3. Ingests the Parquet file into the raw-layer S3 location
4. Using custom transformations, the dataset is divided into multiple dataframes according to the data needs
5. All the individual dataframes are ingested back to S3 post transformations for visualization using Athena & QuickSight

In [1]:
import requests,sys
import pandas as pd
from pyspark.sql import SparkSession , DataFrame
from pyspark.sql.functions import col , sum , avg , format_number
from pyspark.sql.types import TimestampType , DoubleType 
from datetime import datetime
from typing import Union, List
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def exit_function() -> None:
    """Abort the job by raising ``SystemExit`` with exit status 1."""
    raise SystemExit(1)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
Trying to create a Glue session for the kernel.
Session Type: glueetl
Session ID: 2b452660-8525-4e07-b28a-0b103118997e
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
Waiting for session 2b452660-8525-4e07-b28a-0b103118997e to get into ready status...
Session 2b452660-8525-4e07-b28a-0b103118997e has been created.



In [2]:
def fetch_data(api_url : str, timeout: float = 30.0) -> requests.Response:
    """
    Fetches API response using get method

    Args:
        api_url (str): url of the api which need to be accessed
        timeout (float): seconds to wait for the server before giving up.
            Without a timeout ``requests.get`` can block indefinitely.

    Returns:
        requests.Response : Response Generated from hitting the API.

    Raises:
        requests.exceptions.RequestException: If the request fails (connection
            error, timeout) or the server answers with a 4xx/5xx status code.

    """
    try:
        response = requests.get(api_url, timeout=timeout)
        response.raise_for_status()  # Raises an HTTPError for bad responses (4xx and 5xx)
        logger.info(f"Response generated with {response.status_code} statusCode")
        return response
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to generate response from API with error -> {e}")
        raise
        
def s3_write(s3_prefix: str, data_frame: DataFrame, s3_path: str = "s3://sales-data-kdr/") -> None:
    """
    Writes the DataFrame as parquet to a date-partitioned S3 path.

    The target path is ``{s3_path}{s3_prefix}/year=YYYY/month=MM/day=DD/`` built
    from the current local date; existing data at that path is overwritten.

    Args:
        s3_prefix (str): Prefix (folder) under ``s3_path`` to write into.
        data_frame (DataFrame): Spark DataFrame to persist.
        s3_path (str): Base S3 URI, expected to end with ``/``.

    Raises:
        Exception: Re-raises whatever the Spark writer raises, after logging it.
    """
    # Take a single timestamp so year/month/day cannot disagree when the
    # call straddles midnight, and build the path outside the try so it is
    # always bound when the except branch logs it.
    today = datetime.today()
    path = f'{s3_path}{s3_prefix}/year={today.strftime("%Y")}/month={today.strftime("%m")}/day={today.strftime("%d")}/'
    try:
        data_frame.write.mode("overwrite").parquet(path)
        logger.info(f"DataFrame written to S3 path: {path}")
    except Exception as e:
        logger.error(f"Failed to write data to S3 path {path} with error -> {e}")
        raise

def spark_data_transform(raw_df: DataFrame, column_names: Union[str, List[str]], dtypes: Union[str, List[str]]) -> DataFrame:
    """
    Transforms provided column_names data types to provided dtypes values

    Args:
        raw_df (DataFrame): The input DataFrame.
        column_names (Union[str, List[str]]): The column name or list of column names to transform.
        dtypes (Union[str, List[str]]): The data type or list of data types to cast to.
            Each entry may be a type-name string (e.g. ``"double"``), a Spark
            DataType class (e.g. ``DoubleType``) or a DataType instance.

    Returns:
        DataFrame: The transformed DataFrame.

    Note:
        Any failure (including invalid arguments) is logged and then the
        process is terminated through ``exit_function()``; no exception is
        propagated to the caller.
    """
    try:
        if isinstance(column_names, list) and isinstance(dtypes, list):
            if len(column_names) != len(dtypes):
                raise ValueError("The length of column_names and dtypes must be the same.")
            casts = list(zip(column_names, dtypes))
        elif isinstance(column_names, str) and not isinstance(dtypes, (list, tuple)):
            # A single column needs a single dtype; a list of dtypes here is a caller error.
            casts = [(column_names, dtypes)]
        else:
            raise ValueError("column_names and dtypes must both be either strings or lists of equal length.")

        for column, dtype in casts:
            # Column.cast takes a DataType instance or a type-name string, so
            # only instantiate when a DataType *class* was handed in.
            target_type = dtype() if isinstance(dtype, type) else dtype
            raw_df = raw_df.withColumn(column, col(column).cast(target_type))

        logger.info(f"Transformation successful: {column_names} cast to {dtypes}")
        return raw_df

    except Exception as e:
        logger.error(f"Failed to transform column(s) {column_names} to {dtypes} with error -> {e}")
        exit_function()




In [3]:
# Fetch the sales data from the API, load it into Spark and persist the
# untouched copy to the raw layer. Any failure is logged and aborts the job.
try:
    pandas_df = pd.DataFrame(fetch_data(api_url="https://dznc158l4m.execute-api.us-east-1.amazonaws.com/uat_stage/sales_data_api_s3_ray_layer").json())
    spark = SparkSession.builder.appName("sales_data_transformation").getOrCreate()  # Spark Session Builder
    raw_df = spark.createDataFrame(pandas_df)
    # show() and printSchema() print by themselves and return None, so
    # wrapping them in print() only adds a stray "None" line to the output.
    raw_df.show(5)
    raw_df.printSchema()  # Prints Column Names and their respective Dtypes
    s3_write(s3_prefix='raw-layer', data_frame=raw_df)
except Exception as e:
    logger.error(f"Failed to execute with error -> {e}")
    exit_function()

+--------------------+--------------------+--------+-------------------+--------------+----------+--------------------+------------+------------+--------+------+------+------+-----------------+--------------------+
|      transaction_id|           timestamp|store_id|         store_name|store_location|product_id|        product_name|    category|retail_price|quantity| brand|   mrp| total|    customer_name|      customer_email|
+--------------------+--------------------+--------+-------------------+--------------+----------+--------------------+------------+------------+--------+------+------+------+-----------------+--------------------+
|ba87f919-15a2-4b1...|2024-12-26T20:26:...|    S013|Chennai Citi Centre|       Chennai|      H036|boAt Airdopes 441...|   Earphones|     2124.15|       2|  boAt|  2499|  4998|      Charles Roy| wpillai@example.org|
|33586a47-e356-4f9...|2024-12-06T20:26:...|    S021|       Pacific Mall|     Ghaziabad|      P099|Apple iPhone 12 P...|Mobile Phone|   11049

In [4]:
# Cast the timestamp column to a Spark timestamp and the monetary columns to double.
transformed_df = spark_data_transform(
    raw_df=raw_df,
    column_names=['timestamp', 'total', 'mrp'],
    dtypes=[TimestampType, DoubleType, DoubleType],
)




In [5]:
# Columns needed for the product / brand / category level aggregations.
products_data_columns = [
    "product_name",
    "brand",
    "category",
    "total",
    "quantity",
    "retail_price",
    "mrp",
]
brand_product_sales = transformed_df.select(*products_data_columns)
brand_product_sales.show(5)

+--------------------+------+------------+--------+--------+------------+--------+
|        product_name| brand|    category|   total|quantity|retail_price|     mrp|
+--------------------+------+------------+--------+--------+------------+--------+
|boAt Airdopes 441...|  boAt|   Earphones|  4998.0|       2|     2124.15|  2499.0|
|Apple iPhone 12 P...| Apple|Mobile Phone|129999.0|       1|   110499.15|129999.0|
|Lenovo Yoga Tab 3...|Lenovo|      Tablet| 24990.0|       1|     22491.0| 24990.0|
|LG 55 Inch OLED 4...|    LG|          TV|219980.0|       2|     98991.0|109990.0|
|     HP Spectre x360|    HP|      Laptop|269998.0|       2|   114749.15|134999.0|
+--------------------+------+------------+--------+--------+------------+--------+
only showing top 5 rows


In [6]:
# --- Sales per brand -------------------------------------------------------
brand_sales = brand_product_sales.groupBy("brand").agg(
    sum("total").alias("total_sales"),
    sum("quantity").alias("total_quantity"),
)
# The average is derived from the numeric totals first; format_number then
# renders the amounts as fixed 2-decimal strings to avoid scientific notation.
brand_sales = (
    brand_sales
    .withColumn("sale_per_piece_avg", col("total_sales") / col("total_quantity"))
    .withColumn("total_sales", format_number(col("total_sales"), 2))
    .withColumn("sale_per_piece_avg", format_number(col("sale_per_piece_avg"), 2))
)

s3_write(s3_prefix='transformed-layer/brand_sales', data_frame=brand_sales)
brand_sales.show(10000)

# --- Sales per category ----------------------------------------------------
category_sales = brand_product_sales.groupBy("category").agg(
    sum("total").alias("total_sales"),
    sum("quantity").alias("total_quantity"),
)
category_sales = (
    category_sales
    .withColumn("sale_per_piece_avg", col("total_sales") / col("total_quantity"))
    .withColumn("total_sales", format_number(col("total_sales"), 2))
    .withColumn("sale_per_piece_avg", format_number(col("sale_per_piece_avg"), 2))
)

# Write the category aggregate (previously brand_sales was written to this
# prefix by mistake, so the category_sales dataset contained brand rows).
s3_write(s3_prefix='transformed-layer/category_sales', data_frame=category_sales)
category_sales.show(100)

+----------+--------------+--------------+------------------+
|     brand|   total_sales|total_quantity|sale_per_piece_avg|
+----------+--------------+--------------+------------------+
| Microsoft| 32,255,430.00|           457|         70,580.81|
|      Asus| 47,634,376.00|           511|         93,217.96|
|   Philips|    883,888.00|           112|          7,891.86|
|     Anker|  1,295,790.00|           210|          6,170.43|
|    Huawei| 13,639,460.00|           554|         24,619.96|
|     Apple|317,628,421.00|          3381|         93,945.11|
|      boAt|    907,557.00|           343|          2,645.94|
|       TCL| 60,633,140.00|           886|         68,434.70|
|    Xiaomi|  1,835,882.00|           118|         15,558.32|
|     Noise|    371,906.00|            94|          3,956.45|
|        HP| 68,864,618.00|           581|        118,527.74|
|      Sony|100,514,880.00|          1212|         82,933.07|
|    Realme|  1,030,699.00|           301|          3,424.25|
|      M

In [7]:
# NOTE(review): "total" looks like a line total (mrp * quantity) while
# "retail_price" looks per-unit and is summed without multiplying by quantity,
# so the profit figures may be overstated - confirm the intended formula.

# --- Profit per brand: summed sales minus summed retail price ---------------
profit_per_brand = (
    brand_product_sales
    .groupBy("brand")
    .agg(
        sum("total").alias("total_sales"),
        sum("retail_price").alias("total_retail_price"),
    )
    .withColumn("total_profit_per_brand", col("total_sales") - col("total_retail_price"))
)
# Render the amounts as 2-decimal strings so they are not shown in scientific notation.
profit_per_brand = (
    profit_per_brand
    .withColumn("total_sales", format_number(col("total_sales"), 2))
    .withColumn("total_retail_price", format_number(col("total_retail_price"), 2))
    .withColumn("total_profit_per_brand", format_number(col("total_profit_per_brand"), 2))
)
s3_write(s3_prefix='transformed-layer/profit_per_brand', data_frame=profit_per_brand)
profit_per_brand.show(500)

# --- Profit per category ----------------------------------------------------
# NOTE(review): the profit column keeps the name "total_profit_per_brand" even
# though rows are per category - verify downstream consumers before renaming.
profit_per_category = (
    brand_product_sales
    .groupBy("category")
    .agg(
        sum("total").alias("total_sales"),
        sum("retail_price").alias("total_retail_price"),
    )
    .withColumn("total_profit_per_brand", col("total_sales") - col("total_retail_price"))
)
profit_per_category = (
    profit_per_category
    .withColumn("total_sales", format_number(col("total_sales"), 2))
    .withColumn("total_retail_price", format_number(col("total_retail_price"), 2))
    .withColumn("total_profit_per_brand", format_number(col("total_profit_per_brand"), 2))
)
profit_per_category.show()
s3_write(s3_prefix='transformed-layer/profit_per_category', data_frame=profit_per_category)

+----------+--------------+------------------+----------------------+
|     brand|   total_sales|total_retail_price|total_profit_per_brand|
+----------+--------------+------------------+----------------------+
| Microsoft| 32,255,430.00|     14,566,239.00|         17,689,191.00|
|     Anker|  1,295,790.00|        562,609.90|            733,180.10|
|   Philips|    883,888.00|        390,950.70|            492,937.30|
|      Asus| 47,634,376.00|     20,389,590.75|         27,244,785.25|
|     Beats|  1,511,370.00|        611,745.00|            899,625.00|
|    Huawei| 13,639,460.00|      6,505,272.00|          7,134,188.00|
|     Apple|317,628,421.00|    136,148,587.55|        181,479,833.45|
|      boAt|    907,557.00|        367,401.45|            540,155.55|
|       TCL| 60,633,140.00|     25,992,693.00|         34,640,447.00|
|     Noise|    371,906.00|        169,957.50|            201,948.50|
|    Realme|  1,030,699.00|        435,071.65|            595,627.35|
|        HP| 68,864,

In [8]:
store_sales_columns = ["product_name", "brand", "category", "total", "quantity", "retail_price", "mrp", "store_location", "store_name"]
store_sales_raw_data = transformed_df.select(*store_sales_columns)

# --- Sales and profit per store ---------------------------------------------
# NOTE(review): the profit column is named "total_profit_per_brand" although
# rows are per store - verify downstream consumers before renaming.
store_sales = (
    store_sales_raw_data
    .groupBy("store_name", "store_location")
    .agg(
        sum("total").alias("total_sales"),
        sum("retail_price").alias("total_retail_price"),
    )
    .withColumn("total_profit_per_brand", col("total_sales") - col("total_retail_price"))
)
# Render the amounts as 2-decimal strings so they are not shown in scientific notation.
store_sales = (
    store_sales
    .withColumn("total_sales", format_number(col("total_sales"), 2))
    .withColumn("total_retail_price", format_number(col("total_retail_price"), 2))
    .withColumn("total_profit_per_brand", format_number(col("total_profit_per_brand"), 2))
)
store_sales.show(100)
s3_write(s3_prefix='transformed-layer/store_sales', data_frame=store_sales)

# --- Highest selling products per store --------------------------------------
higest_sold_products = store_sales_raw_data.groupBy("product_name", "brand", "store_name", "store_location").agg(
    sum("total").alias("total_sales"),
    sum("quantity").alias("total_quantity"),
)
higest_sold_products.show(5)
# The frame above is already unique per (product, brand, store, location), so
# grouping it again on the same keys only added a shuffle without changing the
# result. The old name stays bound in case later cells refer to it.
higest_sold_product = higest_sold_products
sorted_df = higest_sold_product.orderBy(col("total_sales").desc(), col("total_quantity").desc())
sorted_df.show(5)
s3_write(s3_prefix='transformed-layer/higest_sold_product', data_frame=sorted_df)


+--------------------+--------------+-------------+------------------+----------------------+
|          store_name|store_location|  total_sales|total_retail_price|total_profit_per_brand|
+--------------------+--------------+-------------+------------------+----------------------+
|         Rohini Mall|     New Delhi|41,541,327.00|     18,709,961.90|         22,831,365.10|
|   DLF Mall of India|         Noida|46,636,444.00|     19,767,218.35|         26,869,225.65|
|    City Centre Mall|       Kolkata|47,047,321.00|     20,705,795.50|         26,341,525.50|
|         R City Mall|        Mumbai|49,223,933.00|     21,811,638.05|         27,412,294.95|
|Sarath City Capit...|     Hyderabad|52,369,899.00|     22,778,823.05|         29,591,075.95|
|        Pacific Mall|     Ghaziabad|49,888,680.00|     21,766,204.50|         28,122,475.50|
|       Ambience Mall|      Gurugram|60,246,815.00|     24,965,361.65|         35,281,453.35|
|     Kochi Lulu Mall|         Kochi|47,290,389.00|     19,9