# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


In [1]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 5.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 049f419c-b8f3-4518-a467-72741e948ee1
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
Waiting for session 049f419c-b8f3-4518-a467-72741e948ee1 to get into ready status...
Session 049f419c-b8f3-4518-a467-72741e948ee1 ha

In [2]:
#Nomas para revisar que todo bien
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="salesdb",
    table_name="sales_raw"
)

df = datasource0.toDF()
df.show(5)

+-------+-----------+--------------+----------+------------+-----------+--------+----------+-----------+------+----------+------+--------------+
|sale_id|customer_id| customer_name|product_id|product_name|   category|quantity|unit_price|sale_amount|profit| sale_date|region|payment_method|
+-------+-----------+--------------+----------+------------+-----------+--------+----------+-----------+------+----------+------+--------------+
|S000001|      C7888| Robert Wilson|      P008|    Yoga Mat|     Sports|       2|     29.34|      58.68| 19.41|2024-03-04| North|          Cash|
|S000002|      C2451|James Anderson|      P007|  Headphones|Electronics|       2|    218.81|     437.62| 88.22|2025-05-06|  West|        PayPal|
|S000003|      C1884|  Emma Johnson|      P001|      Laptop|Electronics|       3|    762.38|    2287.14|463.22|2024-07-12| South|    Debit Card|
|S000004|      C8765| Robert Wilson|      P008|    Yoga Mat|     Sports|       3|     32.56|      97.68| 27.04|2024-06-01| North| 

In [3]:
## =============================
## SIMPLE TRANSFORMATIONS
## =============================

# Calculate new derived fields
df = (
    df.withColumn("total_revenue", F.col("quantity") * F.col("unit_price"))
      .withColumn("profit_margin", F.round(F.col("profit") / F.col("sale_amount"), 2))
)

# Clean data (remove nulls, standardize text)
df = (
    df.filter(F.col("sale_id").isNotNull())
      .withColumn("region", F.upper(F.trim(F.col("region"))))
      .withColumn("category", F.initcap(F.col("category")))
)




In [4]:
# =============================
# Top n products per regiuon
# =============================

N = 5  

window = Window.partitionBy("region").orderBy(F.col("total_quantity").desc())

top_n_df = (
    df.groupBy("region", "product_name")
      .agg(F.sum("quantity").alias("total_quantity"))
      .withColumn("rank", F.row_number().over(window))
      .filter(F.col("rank") <= N)
)

top_n_df.show()

+------+-------------+--------------+----+
|region| product_name|total_quantity|rank|
+------+-------------+--------------+----+
|  EAST|       Laptop|           539|   1|
|  EAST|   Desk Chair|           501|   2|
|  EAST|    Bookshelf|           483|   3|
|  EAST|     Yoga Mat|           471|   4|
|  EAST|   Smartphone|           460|   5|
| NORTH|   Smartphone|           553|   1|
| NORTH| Coffee Maker|           482|   2|
| NORTH|Running Shoes|           467|   3|
| NORTH|   Headphones|           454|   4|
| NORTH|   Desk Chair|           444|   5|
| SOUTH|   Desk Chair|           534|   1|
| SOUTH|       Laptop|           513|   2|
| SOUTH|   Headphones|           509|   3|
| SOUTH| Coffee Maker|           482|   4|
| SOUTH|    Bookshelf|           477|   5|
|  WEST|     Yoga Mat|           533|   1|
|  WEST|   Desk Chair|           521|   2|
|  WEST|   Smartphone|           512|   3|
|  WEST|Running Shoes|           477|   4|
|  WEST|       Laptop|           462|   5|
+------+---

In [19]:
# =============================
# Sales per month 
# =============================

df_2025 = (
    df
    .withColumn("sale_date_parsed", F.to_date("sale_date"))
    .filter(F.year("sale_date_parsed") == 2025)
    .withColumn("sale_month", F.date_format("sale_date_parsed", "yyyy-MM"))
)

monthly_df = (
    df_2025.groupBy("sale_month")
           .agg(F.sum("sale_amount").alias("monthly_sales"))
           .orderBy("sale_month")
)

monthly_df.show()

+----------+------------------+
|sale_month|     monthly_sales|
+----------+------------------+
|   2025-01|         209603.13|
|   2025-02|178264.62000000008|
|   2025-03|         188255.96|
|   2025-04|         190019.57|
|   2025-05|183264.12999999998|
|   2025-06| 214273.2499999999|
|   2025-07|191470.03999999992|
|   2025-08| 211680.1600000001|
|   2025-09|200905.66000000003|
|   2025-10| 203624.2899999999|
|   2025-11|17557.639999999996|
+----------+------------------+


In [11]:
# =============================
# KPIS
# =============================

kpi_avg_sale_amount = df.agg(F.avg("sale_amount")).first()[0]

kpi_total_sales = df.count()




In [15]:
import boto3
from decimal import Decimal

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table("kpi_summary")

table.put_item(
    Item={
        "kpi_name": "total_sales",
        "value": int(kpi_total_sales)
    }
)

table.put_item(
    Item={
        "kpi_name": "avg_sale_amount",
        "value": Decimal(kpi_avg_sale_amount)
    }
)

{'ResponseMetadata': {'RequestId': 'GITH9OLFK1KAGQH37VNFBU5NINVV4KQNSO5AEMVJF66Q9ASUAAJG', 'HTTPStatusCode': 200, 'HTTPHeaders': {'server': 'Server', 'date': 'Sun, 16 Nov 2025 00:44:00 GMT', 'content-type': 'application/x-amz-json-1.0', 'content-length': '2', 'connection': 'keep-alive', 'x-amzn-requestid': 'GITH9OLFK1KAGQH37VNFBU5NINVV4KQNSO5AEMVJF66Q9ASUAAJG', 'x-amz-crc32': '2745614147'}, 'RetryAttempts': 0}}


In [17]:
top_n_dyf = DynamicFrame.fromDF(top_n_df, glueContext, "top_n_dyf")

glueContext.write_dynamic_frame.from_options(
    frame=top_n_dyf,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.output.tableName": "top_products_by_region",
        "dynamodb.throughput.write.percent": "1.0"
    }
)

<awsglue.dynamicframe.DynamicFrame object at 0x7f5a8c9da750>


In [20]:
monthly_sales_dyf = DynamicFrame.fromDF(monthly_df, glueContext, "monthly_sales_dyf")

glueContext.write_dynamic_frame.from_options(
    frame=monthly_sales_dyf,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.output.tableName": "monthly_sales",
        "dynamodb.throughput.write.percent": "1.0"
    }
)

<awsglue.dynamicframe.DynamicFrame object at 0x7f5a8ca3d990>
