# Proyecto RA1: Analisis de Proyectos de Arquitectura
## Flujo 2: PySpark - Procesamiento y ETL

Este notebook implementa:
- **Fase 2**: Procesamiento con PySpark
- **Fase 4**: Proceso ETL con PySpark
- **Fase 5**: Modelo de Data Warehouse

## 1. Configuracion Inicial

In [1]:
import os
import sqlite3
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import *


# ===============================
# Rutas
# ===============================
DATA_PATH = "/app/data/proyectos_arquitectura.csv"
WAREHOUSE_PATH = "/app/warehouse/warehouse_pyspark.db"

print("Rutas:")
print(DATA_PATH)
print(WAREHOUSE_PATH)

Rutas:
/app/data/proyectos_arquitectura.csv
/app/warehouse/warehouse_pyspark.db


In [2]:
spark = (
    SparkSession.builder
    .appName("Proyecto Arquitectura - ETL PySpark")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .config("spark.driver.memory", "2g")
    .config("spark.sql.warehouse.dir", "/app/warehouse")
    .getOrCreate()
)


print("Spark OK")
print("Version:", spark.version)
print("Master:", spark.sparkContext.master)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/17 15:50:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark OK
Version: 4.0.1
Master: local[*]


## 2. EXTRACCION - Carga del Dataset

In [3]:
# Cargar el dataset original
df_raw = spark.read.csv(
    DATA_PATH,
    header=True,
    inferSchema=True
)

print(f'Dataset cargado: {df_raw.count()} filas x {len(df_raw.columns)} columnas')
df_raw.printSchema()

Dataset cargado: 10000 filas x 10 columnas
root
 |-- id_proyecto: double (nullable = true)
 |-- nombre_proyecto: string (nullable = true)
 |-- colaborador: string (nullable = true)
 |-- fase: string (nullable = true)
 |-- fecha_inicio: string (nullable = true)
 |-- total_base: string (nullable = true)
 |-- iva_porcentaje: string (nullable = true)
 |-- total_con_iva: double (nullable = true)
 |-- estado: string (nullable = true)
 |-- ciudad: string (nullable = true)



In [4]:
# Vista previa de los datos
df_raw.show(10, truncate=False)

+-----------+-------------------------+---------------+-------------+------------+----------+--------------+-------------+----------+---------+
|id_proyecto|nombre_proyecto          |colaborador    |fase         |fecha_inicio|total_base|iva_porcentaje|total_con_iva|estado    |ciudad   |
+-----------+-------------------------+---------------+-------------+------------+----------+--------------+-------------+----------+---------+
|1.0        |Proyecto Edificio 539    |Pedro Ortega   |Presupuesto  |04-26-2024  |78035.32  |0.21          |93642.38     |Finalizado|bcn      |
|2.0        |Proyecto Residencial 103 |Marta López    |Presupuesto  |21/06/2023  |190389.29 |21%           |230371.04    |Pendiente |Valencia |
|3.0        |Proyecto Reforma 4921    |Pedro Ortega   |Presupuesto  |2022-12-27  |147738.82 |0.21          |178763.97    |En curso  |Bilbao   |
|4.0        |Proyecto Reforma 2950    |Carlos Ruiz    |Presupuesto  |2022-08-16  |121738,4  |0.10          |133912.24    |Pendiente |mad

## 3. EXPLORACION Y ANALISIS

In [5]:
# Distribucion por estado
print('Distribucion por estado:')
df_raw.groupBy('estado').count().orderBy('count', ascending=False).show()

Distribucion por estado:
+----------+-----+
|    estado|count|
+----------+-----+
| Cancelado| 2516|
| Pendiente| 2506|
|  En curso| 2502|
|Finalizado| 2476|
+----------+-----+



In [6]:
# Distribucion por ciudad (antes de normalizar)
print('Distribucion por ciudad (antes de normalizar):')
df_raw.groupBy('ciudad').count().orderBy('count', ascending=False).show()

Distribucion por ciudad (antes de normalizar):
+---------+-----+
|   ciudad|count|
+---------+-----+
|barcelona| 1056|
|   Madrid| 1034|
|      BCN| 1020|
|Barcelona| 1004|
|   Bilbao|  999|
|      MAD|  990|
|   madrid|  989|
|      bcn|  983|
| Valencia|  972|
|      VAL|  953|
+---------+-----+



In [7]:
# Distribucion por fase
print('Distribucion por fase:')
df_raw.groupBy('fase').count().orderBy('count', ascending=False).show()

Distribucion por fase:
+-------------+-----+
|         fase|count|
+-------------+-----+
|  Presupuesto| 2546|
|       Diseño| 2508|
|Planificación| 2498|
|         Obra| 2448|
+-------------+-----+



## 4. TRANSFORMACION - Limpieza y Procesamiento

Se aplican 8 transformaciones usando Spark DataFrame API

In [8]:
# Crear copia para transformaciones
df = df_raw
print(f'Registros iniciales: {df.count()}')

Registros iniciales: 10000


### 4.1 Transformacion 1: Normalizacion de Colaboradores

In [9]:
# Normalizar nombres de colaboradores
# - Reemplazar puntos por espacios
# - Convertir a formato titulo
# - Eliminar espacios extras

df = df.withColumn(
    'colaborador_limpio',
    initcap(trim(regexp_replace(col('colaborador'), r'\\.', ' ')))
)

print('Transformacion 1: Colaboradores normalizados')
df.select('colaborador', 'colaborador_limpio').distinct().show(15, truncate=False)

Transformacion 1: Colaboradores normalizados


[Stage 18:>                                                         (0 + 1) / 1]

+---------------+------------------+
|colaborador    |colaborador_limpio|
+---------------+------------------+
|ANA MORALES    |Ana Morales       |
|JOSÉ PÉREZ     |José Pérez        |
|Lucía.Fernández|Lucía.fernández   |
|luis delgado   |Luis Delgado      |
|Elena.Rivas    |Elena.rivas       |
|Marta.López    |Marta.lópez       |
|juan gómez     |Juan Gómez        |
|CARLOS RUIZ    |Carlos Ruiz       |
|LUCÍA FERNÁNDEZ|Lucía Fernández   |
|pedro ortega   |Pedro Ortega      |
|ana morales    |Ana Morales       |
|Carlos.Ruiz    |Carlos.ruiz       |
|ELENA RIVAS    |Elena Rivas       |
|Ana.Morales    |Ana.morales       |
|lucía fernández|Lucía Fernández   |
+---------------+------------------+
only showing top 15 rows


                                                                                

### 4.2 Transformacion 2: Normalizacion de Ciudades

In [10]:
# Normalizar ciudades usando when/otherwise
df = df.withColumn(
    'ciudad_limpia',
    when(lower(col('ciudad')).isin('bcn', 'barcelona'), 'Barcelona')
    .when(lower(col('ciudad')).isin('mad', 'madrid'), 'Madrid')
    .when(lower(col('ciudad')).isin('val', 'valencia'), 'Valencia')
    .when(lower(col('ciudad')) == 'bilbao', 'Bilbao')
    .otherwise(initcap(col('ciudad')))
)

print('Transformacion 2: Ciudades normalizadas')
df.groupBy('ciudad_limpia').count().orderBy('count', ascending=False).show()

Transformacion 2: Ciudades normalizadas
+-------------+-----+
|ciudad_limpia|count|
+-------------+-----+
|    Barcelona| 4063|
|       Madrid| 3013|
|     Valencia| 1925|
|       Bilbao|  999|
+-------------+-----+



### 4.3 Transformacion 3: Limpieza de Valores Monetarios

In [11]:
# Limpiar valores monetarios
# Eliminar simbolos de euro y convertir comas a puntos

df = df.withColumn(
    'total_base_limpio',
    regexp_replace(
        regexp_replace(
            regexp_replace(col('total_base').cast('string'), '[^0-9.,]', ''),
            ',', '.'
        ),
        ' ', ''
    ).cast('double')
)

df = df.withColumn(
    'total_con_iva_limpio',
    regexp_replace(
        regexp_replace(
            regexp_replace(col('total_con_iva').cast('string'), '[^0-9.,]', ''),
            ',', '.'
        ),
        ' ', ''
    ).cast('double')
)

print('Transformacion 3: Valores monetarios limpiados')
df.select('total_base', 'total_base_limpio').show(10)

Transformacion 3: Valores monetarios limpiados
+----------+-----------------+
|total_base|total_base_limpio|
+----------+-----------------+
|  78035.32|         78035.32|
| 190389.29|        190389.29|
| 147738.82|        147738.82|
|  121738,4|         121738.4|
| 35423.63€|         35423.63|
|  35418,93|         35418.93|
|   16326.3|          16326.3|
|173904.35€|        173904.35|
| 122217,43|        122217.43|
| 143074.15|        143074.15|
+----------+-----------------+
only showing top 10 rows


### 4.4 Transformacion 4: Normalizacion de IVA

In [12]:
# Normalizar porcentaje de IVA a decimal
df = df.withColumn(
    'iva_decimal',
    when(
        regexp_replace(col('iva_porcentaje').cast('string'), '%', '').cast('double') > 1,
        regexp_replace(col('iva_porcentaje').cast('string'), '%', '').cast('double') / 100
    ).otherwise(
        regexp_replace(col('iva_porcentaje').cast('string'), '%', '').cast('double')
    )
)

print('Transformacion 4: IVA normalizado')
df.select('iva_porcentaje', 'iva_decimal').distinct().show()

Transformacion 4: IVA normalizado
+--------------+-----------+
|iva_porcentaje|iva_decimal|
+--------------+-----------+
|           10%|        0.1|
|           21%|       0.21|
|          0.21|       0.21|
|            21|       0.21|
|            10|        0.1|
|          0.10|        0.1|
+--------------+-----------+



### 4.5 Transformacion 5: Extraccion de Tipo de Proyecto

In [13]:
# Extraer tipo de proyecto del nombre
df = df.withColumn(
    'tipo_proyecto',
    when(lower(col('nombre_proyecto')).contains('edificio'), 'Edificio')
    .when(lower(col('nombre_proyecto')).contains('residencial'), 'Residencial')
    .when(lower(col('nombre_proyecto')).contains('reforma'), 'Reforma')
    .when(lower(col('nombre_proyecto')).contains('oficina'), 'Oficina')
    .when(lower(col('nombre_proyecto')).contains('local'), 'Local')
    .otherwise('Otro')
)

print('Transformacion 5: Tipo de proyecto extraido')
df.groupBy('tipo_proyecto').count().orderBy('count', ascending=False).show()

Transformacion 5: Tipo de proyecto extraido
+-------------+-----+
|tipo_proyecto|count|
+-------------+-----+
|     Edificio| 2091|
|      Reforma| 2008|
|        Local| 2003|
|      Oficina| 1957|
|  Residencial| 1941|
+-------------+-----+



### 4.6 Transformacion 6: Categorizacion por Tamano

In [14]:
# Categorizar por tamano de proyecto basado en total_base
df = df.withColumn(
    'tamano_proyecto',
    when(col('total_base_limpio') < 50000, 'Pequeno')
    .when(col('total_base_limpio') < 100000, 'Mediano')
    .when(col('total_base_limpio') < 150000, 'Grande')
    .otherwise('Muy Grande')
)

print('Transformacion 6: Categorizacion por tamano')
df.groupBy('tamano_proyecto').count().orderBy('count', ascending=False).show()

Transformacion 6: Categorizacion por tamano
+---------------+-----+
|tamano_proyecto|count|
+---------------+-----+
|        Mediano| 2603|
|         Grande| 2589|
|     Muy Grande| 2464|
|        Pequeno| 2344|
+---------------+-----+



### 4.7 Transformacion 7: Calculo de Metricas

In [15]:
# Recalcular total_con_iva donde sea nulo
df = df.withColumn(
    'total_con_iva_final',
    coalesce(
        col('total_con_iva_limpio'),
        col('total_base_limpio') * (1 + col('iva_decimal'))
    )
)

# Calcular importe de IVA
df = df.withColumn(
    'importe_iva',
    col('total_con_iva_final') - col('total_base_limpio')
)

print('Transformacion 7: Metricas calculadas')
df.select('total_base_limpio', 'iva_decimal', 'total_con_iva_final', 'importe_iva').show(10)

Transformacion 7: Metricas calculadas
+-----------------+-----------+-------------------+------------------+
|total_base_limpio|iva_decimal|total_con_iva_final|       importe_iva|
+-----------------+-----------+-------------------+------------------+
|         78035.32|       0.21|           93642.38|15607.059999999998|
|        190389.29|       0.21|          230371.04|          39981.75|
|        147738.82|       0.21|          178763.97|31025.149999999994|
|         121738.4|        0.1|          133912.24|12173.839999999997|
|         35423.63|        0.1|           38965.99|3542.3600000000006|
|         35418.93|       0.21|           42856.91| 7437.980000000003|
|          16326.3|       0.21|           19754.82|3428.5200000000004|
|        173904.35|        0.1|          191294.79|17390.440000000002|
|        122217.43|        0.1|          134439.17| 12221.74000000002|
|        143074.15|        0.1|          157381.57|14307.420000000013|
+-----------------+-----------+--------

### 4.8 Transformacion 8: Tratamiento de IDs Nulos

In [16]:
# Obtener maximo ID actual
max_id_val = df.agg(max('id_proyecto')).collect()[0][0]
print(f'ID maximo actual: {max_id_val}')

# Crear ID numerico para todas las filas
df = df.withColumn('row_num', monotonically_increasing_id())

# Asignar ID donde sea nulo
df = df.withColumn(
    'id_proyecto_final',
    coalesce(
        col('id_proyecto').cast('int'),
        (col('row_num') + max_id_val + 1).cast('int')
    )
)

print('Transformacion 8: IDs asignados')
print(f'   IDs nulos originales: {df.filter(col("id_proyecto").isNull()).count()}')

ID maximo actual: 10000.0
Transformacion 8: IDs asignados
   IDs nulos originales: 226


## 5. AGREGACIONES CON SPARK

In [17]:
# Agregacion 1: Resumen por ciudad
print('Resumen por ciudad:')
df.groupBy('ciudad_limpia') \
    .agg(
        count('*').alias('num_proyectos'),
        round(sum('total_base_limpio'), 2).alias('total_facturado'),
        round(avg('total_base_limpio'), 2).alias('promedio_proyecto')
    ) \
    .orderBy('total_facturado', ascending=False) \
    .show()

Resumen por ciudad:
+-------------+-------------+---------------+-----------------+
|ciudad_limpia|num_proyectos|total_facturado|promedio_proyecto|
+-------------+-------------+---------------+-----------------+
|    Barcelona|         4063| 4.0817926777E8|        100462.53|
|       Madrid|         3013| 3.0723507468E8|        101969.82|
|     Valencia|         1925| 1.9819505773E8|        102958.47|
|       Bilbao|          999| 1.0000173735E8|        100101.84|
+-------------+-------------+---------------+-----------------+



In [18]:
# Agregacion 2: Resumen por tipo de proyecto
print('Resumen por tipo de proyecto:')
df.groupBy('tipo_proyecto') \
    .agg(
        count('*').alias('num_proyectos'),
        round(sum('total_base_limpio'), 2).alias('total_facturado')
    ) \
    .orderBy('num_proyectos', ascending=False) \
    .show()

Resumen por tipo de proyecto:
+-------------+-------------+---------------+
|tipo_proyecto|num_proyectos|total_facturado|
+-------------+-------------+---------------+
|     Edificio|         2091| 2.1321243976E8|
|      Reforma|         2008| 2.0007210337E8|
|        Local|         2003| 2.0196713818E8|
|      Oficina|         1957| 2.0255072081E8|
|  Residencial|         1941| 1.9580873541E8|
+-------------+-------------+---------------+



In [19]:
# Agregacion 3: Top 10 colaboradores por facturacion
print('Top 10 colaboradores por facturacion:')
df.groupBy('colaborador_limpio') \
    .agg(
        count('*').alias('num_proyectos'),
        round(sum('total_base_limpio'), 2).alias('total_facturado')
    ) \
    .orderBy('total_facturado', ascending=False) \
    .show(10)

Top 10 colaboradores por facturacion:
+------------------+-------------+---------------+
|colaborador_limpio|num_proyectos|total_facturado|
+------------------+-------------+---------------+
|       Ana Morales|          961|  9.850517433E7|
|      María García|          963|  9.801322175E7|
|       Elena Rivas|          955|  9.719015337E7|
|      Luis Delgado|          938|  9.496186737E7|
|   Lucía Fernández|          904|  9.484847902E7|
|        Juan Gómez|          934|   9.42459649E7|
|       Marta López|          901|  9.133148802E7|
|       Carlos Ruiz|          900|  9.130283081E7|
|      Pedro Ortega|          894|  9.076626635E7|
|        José Pérez|          923|   8.99818484E7|
+------------------+-------------+---------------+
only showing top 10 rows


## 6. MODELO DIMENSIONAL

Creacion de tablas de dimensiones y tabla de hechos

In [20]:
# Dimension Colaborador
dim_colaborador = df.select('colaborador_limpio').distinct() \
    .withColumn('id_colaborador', monotonically_increasing_id() + 1) \
    .select(
        col('id_colaborador').cast('int'),
        col('colaborador_limpio').alias('colaborador')
    )

print(f'dim_colaborador creada: {dim_colaborador.count()} registros')
dim_colaborador.show()

dim_colaborador creada: 20 registros
+--------------+---------------+
|id_colaborador|    colaborador|
+--------------+---------------+
|             1|   Pedro Ortega|
|             2|     José.pérez|
|             3|    Carlos.ruiz|
|             4|    Elena Rivas|
|             5|   María.garcía|
|             6|    Elena.rivas|
|             7|    Marta López|
|             8|Lucía.fernández|
|             9|    Marta.lópez|
|            10|    Carlos Ruiz|
|            11|    Ana Morales|
|            12|   María García|
|            13|   Pedro.ortega|
|            14|    Ana.morales|
|            15|     Juan.gómez|
|            16|     Juan Gómez|
|            17|   Luis.delgado|
|            18|   Luis Delgado|
|            19|Lucía Fernández|
|            20|     José Pérez|
+--------------+---------------+



In [21]:
# Dimension Ciudad
dim_ciudad = df.select('ciudad_limpia').distinct() \
    .filter(col('ciudad_limpia').isNotNull()) \
    .withColumn('id_ciudad', monotonically_increasing_id() + 1) \
    .select(
        col('id_ciudad').cast('int'),
        col('ciudad_limpia').alias('ciudad')
    )

print(f'dim_ciudad creada: {dim_ciudad.count()} registros')
dim_ciudad.show()

dim_ciudad creada: 4 registros
+---------+---------+
|id_ciudad|   ciudad|
+---------+---------+
|        1|   Madrid|
|        2|   Bilbao|
|        3|Barcelona|
|        4| Valencia|
+---------+---------+



In [22]:
# Dimension Tipo Proyecto
dim_tipo_proyecto = df.select('tipo_proyecto').distinct() \
    .withColumn('id_tipo_proyecto', monotonically_increasing_id() + 1) \
    .select(
        col('id_tipo_proyecto').cast('int'),
        col('tipo_proyecto')
    )

print(f'dim_tipo_proyecto creada: {dim_tipo_proyecto.count()} registros')
dim_tipo_proyecto.show()

dim_tipo_proyecto creada: 5 registros
+----------------+-------------+
|id_tipo_proyecto|tipo_proyecto|
+----------------+-------------+
|               1|      Reforma|
|               2|     Edificio|
|               3|        Local|
|               4|  Residencial|
|               5|      Oficina|
+----------------+-------------+



In [23]:
# Dimension Estado
dim_estado = df.select('estado').distinct() \
    .withColumn('id_estado', monotonically_increasing_id() + 1) \
    .select(
        col('id_estado').cast('int'),
        col('estado')
    )

print(f'dim_estado creada: {dim_estado.count()} registros')
dim_estado.show()

dim_estado creada: 4 registros
+---------+----------+
|id_estado|    estado|
+---------+----------+
|        1|Finalizado|
|        2| Cancelado|
|        3|  En curso|
|        4| Pendiente|
+---------+----------+



In [24]:
# Dimension Fase
dim_fase = df.select('fase').distinct() \
    .withColumn('id_fase', monotonically_increasing_id() + 1) \
    .select(
        col('id_fase').cast('int'),
        col('fase')
    )

print(f'dim_fase creada: {dim_fase.count()} registros')
dim_fase.show()

dim_fase creada: 4 registros
+-------+-------------+
|id_fase|         fase|
+-------+-------------+
|      1|       Diseño|
|      2|Planificación|
|      3|  Presupuesto|
|      4|         Obra|
+-------+-------------+



In [25]:
# Dimension Tamano
dim_tamano = df.select('tamano_proyecto').distinct() \
    .withColumn('id_tamano', monotonically_increasing_id() + 1) \
    .select(
        col('id_tamano').cast('int'),
        col('tamano_proyecto').alias('tamano')
    )

print(f'dim_tamano creada: {dim_tamano.count()} registros')
dim_tamano.show()

dim_tamano creada: 4 registros
+---------+----------+
|id_tamano|    tamano|
+---------+----------+
|        1|   Mediano|
|        2|   Pequeno|
|        3|    Grande|
|        4|Muy Grande|
+---------+----------+



In [26]:

fact_proyectos = df.alias('f') \
    .join(dim_colaborador.alias('dc'), col('f.colaborador_limpio') == col('dc.colaborador'), 'left') \
    .join(dim_ciudad.alias('dci'), col('f.ciudad_limpia') == col('dci.ciudad'), 'left') \
    .join(dim_tipo_proyecto.alias('dt'), col('f.tipo_proyecto') == col('dt.tipo_proyecto'), 'left') \
    .join(dim_estado.alias('de'), col('f.estado') == col('de.estado'), 'left') \
    .join(dim_fase.alias('dfa'), col('f.fase') == col('dfa.fase'), 'left') \
    .join(dim_tamano.alias('dta'), col('f.tamano_proyecto') == col('dta.tamano'), 'left') \
    .select(
        col('f.id_proyecto_final').alias('id_proyecto'),
        col('f.nombre_proyecto'),
        col('dc.id_colaborador'),
        col('dci.id_ciudad'),
        col('dt.id_tipo_proyecto'),
        col('de.id_estado'),
        col('dfa.id_fase'),
        col('dta.id_tamano'),
        round(col('f.total_base_limpio'), 2).alias('total_base'),
        col('f.iva_decimal').alias('iva_porcentaje'),
        round(col('f.total_con_iva_final'), 2).alias('total_con_iva'),
        round(col('f.importe_iva'), 2).alias('importe_iva')
    )

print(f'fact_proyectos creada: {fact_proyectos.count()} registros')
fact_proyectos.show(10)

fact_proyectos creada: 10000 registros
+-----------+--------------------+--------------+---------+----------------+---------+-------+---------+----------+--------------+-------------+-----------+
|id_proyecto|     nombre_proyecto|id_colaborador|id_ciudad|id_tipo_proyecto|id_estado|id_fase|id_tamano|total_base|iva_porcentaje|total_con_iva|importe_iva|
+-----------+--------------------+--------------+---------+----------------+---------+-------+---------+----------+--------------+-------------+-----------+
|          1|Proyecto Edificio...|             1|        3|               2|        1|      3|        1|  78035.32|          0.21|     93642.38|   15607.06|
|          2|Proyecto Residenc...|             7|        4|               4|        4|      3|        4| 190389.29|          0.21|    230371.04|   39981.75|
|          3|Proyecto Reforma ...|             1|        2|               1|        3|      3|        3| 147738.82|          0.21|    178763.97|   31025.15|
|          4|Proyec

## 7. CARGA - Exportacion a SQLite

In [27]:
# Asegurar que existe el directorio warehouse
warehouse_dir = os.path.dirname(WAREHOUSE_PATH)
if not os.path.exists(warehouse_dir):
    os.makedirs(warehouse_dir)
    print(f'Directorio creado: {warehouse_dir}')

# Eliminar base de datos anterior si existe
if os.path.exists(WAREHOUSE_PATH):
    os.remove(WAREHOUSE_PATH)
    print(f'Base de datos anterior eliminada')

print(f'Preparando carga en: {WAREHOUSE_PATH}')

Base de datos anterior eliminada
Preparando carga en: /app/warehouse/warehouse_pyspark.db


In [28]:
# Convertir DataFrames de Spark a Pandas para cargar en SQLite
print('Convirtiendo DataFrames a Pandas...')

dim_colaborador_pd = dim_colaborador.toPandas()
dim_ciudad_pd = dim_ciudad.toPandas()
dim_tipo_proyecto_pd = dim_tipo_proyecto.toPandas()
dim_estado_pd = dim_estado.toPandas()
dim_fase_pd = dim_fase.toPandas()
dim_tamano_pd = dim_tamano.toPandas()
fact_proyectos_pd = fact_proyectos.toPandas()

print('DataFrames convertidos a Pandas')

Convirtiendo DataFrames a Pandas...
DataFrames convertidos a Pandas


In [29]:
# Conexion a SQLite y carga de datos
conn = sqlite3.connect(WAREHOUSE_PATH)
print(f'Conexion establecida: {WAREHOUSE_PATH}')

# Cargar dimensiones
dim_colaborador_pd.to_sql('dim_colaborador', conn, if_exists='replace', index=False)
print(f'dim_colaborador cargada: {len(dim_colaborador_pd)} registros')

dim_ciudad_pd.to_sql('dim_ciudad', conn, if_exists='replace', index=False)
print(f'dim_ciudad cargada: {len(dim_ciudad_pd)} registros')

dim_tipo_proyecto_pd.to_sql('dim_tipo_proyecto', conn, if_exists='replace', index=False)
print(f'dim_tipo_proyecto cargada: {len(dim_tipo_proyecto_pd)} registros')

dim_estado_pd.to_sql('dim_estado', conn, if_exists='replace', index=False)
print(f'dim_estado cargada: {len(dim_estado_pd)} registros')

dim_fase_pd.to_sql('dim_fase', conn, if_exists='replace', index=False)
print(f'dim_fase cargada: {len(dim_fase_pd)} registros')

dim_tamano_pd.to_sql('dim_tamano', conn, if_exists='replace', index=False)
print(f'dim_tamano cargada: {len(dim_tamano_pd)} registros')

# Cargar tabla de hechos
fact_proyectos_pd.to_sql('fact_proyectos', conn, if_exists='replace', index=False)
print(f'fact_proyectos cargada: {len(fact_proyectos_pd)} registros')

Conexion establecida: /app/warehouse/warehouse_pyspark.db
dim_colaborador cargada: 20 registros
dim_ciudad cargada: 4 registros
dim_tipo_proyecto cargada: 5 registros
dim_estado cargada: 4 registros
dim_fase cargada: 4 registros
dim_tamano cargada: 4 registros
fact_proyectos cargada: 10000 registros


In [30]:
# Verificar carga
print('\nVERIFICACION DE CARGA EN SQLite')
print('='*50)

tablas = pd.read_sql("SELECT name FROM sqlite_master WHERE type='table'", conn)
print(f'Tablas creadas: {list(tablas["name"])}')

for tabla in tablas['name']:
    count = pd.read_sql(f'SELECT COUNT(*) as n FROM {tabla}', conn)['n'][0]
    print(f'   {tabla}: {count} registros')


VERIFICACION DE CARGA EN SQLite
Tablas creadas: ['dim_colaborador', 'dim_ciudad', 'dim_tipo_proyecto', 'dim_estado', 'dim_fase', 'dim_tamano', 'fact_proyectos']
   dim_colaborador: 20 registros
   dim_ciudad: 4 registros
   dim_tipo_proyecto: 5 registros
   dim_estado: 4 registros
   dim_fase: 4 registros
   dim_tamano: 4 registros
   fact_proyectos: 10000 registros


---
## 7.1 GENERACION DEL ARCHIVO DDL

Generamos el archivo SQL con la definicion del modelo dimensional.

In [31]:
# ============================================================
# GENERACION DEL ARCHIVO DDL (modelo_datawarehouse_pyspark.sql)
# ============================================================

# Definir la ruta del archivo SQL
SQL_PATH = WAREHOUSE_PATH.replace('warehouse_pyspark.db', 'modelo_datawarehouse_pyspark.sql')

# DDL completo del modelo dimensional
ddl_content = '''
-- ============================================================
-- MODELO DATAWAREHOUSE PYSPARK
-- Proyecto RA1: Analisis de Proyectos de Arquitectura
-- Base de datos: warehouse_pyspark.db
-- Generado automaticamente por 02_pyspark.ipynb
-- ============================================================

-- ============================================================
-- TABLAS DE DIMENSIONES
-- ============================================================

-- Dimension: Colaboradores
CREATE TABLE IF NOT EXISTS dim_colaborador (
    id_colaborador INTEGER PRIMARY KEY,
    colaborador TEXT NOT NULL
);

-- Dimension: Ciudades
CREATE TABLE IF NOT EXISTS dim_ciudad (
    id_ciudad INTEGER PRIMARY KEY,
    ciudad TEXT NOT NULL
);

-- Dimension: Tipo de Proyecto
CREATE TABLE IF NOT EXISTS dim_tipo_proyecto (
    id_tipo_proyecto INTEGER PRIMARY KEY,
    tipo_proyecto TEXT NOT NULL
);

-- Dimension: Estado
CREATE TABLE IF NOT EXISTS dim_estado (
    id_estado INTEGER PRIMARY KEY,
    estado TEXT NOT NULL
);

-- Dimension: Fase
CREATE TABLE IF NOT EXISTS dim_fase (
    id_fase INTEGER PRIMARY KEY,
    fase TEXT NOT NULL
);

-- Dimension: Tamano
CREATE TABLE IF NOT EXISTS dim_tamano (
    id_tamano INTEGER PRIMARY KEY,
    tamano TEXT NOT NULL
);

-- ============================================================
-- TABLA DE HECHOS
-- ============================================================

CREATE TABLE IF NOT EXISTS fact_proyectos (
    id_proyecto INTEGER PRIMARY KEY,
    nombre_proyecto TEXT,
    id_colaborador INTEGER,
    id_ciudad INTEGER,
    id_tipo_proyecto INTEGER,
    id_estado INTEGER,
    id_fase INTEGER,
    id_tamano INTEGER,
    total_base REAL,
    iva_porcentaje REAL,
    total_con_iva REAL,
    importe_iva REAL,
    FOREIGN KEY (id_colaborador) REFERENCES dim_colaborador(id_colaborador),
    FOREIGN KEY (id_ciudad) REFERENCES dim_ciudad(id_ciudad),
    FOREIGN KEY (id_tipo_proyecto) REFERENCES dim_tipo_proyecto(id_tipo_proyecto),
    FOREIGN KEY (id_estado) REFERENCES dim_estado(id_estado),
    FOREIGN KEY (id_fase) REFERENCES dim_fase(id_fase),
    FOREIGN KEY (id_tamano) REFERENCES dim_tamano(id_tamano)
);

-- ============================================================
-- INDICES PARA OPTIMIZACION
-- ============================================================

CREATE INDEX IF NOT EXISTS idx_fact_colaborador ON fact_proyectos(id_colaborador);
CREATE INDEX IF NOT EXISTS idx_fact_ciudad ON fact_proyectos(id_ciudad);
CREATE INDEX IF NOT EXISTS idx_fact_tipo ON fact_proyectos(id_tipo_proyecto);
CREATE INDEX IF NOT EXISTS idx_fact_estado ON fact_proyectos(id_estado);
CREATE INDEX IF NOT EXISTS idx_fact_fase ON fact_proyectos(id_fase);
CREATE INDEX IF NOT EXISTS idx_fact_tamano ON fact_proyectos(id_tamano);
'''

# Escribir el archivo SQL
with open(SQL_PATH, 'w') as f:
    f.write(ddl_content.strip())

print(f'Archivo DDL generado: {SQL_PATH}')
print(f'Tamano: {os.path.getsize(SQL_PATH)} bytes')

Archivo DDL generado: /app/warehouse/modelo_datawarehouse_pyspark.sql
Tamano: 2761 bytes


## 8. CONSULTAS DE EJEMPLO

In [32]:
# Consulta 1: Total facturado por ciudad
query1 = '''
SELECT 
    c.ciudad,
    COUNT(*) as num_proyectos,
    ROUND(SUM(f.total_base), 2) as total_base,
    ROUND(AVG(f.total_base), 2) as promedio
FROM fact_proyectos f
JOIN dim_ciudad c ON f.id_ciudad = c.id_ciudad
GROUP BY c.ciudad
ORDER BY total_base DESC
'''
print('Total facturado por ciudad:')
pd.read_sql(query1, conn)

Total facturado por ciudad:


Unnamed: 0,ciudad,num_proyectos,total_base,promedio
0,Barcelona,4063,408179300.0,100462.53
1,Madrid,3013,307235100.0,101969.82
2,Valencia,1925,198195100.0,102958.47
3,Bilbao,999,100001700.0,100101.84


In [33]:
# Consulta 2: Proyectos por tipo y estado
query2 = '''
SELECT 
    tp.tipo_proyecto,
    e.estado,
    COUNT(*) as num_proyectos
FROM fact_proyectos f
JOIN dim_tipo_proyecto tp ON f.id_tipo_proyecto = tp.id_tipo_proyecto
JOIN dim_estado e ON f.id_estado = e.id_estado
GROUP BY tp.tipo_proyecto, e.estado
ORDER BY tp.tipo_proyecto
'''
print('Proyectos por tipo y estado:')
pd.read_sql(query2, conn)

Proyectos por tipo y estado:


Unnamed: 0,tipo_proyecto,estado,num_proyectos
0,Edificio,Cancelado,521
1,Edificio,En curso,533
2,Edificio,Finalizado,486
3,Edificio,Pendiente,551
4,Local,Cancelado,526
5,Local,En curso,506
6,Local,Finalizado,494
7,Local,Pendiente,477
8,Oficina,Cancelado,486
9,Oficina,En curso,497


In [34]:
# Consulta 3: Top colaboradores
query3 = '''
SELECT 
    col.colaborador,
    COUNT(*) as num_proyectos,
    ROUND(SUM(f.total_base), 2) as total_facturado
FROM fact_proyectos f
JOIN dim_colaborador col ON f.id_colaborador = col.id_colaborador
GROUP BY col.colaborador
ORDER BY total_facturado DESC
LIMIT 10
'''
print('Top 10 colaboradores:')
pd.read_sql(query3, conn)

Top 10 colaboradores:


Unnamed: 0,colaborador,num_proyectos,total_facturado
0,Ana Morales,961,98505174.33
1,María García,963,98013221.75
2,Elena Rivas,955,97190153.37
3,Luis Delgado,938,94961867.37
4,Lucía Fernández,904,94848479.02
5,Juan Gómez,934,94245964.9
6,Marta López,901,91331488.02
7,Carlos Ruiz,900,91302830.81
8,Pedro Ortega,894,90766266.35
9,José Pérez,923,89981848.4


In [35]:
# Cerrar conexion SQLite
conn.close()
print(f'\nConexion SQLite cerrada.')
print(f'Base de datos guardada en: {WAREHOUSE_PATH}')


Conexion SQLite cerrada.
Base de datos guardada en: /app/warehouse/warehouse_pyspark.db


## 9. VERIFICACION FINAL

In [36]:
# Verificar que el archivo se creo correctamente
print('VERIFICACION DE ARCHIVOS CREADOS')
print('='*50)

if os.path.exists(WAREHOUSE_PATH):
    size = os.path.getsize(WAREHOUSE_PATH) / 1024  # KB
    print(f'warehouse_pyspark.db: {size:.2f} KB')
else:
    print('ERROR: warehouse_pyspark.db no se creo!')

print('\nProceso ETL con PySpark COMPLETADO!')

VERIFICACION DE ARCHIVOS CREADOS
warehouse_pyspark.db: 804.00 KB

Proceso ETL con PySpark COMPLETADO!


In [37]:
# Detener SparkSession
spark.stop()
print('SparkSession detenida.')

SparkSession detenida.


---
## RESUMEN DEL PROCESO ETL CON PYSPARK

### Extraccion (E)
- Carga del dataset CSV con inferencia de esquema
- 10,000 registros de proyectos de arquitectura

### Transformacion (T) - 8 Transformaciones
1. Normalizacion de colaboradores (initcap, trim, regexp_replace)
2. Normalizacion de ciudades (when/otherwise mapping)
3. Limpieza de valores monetarios
4. Normalizacion de IVA a decimal
5. Extraccion de tipo de proyecto
6. Categorizacion por tamano
7. Calculo de metricas derivadas
8. Asignacion de IDs faltantes

### Carga (L)
- Modelo dimensional en estrella
- 6 tablas de dimensiones + 1 tabla de hechos
- Base de datos SQLite: warehouse_pyspark.db