In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("ETL").getOrCreate()

In [2]:
# Extracción: Leer los datos desde un archivo CSV
data = spark.read.csv("Online_Retail.csv", header=True, inferSchema=True)

In [3]:
# Transformación: Limpiar y transformar los datos
df = data.withColumn("Total_Amount", F.col("Quantity") * F.col("UnitPrice")) \
        .withColumn("InvoiceDate", F.to_timestamp(F.col("InvoiceDate"), "MM/dd/yyyy HH:mm"))


In [4]:
# Filtrar registros inválidos (devoluciones o precios negativos)
df_clean = df.filter((F.col("Quantity") > 0) & (F.col("UnitPrice") > 0))

# Ventas por cliente
df_customer = df_clean.groupBy("CustomerID") \
                    .agg(F.sum("Total_Amount").alias("Total_Spent"),
                        F.sum("Quantity").alias("Total_Quantity"))

# Ventas por producto
df_product = df_clean.groupBy("StockCode", "Description") \
                    .agg(F.sum("Total_Amount").alias("Total_Sales"),
                        F.sum("Quantity").alias("Total_Quantity_Sold"))

# Ventas por país
df_country = df_clean.groupBy("Country") \
                    .agg(F.sum("Total_Amount").alias("Total_Sales_Country"),
                        F.sum("Quantity").alias("Total_Quantity_Country"))

In [5]:
# Load: Cargar resultados en formato parquet
df_customer_pandas = df_customer.toPandas()
df_customer_pandas.to_parquet("output/Sales_by_Customer.parquet", index=False)

df_product_pandas = df_product.toPandas()
df_product_pandas.to_parquet("output/Sales_by_product.parquet", index=False)

df_country_pandas = df_country.toPandas()
df_country_pandas.to_parquet("output/Sales_by_country.parquet", index=False)

In [6]:
spark.stop()