<a href="https://colab.research.google.com/github/KiaroRB/data-analysis-projects/blob/main/gold_data_ingestion_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
      .appName("bronze_ingestion")\
      .getOrCreate()

In [2]:
!pip install -q gdown

In [3]:
import gdown

file_id = "1RTjsV_o2vn-s0hdO8GvLcTArI29bJDs7"
gdown.download(id=file_id, output="online_retail.csv", quiet=False)

#https://drive.google.com/file/d/1RTjsV_o2vn-s0hdO8GvLcTArI29bJDs7/view?usp=sharing

Downloading...
From: https://drive.google.com/uc?id=1RTjsV_o2vn-s0hdO8GvLcTArI29bJDs7
To: /content/online_retail.csv
100%|██████████| 45.6M/45.6M [00:00<00:00, 213MB/s]


'online_retail.csv'

In [4]:
df = spark.read.csv(
    "online_retail.csv",
    header=True,
    inferSchema=True,
    sep = ";"
)

In [None]:
# Bronze
df.write.mode("overwrite").parquet("bronze_online_retail_raw")

In [None]:
!ls
!ls bronze_online_retail_raw

bronze_online_retail_raw  sample_data
online_retail.csv	  silver_online_retail_clean
part-00000-7f819bcd-82c0-4731-82c3-1be7e46efbd6-c000.snappy.parquet  _SUCCESS
part-00001-7f819bcd-82c0-4731-82c3-1be7e46efbd6-c000.snappy.parquet


In [None]:
print("Total de registros:", df.count())
print("\nTotal de columnas:", len(df.columns))
print("\n REGISTRO DEL DATASET")
df.limit(5).show(truncate=False)
print("\n Verificación de datos")
df.printSchema()

Total de registros: 541909

Total de columnas: 8

 REGISTRO DEL DATASET
+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate   |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |1/12/2010 8:26|2,55     |17850     |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |1/12/2010 8:26|3,39     |17850     |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |1/12/2010 8:26|2,75     |17850     |United Kingdom|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |1/12/2010 8:26|3,39     |17850     |United Kingdom|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |1/12/2010 8:26|3,39     |17850     |

In [7]:
#ENTREGABLE 2 — Plata: Limpieza + columnas derivadas
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [8]:
total_bronze = df.count()
print("Registros antes de limpiar:", total_bronze)

Registros antes de limpiar: 541909


In [9]:
#Regla 1: Quantity > 0
from pyspark.sql.functions import col
df_quantity_ok = df.filter(col("Quantity") > 0)

df_quantity_ok.select("Quantity").show(5)

print("Registros luego de Quantity > 0:", df_quantity_ok.count())
print(
    "Eliminados por Quantity <= 0:",
    total_bronze - df_quantity_ok.count()
)

+--------+
|Quantity|
+--------+
|       6|
|       6|
|       8|
|       6|
|       6|
+--------+
only showing top 5 rows
Registros luego de Quantity > 0: 531285
Eliminados por Quantity <= 0: 10624


In [10]:
#Correción del tipo de dato UnitPrice (string a double)
from pyspark.sql.functions import regexp_replace

df_price_cast = df_quantity_ok.withColumn(
    "UnitPrice",
    regexp_replace(col("UnitPrice"), ",", ".").cast("double")
)

df_price_cast.printSchema()

#Regla 1: UnitPrice > 0
df_price_ok = df_price_cast.filter(col("UnitPrice") > 0)

print("Registros antes (Quantity OK):", df_quantity_ok.count())
print("Registros después (UnitPrice > 0):", df_price_ok.count())
print(
    "Eliminados por UnitPrice <= 0:",
    df_quantity_ok.count() - df_price_ok.count()
)

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)

Registros antes (Quantity OK): 531285
Registros después (UnitPrice > 0): 530104
Eliminados por UnitPrice <= 0: 1181


In [11]:
#Regla 3: CustomerID IS NOT NULL
df_customer_ok = df_price_ok.filter(col("CustomerID").isNotNull())

print("Registros antes Price:", df_price_ok.count())
print("Registros después CustomerID NOT NULL:", df_customer_ok.count())
print(
    "Eliminados por CustomerID NULL:",
    df_price_ok.count() - df_customer_ok.count()
)

Registros antes Price: 530104
Registros después CustomerID NOT NULL: 397884
Eliminados por CustomerID NULL: 132220


In [12]:
#limpiza InvoiceDate
from pyspark.sql.functions import expr

df_silver = df_customer_ok.withColumn(
    "InvoiceDate",
    expr("try_to_timestamp(InvoiceDate, 'd/M/yyyy H:mm')")
)

In [13]:
#Columnas derivadas
from pyspark.sql.functions import year, month

df_silver = (
    df_silver
    .withColumn("SaleAmount", col("Quantity") * col("UnitPrice"))
    .withColumn("Year", year(col("InvoiceDate")))
    .withColumn("Month", month(col("InvoiceDate")))
)

df_silver.select(
    "Quantity", "UnitPrice", "SaleAmount", "Year", "Month"
).show(5)

+--------+---------+------------------+----+-----+
|Quantity|UnitPrice|        SaleAmount|Year|Month|
+--------+---------+------------------+----+-----+
|       6|     2.55|15.299999999999999|2010|   12|
|       6|     3.39|             20.34|2010|   12|
|       8|     2.75|              22.0|2010|   12|
|       6|     3.39|             20.34|2010|   12|
|       6|     3.39|             20.34|2010|   12|
+--------+---------+------------------+----+-----+
only showing top 5 rows


In [14]:
#Silver
df_silver.write.mode("overwrite").parquet("silver_online_retail_clean")

In [15]:
!ls

online_retail.csv  sample_data	silver_online_retail_clean


In [16]:
!ls silver_online_retail_clean

part-00000-9459e844-f865-4e74-84fb-c9d2d6b968a2-c000.snappy.parquet  _SUCCESS
part-00001-9459e844-f865-4e74-84fb-c9d2d6b968a2-c000.snappy.parquet


In [19]:
#Entregable 3 — Oro: Tablas analíticas + Dashboard Power BI
df_silver.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+------------------+----+-----+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|        SaleAmount|Year|Month|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+------------------+----+-----+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|15.299999999999999|2010|   12|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|             20.34|2010|   12|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|              22.0|2010|   12|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|             20.34|2010|   12|
|   536365|   84029E|RED WOOLLY HO

In [21]:
#Ventas por año y mes
from pyspark.sql.functions import sum
gold_sales_monthly = (
    df_silver
    .groupBy("Year", "Month")
    .agg(
        sum("SaleAmount").alias("TotalSales")
    )
    .orderBy("Year", "Month")
)

gold_sales_monthly.show()

+----+-----+------------------+
|Year|Month|        TotalSales|
+----+-----+------------------+
|2010|   12| 572713.8900000163|
|2011|    1| 569445.0400000077|
|2011|    2| 447137.3500000165|
|2011|    3|  595500.760000013|
|2011|    4| 469200.3610000132|
|2011|    5| 678594.5600000018|
|2011|    6| 661213.6900000116|
|2011|    7| 600091.0110000141|
|2011|    8| 645343.9000000039|
|2011|    9| 952838.3819999964|
|2011|   10|1039318.7899999822|
|2011|   11|1161817.3799999433|
|2011|   12| 518192.7900000037|
+----+-----+------------------+



In [23]:
#Ventas por país
gold_sales_by_country =(
    df_silver
    .groupBy("Country")
    .agg(
        sum("SaleAmount").alias("TotalSales")
    )
    .orderBy("TotalSales", ascending=False)

)

gold_sales_by_country.show(10)

+--------------+------------------+
|       Country|        TotalSales|
+--------------+------------------+
|United Kingdom| 7308391.554000206|
|   Netherlands| 285446.3399999992|
|          EIRE|265545.89999999903|
|       Germany|228867.14000000025|
|        France|209024.05000000022|
|     Australia|138521.30999999976|
|         Spain| 61577.11000000017|
|   Switzerland|56443.950000000084|
|       Belgium| 41196.34000000001|
|        Sweden| 38378.32999999999|
+--------------+------------------+
only showing top 10 rows


In [24]:
#Top productos más vendidos
gold_top_products = (
    df_silver
    .groupBy("Description")
    .agg(
        sum("SaleAmount").alias("TotalSales"),
        sum("Quantity").alias("TotalQuantity")
    )
    .orderBy("TotalSales", ascending=False)
)

gold_top_products.show(10)

+--------------------+------------------+-------------+
|         Description|        TotalSales|TotalQuantity|
+--------------------+------------------+-------------+
|PAPER CRAFT , LIT...|          168469.6|        80995|
|REGENCY CAKESTAND...|142592.94999999966|        12402|
|WHITE HANGING HEA...|100448.14999999953|        36725|
|JUMBO BAG RED RET...| 85220.78000000044|        46181|
|MEDIUM CERAMIC TO...| 81416.72999999998|        77916|
|             POSTAGE| 77803.95999999999|         3120|
|       PARTY BUNTING| 68844.33000000006|        15291|
|ASSORTED COLOUR B...| 56580.34000000046|        35362|
|              Manual| 53779.93000000001|         7173|
|  RABBIT NIGHT LIGHT|51346.199999999975|        27202|
+--------------------+------------------+-------------+
only showing top 10 rows


In [25]:
#tablas requeridas GOLD
gold_sales_monthly.write.mode("overwrite").option("header", True)\
    .csv("gold_sales_monthly")

gold_sales_by_country.write.mode("overwrite").option("header", True)\
    .csv("gold_sales_by_country")

gold_top_products.write.mode("overwrite").option("header", True)\
    .csv("gold_top_products")
