# ETL con PySpark - Market Data

### Importar librerías y crear SparkSession

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType, DateType
import os
os.environ["HADOOP_HOME"] = "C:\\hadoop"

spark = SparkSession.builder \
    .appName("ETL Market Data") \
    .master("local[*]") \
    .getOrCreate()

print(f"✅ SparkSession creada: {spark.sparkContext.appName}")
print(f"Versión de Spark: {spark.version}")

✅ SparkSession creada: ETL Market Data
Versión de Spark: 4.0.1


## EXTRACT: Cargar dataset original

In [2]:
df_raw = spark.read.csv("../data/market_data.csv", header=True, inferSchema=False)

# Cachear para reutilizar
df_raw.cache()

print(f"✅ Dataset cargado")
print(f"Filas: {df_raw.count()}")
print(f"Columnas: {len(df_raw.columns)}")
print(f"\nEsquema:")
df_raw.printSchema()
df_raw.show(5, truncate=False)

✅ Dataset cargado
Filas: 10000
Columnas: 15

Esquema:
root
 |-- market_type: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- open: string (nullable = true)
 |-- close: string (nullable = true)
 |-- high: string (nullable = true)
 |-- low: string (nullable = true)
 |-- volume: string (nullable = true)
 |-- market_cap: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- exchange: string (nullable = true)
 |-- country: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- last_updated: string (nullable = true)

+-----------+------+---------+----------+------------------+------------------+------------------+-----------------+------------------+------------------+----------+--------+-------+--------+-------------------+
|market_type|symbol|name     |date      |open              |close             |high              |low              |volume            |market_cap     

In [3]:
# Mostrar las primeras 10 filas del DataFrame
print("Primeras filas:")
df_raw.show(10, truncate=False)

Primeras filas:
+-----------+------+---------+----------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------------+--------+-------+--------+-------------------+
|market_type|symbol|name     |date      |open              |close             |high              |low               |volume            |market_cap        |sector                 |exchange|country|currency|last_updated       |
+-----------+------+---------+----------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------------+--------+-------+--------+-------------------+
|stock      |MSFT  |MSFT Inc |08-03-2024|47536.20817743171 |49741.830640201704|51230.74908681415 |45530.2244971417  |1560789.2088416903|1129874955353.6138|Finance                |NASDAQ  |US     |usd     |2024-08-03 23:00:00|
|stock      |GOOGL |GOOGL Inc|2023-06-13|43310.14552728901 |44186.00670200739 |4

## EXPLORACIÓN: Análisis del dataset

### VALORES NULOS Y PROBLEMÁTICOS

In [4]:
print("=== VALORES NULOS Y PROBLEMÁTICOS ===\n")

valores_problematicos = ['?', 'N/A', 'n/a', 'NULL', 'null', 'Unknown', 'unknown', '', ' ']

for col_name in df_raw.columns:
    null_count = df_raw.filter(
        col(col_name).isNull() | col(col_name).isin(valores_problematicos)
    ).count()
    if null_count > 0:
        print(f"{col_name}: {null_count} valores problemáticos")

=== VALORES NULOS Y PROBLEMÁTICOS ===

symbol: 213 valores problemáticos
exchange: 499 valores problemáticos


### VALORES ÚNICOS EN COLUMNAS CATEGÓRICAS

In [5]:
print("=== VALORES ÚNICOS EN COLUMNAS CATEGÓRICAS ===\n")

cols_categoricas = ['market_type', 'currency', 'sector', 'exchange', 'country']

for c in cols_categoricas:
    print(f"{c.upper()}:")
    df_raw.select(c).distinct().show(truncate=False)

=== VALORES ÚNICOS EN COLUMNAS CATEGÓRICAS ===

MARKET_TYPE:
+-----------+
|market_type|
+-----------+
|stock      |
|crypto     |
+-----------+

CURRENCY:
+--------+
|currency|
+--------+
|usd     |
|USDT    |
|$       |
|USD     |
+--------+

SECTOR:
+-----------------------+
|sector                 |
+-----------------------+
|automotive             |
|finance                |
|crypto                 |
|Crypto                 |
|Finance                |
|blockchain             |
|AI                     |
|Automotive             |
|RETAIL                 |
|Technology             |
|TECHNOLOGY             |
|Artificial Intelligence|
|ai                     |
|Retail                 |
|Blockchain             |
|tech                   |
+-----------------------+

EXCHANGE:
+--------+
|exchange|
+--------+
|NYSE    |
|Unknown |
|N/A     |
|COINBASE|
|NASDAQ  |
|BINANCE |
|NULL    |
+--------+

COUNTRY:
+-------+
|country|
+-------+
|Global |
|US     |
+-------+



### FORMATOS DE FECHA

In [6]:
print("=== FORMATOS DE FECHA ===\n")
df_raw.select("date").distinct().show(20, truncate=False)

=== FORMATOS DE FECHA ===

+----------+
|date      |
+----------+
|11-27-2023|
|2024-07-14|
|30/07/2024|
|2023-05-18|
|2024-08-20|
|02/04/2024|
|2024-01-19|
|06-02-2023|
|02-28-2024|
|05-28-2023|
|2023-05-01|
|2023-01-21|
|2023-04-17|
|13/06/2023|
|2024-08-06|
|12/01/2023|
|15/12/2023|
|09-03-2023|
|07-28-2024|
|03-26-2024|
+----------+
only showing top 20 rows


### VALORES CON TEXTO EN 'open'

In [7]:
print("=== VALORES CON TEXTO EN 'open' ===\n")
df_raw.filter(col("open").rlike("[a-zA-Z]")).select("open").distinct().show(truncate=False)

=== VALORES CON TEXTO EN 'open' ===

+------------+
|open        |
+------------+
|22984.61 USD|
|2690.88 USD |
|39825.2 USD |
|37134.57 USD|
|34251.52 USD|
|8449.76 USD |
|1542.32 USD |
|20672.99 USD|
|41460.8 USD |
|25948.16 USD|
|21806.06 USD|
|21352.53 USD|
|9231.11 USD |
|4560.91 USD |
|41361.06 USD|
|29476.27 USD|
|13562.88 USD|
|20863.06 USD|
|36202.99 USD|
|9797.23 USD |
+------------+
only showing top 20 rows


### DUPLICADOS

In [24]:
print("=== DUPLICADOS ===\n")
total_filas = df_raw.count()
filas_unicas = df_raw.dropDuplicates().count()
duplicados = total_filas - filas_unicas
print(f"Filas duplicadas: {duplicados}")

=== DUPLICADOS ===



Py4JJavaError: An error occurred while calling o34.count.
: org.apache.spark.SparkException: [INTERNAL_ERROR] The "count" action failed. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace. SQLSTATE: XX000
	at org.apache.spark.SparkException$.internalError(SparkException.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:643)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:656)
	at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$1(Dataset.scala:2232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:125)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:295)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:124)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:237)
	at org.apache.spark.sql.classic.Dataset.withAction(Dataset.scala:2232)
	at org.apache.spark.sql.classic.Dataset.count(Dataset.scala:1499)
	at jdk.internal.reflect.GeneratedMethodAccessor72.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.spark.sql.classic.SparkSession.sparkContext()" because the return value of "org.apache.spark.sql.execution.SparkPlan.session()" is null
	at org.apache.spark.sql.execution.SparkPlan.sparkContext(SparkPlan.scala:68)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.metrics$lzycompute(HashAggregateExec.scala:71)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.metrics(HashAggregateExec.scala:70)
	at org.apache.spark.sql.execution.SparkPlan.resetMetrics(SparkPlan.scala:147)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.resetMetrics(AdaptiveSparkPlanExec.scala:245)
	at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$2(Dataset.scala:2233)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
	... 27 more


### VALORES ÚNICOS POR COLUMNA

In [9]:
print("=== VALORES ÚNICOS POR COLUMNA ===\n")

for col_name in df_raw.columns:
    unique_count = df_raw.select(col_name).distinct().count()
    print(f"{col_name}: {unique_count} valores únicos")

=== VALORES ÚNICOS POR COLUMNA ===

market_type: 2 valores únicos
symbol: 11 valores únicos
name: 10 valores únicos
date: 1798 valores únicos
open: 10000 valores únicos
close: 10000 valores únicos
high: 10000 valores únicos
low: 10000 valores únicos
volume: 10000 valores únicos
market_cap: 10000 valores únicos
sector: 16 valores únicos
exchange: 7 valores únicos
country: 2 valores únicos
currency: 4 valores únicos
last_updated: 7259 valores únicos


### ESPACIOS EN BLANCO

In [10]:
print("=== ESPACIOS EN BLANCO ===\n")

for col_name in df_raw.columns:
    con_espacios = df_raw.filter(trim(col(col_name)) != col(col_name)).count()
    if con_espacios >= 0:
        print(f"{col_name}: {con_espacios} valores con espacios al inicio/final")

=== ESPACIOS EN BLANCO ===

market_type: 0 valores con espacios al inicio/final
symbol: 0 valores con espacios al inicio/final
name: 0 valores con espacios al inicio/final
date: 0 valores con espacios al inicio/final
open: 0 valores con espacios al inicio/final
close: 0 valores con espacios al inicio/final
high: 0 valores con espacios al inicio/final
low: 0 valores con espacios al inicio/final
volume: 0 valores con espacios al inicio/final
market_cap: 0 valores con espacios al inicio/final
sector: 0 valores con espacios al inicio/final
exchange: 0 valores con espacios al inicio/final
country: 0 valores con espacios al inicio/final
currency: 0 valores con espacios al inicio/final
last_updated: 0 valores con espacios al inicio/final


## TRANSFORM: Limpieza de datos

### Paso 1: Crear copia de trabajo

In [11]:
df_clean = df_raw
print(f"✅ DataFrame listo para transformar: {df_clean.count()} filas")

✅ DataFrame listo para transformar: 10000 filas


### Paso 2: Limpiar columna 'open' (extraer números)

In [12]:
from pyspark.sql.functions import regexp_extract

print("Antes:")
df_clean.select("open").show(5, truncate=False)

# Extraer solo números (patrón: dígitos con punto decimal opcional)
df_clean = df_clean.withColumn(
    "open",
    regexp_extract(col("open"), r"([\d.]+)", 1).cast("double")
)

print("\nDespués:")
df_clean.select("open").show(5, truncate=False)
print("✅ 'open' limpiado")

Antes:
+------------------+
|open              |
+------------------+
|47536.20817743171 |
|43310.14552728901 |
|10624.832142807025|
|14568.544718500116|
|39260.94631003675 |
+------------------+
only showing top 5 rows

Después:
+------------------+
|open              |
+------------------+
|47536.20817743171 |
|43310.14552728901 |
|10624.832142807025|
|14568.544718500116|
|39260.94631003675 |
+------------------+
only showing top 5 rows
✅ 'open' limpiado


### Paso 3: Normalizar columna 'currency'

In [13]:
print("Antes:")
df_clean.select("currency").distinct().show()

df_clean = df_clean.withColumn(
    "currency",
    when(upper(col("currency")).isin(["USD", "USDT", "$"]), "USD")
    .when(upper(col("currency")) == "USD", "USD")
    .otherwise(upper(col("currency")))
)

print("Después:")
df_clean.select("currency").distinct().show()
print("✅ 'currency' normalizado")

Antes:
+--------+
|currency|
+--------+
|     usd|
|    USDT|
|       $|
|     USD|
+--------+

Después:
+--------+
|currency|
+--------+
|     USD|
+--------+

✅ 'currency' normalizado


### Paso 4: Normalizar columna 'sector'

In [14]:
print("Antes:")
df_clean.select("sector").distinct().show(20, truncate=False)

df_clean = df_clean.withColumn(
    "sector",
    when(lower(col("sector")).isin(["finance"]), "Finance")
    .when(lower(col("sector")).isin(["tech", "technology"]), "Technology")
    .when(lower(col("sector")).isin(["retail"]), "Retail")
    .when(lower(col("sector")).isin(["crypto", "blockchain"]), "Crypto")
    .when(lower(col("sector")).isin(["ai", "artificial intelligence"]), "AI")
    .when(lower(col("sector")).isin(["automotive"]), "Automotive")
    .otherwise(col("sector"))
)

print("Después:")
df_clean.select("sector").distinct().show()
print("✅ 'sector' normalizado")

Antes:
+-----------------------+
|sector                 |
+-----------------------+
|automotive             |
|finance                |
|crypto                 |
|Crypto                 |
|Finance                |
|blockchain             |
|AI                     |
|Automotive             |
|RETAIL                 |
|Technology             |
|TECHNOLOGY             |
|Artificial Intelligence|
|ai                     |
|Retail                 |
|Blockchain             |
|tech                   |
+-----------------------+

Después:
+----------+
|    sector|
+----------+
|    Crypto|
|   Finance|
|        AI|
|Automotive|
|Technology|
|    Retail|
+----------+

✅ 'sector' normalizado


### Paso 5: Limpiar columna 'exchange'

In [15]:
print("Antes:")
df_clean.select("exchange").distinct().show()

df_clean = df_clean.withColumn(
    "exchange",
    when(
        col("exchange").isNull() | 
        (col("exchange") == "Unknown") | 
        (col("exchange") == "N/A") | 
        (col("exchange") == "NULL"),
        "exchange_empty"
    ).otherwise(col("exchange"))
)

print("\nDespués:")
df_clean.select("exchange").distinct().show()
print("✅ 'exchange' limpiado")

Antes:
+--------+
|exchange|
+--------+
|    NYSE|
| Unknown|
|     N/A|
|COINBASE|
|  NASDAQ|
| BINANCE|
|    NULL|
+--------+


Después:
+--------------+
|      exchange|
+--------------+
|          NYSE|
|exchange_empty|
|      COINBASE|
|        NASDAQ|
|       BINANCE|
+--------------+

✅ 'exchange' limpiado


### Paso 6: Limpiar columna 'symbol' (rellenar según name)

In [16]:
from pyspark.sql.functions import first

print(f"Nulos en 'symbol': {df_clean.filter(col('symbol').isNull()).count()}")

# Crear mapeo name → symbol (de filas que sí tienen symbol)
name_symbol_map = df_clean.filter(col("symbol").isNotNull()) \
    .groupBy("name") \
    .agg(first("symbol").alias("symbol_map"))

# Join para rellenar nulos
df_clean = df_clean.join(name_symbol_map, on="name", how="left")

df_clean = df_clean.withColumn(
    "symbol",
    when(col("symbol").isNull(), col("symbol_map"))
    .otherwise(col("symbol"))
).drop("symbol_map")

# Si aún quedan nulos, rellenar con 'symbol_empty'
df_clean = df_clean.withColumn(
    "symbol",
    when(col("symbol").isNull(), "symbol_empty")
    .otherwise(col("symbol"))
)

print(f"Nulos después: {df_clean.filter(col('symbol').isNull()).count()}")
print(f"'symbol_empty' restantes: {df_clean.filter(col('symbol') == 'symbol_empty').count()}")
print("✅ 'symbol' limpiado")

Nulos en 'symbol': 213
Nulos después: 0
'symbol_empty' restantes: 0
✅ 'symbol' limpiado


### Paso 7: Normalizar columna 'date'

In [17]:
df_clean = df_clean.withColumn(
    "date",
    when(
        col("date").rlike(r"^\d{4}-\d{2}-\d{2}$"),  # YYYY-MM-DD
        make_date(
            split(col("date"), "-")[0].cast("int"),
            split(col("date"), "-")[1].cast("int"),
            split(col("date"), "-")[2].cast("int")
        )
    ).when(
        col("date").rlike(r"^\d{2}/\d{2}/\d{4}$"),  # DD/MM/YYYY
        make_date(
            split(col("date"), "/")[2].cast("int"),
            split(col("date"), "/")[1].cast("int"),
            split(col("date"), "/")[0].cast("int")
        )
    ).when(
        col("date").rlike(r"^\d{2}-\d{2}-\d{4}$"),  # MM-DD-YYYY
        make_date(
            split(col("date"), "-")[2].cast("int"),
            split(col("date"), "-")[0].cast("int"),
            split(col("date"), "-")[1].cast("int")
        )
    ).otherwise(None)
)

df_clean.select("date").show(10)
print(f"Nulos: {df_clean.filter(col('date').isNull()).count()}")

+----------+
|      date|
+----------+
|2024-08-03|
|2023-06-13|
|2024-08-13|
|2024-08-15|
|2024-03-29|
|2024-03-13|
|2023-10-27|
|2023-08-24|
|2023-07-22|
|2023-04-25|
+----------+
only showing top 10 rows
Nulos: 0


### Paso 8: Convertir 'last_updated' a timestamp

In [18]:
print(f"Tipo antes: {df_clean.schema['last_updated'].dataType}")

df_clean = df_clean.withColumn(
    "last_updated",
    to_timestamp(col("last_updated"), "yyyy-MM-dd HH:mm:ss")
)

print(f"Tipo después: {df_clean.schema['last_updated'].dataType}")
print("✅ 'last_updated' convertido")

Tipo antes: StringType()
Tipo después: TimestampType()
✅ 'last_updated' convertido


### Paso 9: Crear columnas derivadas

In [19]:
df_clean = df_clean.withColumn(
    "daily_change",
    col("close") - col("open")
)

df_clean = df_clean.withColumn(
    "daily_change_pct",
    ((col("close") - col("open")) / col("open")) * 100
)

print("✅ Columnas derivadas creadas")
df_clean.select("open", "close", "daily_change", "daily_change_pct").show(5)

✅ Columnas derivadas creadas
+------------------+------------------+-------------------+------------------+
|              open|             close|       daily_change|  daily_change_pct|
+------------------+------------------+-------------------+------------------+
| 47536.20817743171|49741.830640201704|  2205.622462769992| 4.639878836228114|
| 43310.14552728901| 44186.00670200739|  875.8611747183822|2.0223002348641765|
|10624.832142807025|  9948.72087971569|  -676.111263091334|-6.363500655857974|
|14568.544718500116| 14894.45149823145|  325.9067797313346|2.2370578944475925|
| 39260.94631003675|36902.728007201345|-2358.2183028354048|-6.006524356832797|
+------------------+------------------+-------------------+------------------+
only showing top 5 rows


### Paso 10: Reducir decimales a 2 dígitos

In [20]:
cols_decimales = ['open', 'close', 'high', 'low', 'volume', 'market_cap', 'daily_change', 'daily_change_pct']

for c in cols_decimales:
    df_clean = df_clean.withColumn(c, round(col(c), 2))

print("✅ Decimales reducidos a 2 dígitos")
df_clean.select(cols_decimales).show(5)

✅ Decimales reducidos a 2 dígitos
+--------+--------+--------+--------+----------+-------------------+------------+----------------+
|    open|   close|    high|     low|    volume|         market_cap|daily_change|daily_change_pct|
+--------+--------+--------+--------+----------+-------------------+------------+----------------+
|47536.21|49741.83|51230.75|45530.22|1560789.21|1.12987495535361E12|     2205.62|            4.64|
|43310.15|44186.01|45750.35|41189.21|9699128.61| 1.8367000038292E11|      875.86|            2.02|
|10624.83| 9948.72|10722.26| 9602.63|5248039.56| 3.3381484311617E11|     -676.11|           -6.36|
|14568.54|14894.45|14998.34|14052.92|3664252.07| 1.7561828236364E11|      325.91|            2.24|
|39260.95|36902.73|40270.41|36150.68| 465457.68| 1.8514764376573E11|    -2358.22|           -6.01|
+--------+--------+--------+--------+----------+-------------------+------------+----------------+
only showing top 5 rows


# LOAD

### Guardar CSV 

In [None]:
# Uso pandas para guardar porque no he podido con Spark directamente
df_clean.toPandas().to_csv("../data/market_data_clean_spark.csv", index=False)
print("✅ CSV guardado en ../data/market_data_clean_spark.csv")

✅ CSV guardado en ../data/market_data_clean_spark.csv


### Limpiar sesion


In [23]:
df_clean.unpersist()
spark.stop()
print("✅ Sesión cerrada")

✅ Sesión cerrada


## Iniciar sesion y cargar CSV limpio

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("ETL Market Data").master("local[*]").getOrCreate()

df = spark.read.csv("../data/market_data_clean_spark.csv", header=True, inferSchema=True)
df.cache()
df.show(5)
print(f"Filas: {df.count()}, Columnas: {len(df.columns)}")

+---------+-----------+------+----------+--------+--------+--------+--------+----------+-------------------+-------+--------+-------+--------+-------------------+------------+----------------+
|     name|market_type|symbol|      date|    open|   close|    high|     low|    volume|         market_cap| sector|exchange|country|currency|       last_updated|daily_change|daily_change_pct|
+---------+-----------+------+----------+--------+--------+--------+--------+----------+-------------------+-------+--------+-------+--------+-------------------+------------+----------------+
| MSFT Inc|      stock|  MSFT|2024-08-03|47536.21|49741.83|51230.75|45530.22|1560789.21|1.12987495535361E12|Finance|  NASDAQ|     US|     USD|2024-08-03 23:00:00|     2205.62|            4.64|
|GOOGL Inc|      stock| GOOGL|2023-06-13|43310.15|44186.01|45750.35|41189.21|9699128.61| 1.8367000038292E11| Retail|  NASDAQ| Global|     USD|2023-06-13 12:00:00|      875.86|            2.02|
| BTC Coin|     crypto|   BTC|2024-

## Crear tablas dimensionales

In [2]:
# 1. dim_asset
dim_asset = df.select("symbol", "name", "market_type").dropDuplicates()
dim_asset = dim_asset.withColumn("asset_id", monotonically_increasing_id() + 1)
dim_asset.show()

# 2. dim_sector
dim_sector = df.select("sector").dropDuplicates().withColumnRenamed("sector", "sector_name")
dim_sector = dim_sector.withColumn("sector_id", monotonically_increasing_id() + 1)
dim_sector.show()

# 3. dim_exchange
dim_exchange = df.select("exchange").dropDuplicates().withColumnRenamed("exchange", "exchange_name")
dim_exchange = dim_exchange.withColumn("exchange_id", monotonically_increasing_id() + 1)
dim_exchange.show()

# 4. dim_currency
dim_currency = df.select("currency").dropDuplicates().withColumnRenamed("currency", "currency_name")
dim_currency = dim_currency.withColumn("currency_id", monotonically_increasing_id() + 1)
dim_currency.show()

# 5. dim_country
dim_country = df.select("country").dropDuplicates().withColumnRenamed("country", "country_name")
dim_country = dim_country.withColumn("country_id", monotonically_increasing_id() + 1)
dim_country.show()

# 6. dim_date
dim_date = df.select("date").dropDuplicates()
dim_date = dim_date.withColumn("date_id", monotonically_increasing_id() + 1) \
    .withColumn("year", year("date")) \
    .withColumn("month", month("date")) \
    .withColumn("day", dayofmonth("date")) \
    .withColumn("weekday", dayofweek("date"))
dim_date.show()

print("✅ Dimensiones creadas")

+------+---------+-----------+--------+
|symbol|     name|market_type|asset_id|
+------+---------+-----------+--------+
|  AAPL| AAPL Inc|      stock|       1|
|   ETH| ETH Coin|     crypto|       2|
|  MSFT| MSFT Inc|      stock|       3|
|  AMZN| AMZN Inc|      stock|       4|
| GOOGL|GOOGL Inc|      stock|       5|
|   SOL| SOL Coin|     crypto|       6|
|   ADA| ADA Coin|     crypto|       7|
|   BTC| BTC Coin|     crypto|       8|
|   BNB| BNB Coin|     crypto|       9|
|  TSLA| TSLA Inc|      stock|      10|
+------+---------+-----------+--------+

+-----------+---------+
|sector_name|sector_id|
+-----------+---------+
|     Crypto|        1|
|    Finance|        2|
|         AI|        3|
| Automotive|        4|
| Technology|        5|
|     Retail|        6|
+-----------+---------+

+--------------+-----------+
| exchange_name|exchange_id|
+--------------+-----------+
|          NYSE|          1|
|exchange_empty|          2|
|      COINBASE|          3|
|        NASDAQ|        

## Crear tabla de hechos

In [3]:
# Crear fact_market_data con joins a las dimensiones
fact = df.alias("f") \
    .join(dim_asset.alias("a"), (col("f.symbol") == col("a.symbol")) & (col("f.name") == col("a.name")), "left") \
    .join(dim_sector.alias("s"), col("f.sector") == col("s.sector_name"), "left") \
    .join(dim_exchange.alias("e"), col("f.exchange") == col("e.exchange_name"), "left") \
    .join(dim_currency.alias("c"), col("f.currency") == col("c.currency_name"), "left") \
    .join(dim_country.alias("co"), col("f.country") == col("co.country_name"), "left") \
    .join(dim_date.alias("d"), col("f.date") == col("d.date"), "left") \
    .select(
        col("a.asset_id"),
        col("d.date_id"),
        col("s.sector_id"),
        col("e.exchange_id"),
        col("c.currency_id"),
        col("co.country_id"),
        col("f.open"),
        col("f.close"),
        col("f.high"),
        col("f.low"),
        col("f.volume"),
        col("f.market_cap"),
        col("f.daily_change"),
        col("f.daily_change_pct")
    )

fact.show(5)
print(f"✅ Fact table creada: {fact.count()} filas")

+--------+-------+---------+-----------+-----------+----------+--------+--------+--------+--------+----------+-------------------+------------+----------------+
|asset_id|date_id|sector_id|exchange_id|currency_id|country_id|    open|   close|    high|     low|    volume|         market_cap|daily_change|daily_change_pct|
+--------+-------+---------+-----------+-----------+----------+--------+--------+--------+--------+----------+-------------------+------------+----------------+
|       3|    107|        2|          4|          1|         2|47536.21|49741.83|51230.75|45530.22|1560789.21|1.12987495535361E12|     2205.62|            4.64|
|       5|    574|        6|          4|          1|         1|43310.15|44186.01|45750.35|41189.21|9699128.61| 1.8367000038292E11|      875.86|            2.02|
|       8|    330|        1|          3|          1|         2|10624.83| 9948.72|10722.26| 9602.63|5248039.56| 3.3381484311617E11|     -676.11|           -6.36|
|       6|    108|        1|      

## Guardar en SQLite

In [4]:
from sqlalchemy import create_engine

# Convertir a Pandas
dim_asset_pd = dim_asset.toPandas()
dim_sector_pd = dim_sector.toPandas()
dim_exchange_pd = dim_exchange.toPandas()
dim_currency_pd = dim_currency.toPandas()
dim_country_pd = dim_country.toPandas()
dim_date_pd = dim_date.toPandas()
fact_pd = fact.toPandas()

# Conexión SQLite
engine = create_engine('sqlite:///../warehouse/warehouse_pyspark.db')

# Guardar tablas
dim_asset_pd.to_sql('dim_asset', engine, if_exists='replace', index=False)
dim_sector_pd.to_sql('dim_sector', engine, if_exists='replace', index=False)
dim_exchange_pd.to_sql('dim_exchange', engine, if_exists='replace', index=False)
dim_currency_pd.to_sql('dim_currency', engine, if_exists='replace', index=False)
dim_country_pd.to_sql('dim_country', engine, if_exists='replace', index=False)
dim_date_pd.to_sql('dim_date', engine, if_exists='replace', index=False)
fact_pd.to_sql('fact_market_data', engine, if_exists='replace', index=False)

print("✅ Warehouse guardado en ../warehouse/warehouse_pyspark.db")
print(f"   • 6 dimensiones")
print(f"   • 1 fact table: {len(fact_pd)} filas")

✅ Warehouse guardado en ../warehouse/warehouse_pyspark.db
   • 6 dimensiones
   • 1 fact table: 10000 filas


## Generar DDL

In [5]:
def spark_dtype_to_sql(dtype):
    dtype_str = str(dtype).lower()
    if 'int' in dtype_str or 'long' in dtype_str:
        return "INTEGER"
    elif 'double' in dtype_str or 'float' in dtype_str:
        return "REAL"
    elif 'date' in dtype_str or 'timestamp' in dtype_str:
        return "DATE"
    elif 'boolean' in dtype_str:
        return "BOOLEAN"
    else:
        return "TEXT"

def generate_ddl(df, table_name, pk=None, fks=None):
    lines = [f"CREATE TABLE {table_name} ("]
    cols = []
    for field in df.schema.fields:
        col_def = f"    {field.name} {spark_dtype_to_sql(field.dataType)}"
        if pk and field.name == pk:
            col_def += " PRIMARY KEY"
        cols.append(col_def)
    if fks:
        for fk_col, ref_table, ref_col in fks:
            cols.append(f"    FOREIGN KEY ({fk_col}) REFERENCES {ref_table}({ref_col})")
    lines.append(",\n".join(cols))
    lines.append(");")
    return "\n".join(lines)

# Generar DDLs
ddl_parts = ["-- DDL Datawarehouse Market Data (PySpark)\n"]
ddl_parts.append(generate_ddl(dim_asset, "dim_asset", pk="asset_id"))
ddl_parts.append(generate_ddl(dim_sector, "dim_sector", pk="sector_id"))
ddl_parts.append(generate_ddl(dim_exchange, "dim_exchange", pk="exchange_id"))
ddl_parts.append(generate_ddl(dim_currency, "dim_currency", pk="currency_id"))
ddl_parts.append(generate_ddl(dim_country, "dim_country", pk="country_id"))
ddl_parts.append(generate_ddl(dim_date, "dim_date", pk="date_id"))
ddl_parts.append(generate_ddl(fact, "fact_market_data", fks=[
    ("asset_id", "dim_asset", "asset_id"),
    ("date_id", "dim_date", "date_id"),
    ("sector_id", "dim_sector", "sector_id"),
    ("exchange_id", "dim_exchange", "exchange_id"),
    ("currency_id", "dim_currency", "currency_id"),
    ("country_id", "dim_country", "country_id")
]))

ddl_sql = "\n\n".join(ddl_parts)

with open("../warehouse/modelo_datawarehouse_pyspark.sql", "w") as f:
    f.write(ddl_sql)

print("✅ DDL guardado")
print(ddl_sql)

✅ DDL guardado
-- DDL Datawarehouse Market Data (PySpark)


CREATE TABLE dim_asset (
    symbol TEXT,
    name TEXT,
    market_type TEXT,
    asset_id INTEGER PRIMARY KEY
);

CREATE TABLE dim_sector (
    sector_name TEXT,
    sector_id INTEGER PRIMARY KEY
);

CREATE TABLE dim_exchange (
    exchange_name TEXT,
    exchange_id INTEGER PRIMARY KEY
);

CREATE TABLE dim_currency (
    currency_name TEXT,
    currency_id INTEGER PRIMARY KEY
);

CREATE TABLE dim_country (
    country_name TEXT,
    country_id INTEGER PRIMARY KEY
);

CREATE TABLE dim_date (
    date DATE,
    date_id INTEGER PRIMARY KEY,
    year INTEGER,
    month INTEGER,
    day INTEGER,
    weekday INTEGER
);

CREATE TABLE fact_market_data (
    asset_id INTEGER,
    date_id INTEGER,
    sector_id INTEGER,
    exchange_id INTEGER,
    currency_id INTEGER,
    country_id INTEGER,
    open REAL,
    close REAL,
    high REAL,
    low REAL,
    volume REAL,
    market_cap REAL,
    daily_change REAL,
    daily_change_pct R