# UT1: Integration, processing and analysis with Python + PySpark

**Module:** Big Data Systems  
**Unit:** UT1 - Applying techniques for integrating, processing and analyzing information  
**Date:** 2025-10-06


## 0) Environment check

Run this cell. If PySpark is not installed or Spark cannot start, the cell prints a message instead of failing; otherwise it initializes a local `SparkSession`.


In [1]:
# Run this cell first
import sys, os, time

try:
    import pyspark  # type: ignore
    from pyspark.sql import SparkSession, functions as F, types as T
    spark = (SparkSession.builder
             .appName("UT1_ETL_MVP")
             .master("local[*]")
             .config("spark.sql.shuffle.partitions","8")
             .getOrCreate())
    print("Spark version:", spark.version)
    print("Master:", spark.sparkContext.master)
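except ImportError:
    # PySpark is missing: tell the user how to install it
    print("PySpark is not installed. Install it with: pip install pyspark")
except Exception as e:
    print("Error initializing Spark:", e)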

Spark version: 4.0.1
Master: local[*]


## 1) Generating a synthetic dataset (e-commerce)

We will create two CSV files, one with **orders** and one with **order lines**, to simulate a realistic case without personal data (GDPR-compliant).  
Default volume: about **160,000** rows in `order_items.csv` (25,000 orders with 6.5 lines each on average); adjust `N_ORDERS` and `MAX_ITEMS` to change it.
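
The size of `order_items.csv` can be estimated from the generator's parameters before running it. The sketch below assumes, as in the generation cell, that the number of lines per order is drawn uniformly between 1 and `MAX_ITEMS`; the helper name `expected_item_rows` is illustrative and not part of the notebook.

```python
def expected_item_rows(n_orders: int, max_items: int) -> float:
    """Expected number of rows in order_items.csv when every order gets a
    uniformly random number of lines between 1 and max_items (inclusive)."""
    mean_lines_per_order = (1 + max_items) / 2
    return n_orders * mean_lines_per_order


# Default parameters of the generation cell
print(expected_item_rows(25_000, 12))  # 162500.0
```

The 163,363 rows reported later by `df_items.count()` are consistent with this estimate.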


In [2]:
# Generation parameters (lower the volume if your machine is modest)
import os, csv, random, math, datetime, itertools
from pathlib import Path

random.seed(42)
out_dir = Path("data/raw")
out_dir.mkdir(parents=True, exist_ok=True)

N_ORDERS = 25000     # ~25k orders
MAX_ITEMS = 12       # up to 12 lines per order

products = [
    ("P-100", "Teclado mecánico", 59.90),
    ("P-101", "Ratón gaming", 39.90),
    ("P-102", "Monitor 27\"", 199.00),
    ("P-103", "Auriculares", 79.00),
    ("P-104", "Webcam HD", 49.90),
    ("P-105", "SSD 1TB", 89.00),
    ("P-106", "Silla ergonómica", 149.00),
    ("P-107", "Alfombrilla XL", 19.90),
]

countries = ["ES","FR","DE","IT","PT"]
states = ["NEW","PAID","SHIPPED","DELIVERED","RETURNED","CANCELLED"]

start_date = datetime.date(2025, 1, 1)
def rand_date():
    delta = random.randint(0, 250)
    return start_date + datetime.timedelta(days=delta)

path_orders = out_dir / "orders.csv"
path_items  = out_dir / "order_items.csv"

with path_orders.open("w", newline="", encoding="utf-8") as f1, \
     path_items.open("w", newline="", encoding="utf-8") as f2:
    w1 = csv.writer(f1)
    w2 = csv.writer(f2)
    w1.writerow(["order_id","order_date","country","state","customer_age"])
    w2.writerow(["order_id","item_id","product_id","product_name","unit_price","qty"])

    oid = 100000
    for _ in range(N_ORDERS):
        d = rand_date()
        country = random.choice(countries)
        state = random.choices(states, weights=[0.15,0.25,0.25,0.2,0.05,0.10])[0]
        age = max(16, int(random.gauss(35, 10)))
        w1.writerow([oid, d.isoformat(), country, state, age])

        k = random.randint(1, MAX_ITEMS)
        for i in range(1, k+1):
            prod_id, prod_name, price = random.choice(products)
            qty = max(1, int(random.gauss(2, 1)))
            w2.writerow([oid, i, prod_id, prod_name, price, qty])

        oid += 1

print("CSV files written to:", out_dir.resolve())

CSV files written to: C:\datos\Documents\Big Data e IA\Sistemas de Big Data\Ejercicio 2\data\raw


## 2) ETL with **PySpark**: ingestion → cleaning/validation → enrichment → **partitioned Parquet**

**Goal:** create a *silver dataset* in `data/processed/`, partitioned by `order_year=YYYY`.
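
When a DataFrame is written with `partitionBy`, Spark creates one subdirectory per distinct value of the partition column, named `column=value` (Hive-style layout). The value lives in the directory name rather than inside the data files. The sketch below only builds the expected directory name in plain Python; `partition_dir` is a hypothetical helper, and the base path is the one used later in the write step.

```python
from pathlib import PurePosixPath


def partition_dir(base: str, column: str, value) -> str:
    """Build the Hive-style partition directory for one partition value."""
    return str(PurePosixPath(base) / f"{column}={value}")


print(partition_dir("data/processed/orders_parquet", "order_year", 2025))
# data/processed/orders_parquet/order_year=2025
```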

**Minimum requirements (MVP):**
- Explicit schema for `orders` and `order_items`.
- Basic validations (null values, ranges, duplicates).
- Enrichment: `order_total` per order and `order_month`.
- Output written as **Parquet**, partitioned by `order_year`.
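
To make the validation and enrichment rules concrete, here is a minimal plain-Python sketch of the same logic on a handful of in-memory rows. It is not the Spark implementation: the sample rows and the function names (`clean_items`, `order_totals`, `order_month`) are made up for illustration.

```python
from collections import defaultdict

# Hypothetical sample rows: (order_id, item_id, unit_price, qty)
items = [
    (1, 1, 59.90, 2),
    (1, 2, 19.90, 1),
    (1, 2, 19.90, 1),     # duplicate of (order_id=1, item_id=2)
    (2, 1, 0.0, 3),       # invalid price
    (2, 2, 89.00, None),  # missing quantity
    (3, 1, 199.00, 1),
]


def clean_items(rows):
    """Drop rows with a null or non-positive price/qty, then deduplicate
    on (order_id, item_id), keeping the first occurrence."""
    seen, out = set(), []
    for order_id, item_id, price, qty in rows:
        if price is None or qty is None or price <= 0 or qty <= 0:
            continue
        key = (order_id, item_id)
        if key in seen:
            continue
        seen.add(key)
        out.append((order_id, item_id, price, qty))
    return out


def order_totals(rows):
    """Sum unit_price * qty per order_id, rounded to cents."""
    totals = defaultdict(float)
    for order_id, _item_id, price, qty in rows:
        totals[order_id] += price * qty
    return {k: round(v, 2) for k, v in totals.items()}


def order_month(order_date: str) -> str:
    """'2025-03-17' -> '2025-03' (assumes an ISO yyyy-MM-dd date)."""
    return order_date[:7]


cleaned = clean_items(items)
print(len(cleaned))           # 3
print(order_totals(cleaned))  # {1: 139.7, 3: 199.0}
```

Order 2 disappears from the totals because both of its lines fail validation, which mirrors what an inner join between orders and valid lines does.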


In [3]:
from pyspark.sql import functions as F, types as T

# 2.1 Define schemas
orders_schema = T.StructType([
    T.StructField("order_id", T.IntegerType(), False),
    T.StructField("order_date", T.StringType(), False),
    T.StructField("country", T.StringType(), False),
    T.StructField("state", T.StringType(), False),
    T.StructField("customer_age", T.IntegerType(), True),
])

items_schema = T.StructType([
    T.StructField("order_id", T.IntegerType(), False),
    T.StructField("item_id", T.IntegerType(), False),
    T.StructField("product_id", T.StringType(), False),
    T.StructField("product_name", T.StringType(), True),
    T.StructField("unit_price", T.DoubleType(), True),
    T.StructField("qty", T.IntegerType(), True),
])

raw_dir = "data/raw"
df_orders = (spark.read
    .option("header", True)
    .schema(orders_schema)
    .csv(f"{raw_dir}/orders.csv"))

df_items = (spark.read
    .option("header", True)
    .schema(items_schema)
    .csv(f"{raw_dir}/order_items.csv"))

print("Orders:", df_orders.count(), "Items:", df_items.count())

# 2.2 Cleaning / validation
# - Drop lines with an invalid price or quantity; orders left without lines
#   are excluded later by the inner join in 2.3
df_items = df_items.where((F.col("unit_price") > 0) & (F.col("qty") > 0))

# - Parse dates and derive columns
df_orders = (df_orders
    .withColumn("order_ts", F.to_timestamp("order_date", "yyyy-MM-dd"))
    .withColumn("order_year", F.year("order_ts"))
    .withColumn("order_month", F.date_format("order_ts", "yyyy-MM"))
)

# - Duplicates (metric + deduplication by order_id, item_id)
dup_count = df_items.count() - df_items.dropDuplicates(["order_id","item_id"]).count()
print("Duplicates in items:", dup_count)
df_items = df_items.dropDuplicates(["order_id","item_id"])

# 2.3 Enrichment
df_fact = (df_items
    .join(df_orders, "order_id", "inner")
    .withColumn("line_total", F.col("unit_price") * F.col("qty"))
)

df_order_totals = (df_fact
    .groupBy("order_id","order_year","order_month","country","state")
    .agg(F.sum("line_total").alias("order_total"))
)

# 2.4 Write partitioned Parquet
out_dir = "data/processed/orders_parquet"
(df_order_totals
    .repartition("order_year")
    .write
    .mode("overwrite")
    .partitionBy("order_year")
    .parquet(out_dir)
)
print("Parquet written to:", out_dir)

# 2.5 Minimal quality metrics (example)
metrics = {
    "orders": df_orders.count(),
    "items": df_items.count(),
    "orders_with_total": df_order_totals.count(),
    "nulls_in_state": df_orders.where(F.col("state").isNull()).count(),
}

print("Metrics:", metrics)

Orders: 25000 Items: 163363
Duplicates in items: 0


Py4JJavaError: An error occurred while calling o108.parquet.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:789)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:298)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:314)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:1116)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:798)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:838)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:988)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:190)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:268)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:306)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:189)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:195)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:117)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:115)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:129)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:402)
	at org.apache.spark.sql.execution.adaptive.ResultQueryStageExec.$anonfun$doMaterialize$1(QueryStageExec.scala:325)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$4(SQLExecution.scala:322)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$3(SQLExecution.scala:320)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:316)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
	at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:131)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:192)
	at org.apache.spark.sql.classic.DataFrameWriter.runCommand(DataFrameWriter.scala:622)
	at org.apache.spark.sql.classic.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
	at org.apache.spark.sql.classic.DataFrameWriter.saveInternal(DataFrameWriter.scala:241)
	at org.apache.spark.sql.classic.DataFrameWriter.save(DataFrameWriter.scala:118)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:369)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:842)
	Suppressed: org.apache.spark.util.Utils$OriginalTryStackTraceException: Full stacktrace of original doTryWithCallerStacktrace caller
		at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:789)
		at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:298)
		at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:314)
		at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:1116)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:798)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:838)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
		at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:988)
		at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
		at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:190)
		at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:268)
		at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:306)
		at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:189)
		at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:195)
		at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:117)
		at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:115)
		at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:129)
		at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:402)
		at org.apache.spark.sql.execution.adaptive.ResultQueryStageExec.$anonfun$doMaterialize$1(QueryStageExec.scala:325)
		at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$4(SQLExecution.scala:322)
		at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
		at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$3(SQLExecution.scala:320)
		at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
		at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:316)
		at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
		... 1 more
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:601)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:622)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:645)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:742)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:80)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1954)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1912)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1885)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$install$1(ShutdownHookManager.scala:194)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at scala.Option.fold(Option.scala:263)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:195)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:55)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:53)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:159)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala:63)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:250)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:99)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:379)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:961)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:204)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:227)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:96)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1132)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1141)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:521)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:492)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:569)
	... 27 more
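
The trace above is the usual symptom of writing files from Spark on Windows without Hadoop's native helper: Hadoop's `Shell` class looks for `winutils.exe` under `%HADOOP_HOME%\bin` and fails when neither `HADOOP_HOME` nor `hadoop.home.dir` is set. A minimal sketch of a pre-flight check follows. The function name `check_hadoop_home` is ours, and where `winutils.exe` lives on a given machine is an assumption you must adapt.

```python
import os
from pathlib import Path


def check_hadoop_home(env=None, is_windows=None):
    """Return (ok, message). On non-Windows systems nothing is required.

    env and is_windows can be overridden for testing; by default the real
    environment and operating system are inspected.
    """
    env = os.environ if env is None else env
    is_windows = (os.name == "nt") if is_windows is None else is_windows
    if not is_windows:
        return True, "Not on Windows: winutils.exe is not needed."
    home = env.get("HADOOP_HOME")
    if not home:
        return False, "HADOOP_HOME is not set."
    winutils = Path(home) / "bin" / "winutils.exe"
    if not winutils.is_file():
        return False, f"winutils.exe not found at {winutils}"
    return True, f"Found {winutils}"


ok, msg = check_hadoop_home()
print(ok, msg)
```

If the check fails, place a `winutils.exe` built for your Hadoop version in the `bin` subfolder of a directory of your choice, point `HADOOP_HOME` at that directory, and restart the kernel so the variable is visible before the first `SparkSession` is created.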


## 3) Spark SQL: business queries
**Examples**
- **Revenue by month and country**
- **Top 5 countries by total revenue**
- **Distribution of order states**

**Activity**
- **Average and median order value by month and country**
- **Return and cancellation rates by month and country**
- **Month-over-month (MoM) revenue growth by country**
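
As a reminder of what MoM growth means before writing it in Spark SQL, the sketch below computes it in plain Python as `(revenue - previous_revenue) / previous_revenue`. It is only an illustration of the definition, not the expected solution; `mom_growth` is a hypothetical helper, and the DE figures are the first three months printed by the revenue query in this notebook. In Spark SQL the previous month's value is typically obtained with the `LAG` window function partitioned by country.

```python
def mom_growth(revenues):
    """revenues: list of (month, revenue) pairs sorted by month.

    Returns a list of (month, growth); growth is None for the first month
    or when the previous month's revenue is zero.
    """
    out = []
    prev = None
    for month, rev in revenues:
        growth = None if prev in (None, 0) else (rev - prev) / prev
        out.append((month, growth))
        prev = rev
    return out


de_revenue = [("2025-01", 608494.1), ("2025-02", 504489.6), ("2025-03", 608561.2)]
for month, growth in mom_growth(de_revenue):
    print(month, "n/a" if growth is None else f"{growth:.1%}")
# 2025-01 n/a
# 2025-02 -17.1%
# 2025-03 20.6%
```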

In [5]:
# Create a temporary view and run SQL against it
df_order_totals.createOrReplaceTempView("orders_totals")

q1 = spark.sql("""SELECT order_month, country, ROUND(SUM(order_total), 2) AS revenue
FROM orders_totals
GROUP BY order_month, country
ORDER BY order_month, country
""")
q1.show(20, truncate=False)

q2 = spark.sql("""SELECT country, ROUND(SUM(order_total), 2) AS total_revenue
FROM orders_totals
GROUP BY country
ORDER BY total_revenue DESC
LIMIT 5
""")
q2.show(truncate=False)

from pyspark.sql import functions as F
q3 = (spark.table("orders_totals")
        .groupBy("state")
        .agg(F.countDistinct("order_id").alias("orders"))
        .orderBy(F.desc("orders")))
q3.show(truncate=False)

# TODO: queries 4, 5 and 6 (q4, q5, q6)


+-----------+-------+--------+
|order_month|country|revenue |
+-----------+-------+--------+
|2025-01    |DE     |608494.1|
|2025-01    |ES     |566387.1|
|2025-01    |FR     |574235.6|
|2025-01    |IT     |599407.3|
|2025-01    |PT     |543649.5|
|2025-02    |DE     |504489.6|
|2025-02    |ES     |561997.8|
|2025-02    |FR     |516163.8|
|2025-02    |IT     |487299.9|
|2025-02    |PT     |531820.6|
|2025-03    |DE     |608561.2|
|2025-03    |ES     |605323.3|
|2025-03    |FR     |580193.2|
|2025-03    |IT     |564331.8|
|2025-03    |PT     |612236.1|
|2025-04    |DE     |517544.0|
|2025-04    |ES     |585974.6|
|2025-04    |FR     |535885.1|
|2025-04    |IT     |552092.1|
|2025-04    |PT     |556325.2|
+-----------+-------+--------+
only showing top 20 rows
+-------+-------------+
|country|total_revenue|
+-------+-------------+
|PT     |4778615.1    |
|ES     |4746869.9    |
|DE     |4708763.8    |
|IT     |4703156.6    |
|FR     |4617584.2    |
+-------+-------------+

+---------+---