## Importar librerias

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, to_date, avg, count, sum, when, hour, date_sub, current_date
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

Creamos la SparkSession:

In [2]:
spark = SparkSession.builder \
    .appName("ETL Churn Entel") \
    .getOrCreate()

print("SparkSession creada y conectada al clúster.")

25/11/20 21:20:15 WARN Utils: Your hostname, Debian12 resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/11/20 21:20:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/20 21:20:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


SparkSession creada y conectada al clúster.


### Fase: Extract

Especificamos manualmente los esquemas para que spark pueda leerl

In [3]:
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, 
    DoubleType, TimestampType, DateType, BooleanType
)

# Definimos esquemas
schema_llamadas = StructType([
    StructField("id_registro", StringType(), True),
    StructField("id_cliente", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("duracion_segundos", DoubleType(), True),
    StructField("datos_mb_consumidos", DoubleType(), True),
    StructField("calidad_red", IntegerType(), True)
])

schema_facturacion = StructType([
    StructField("id_factura", StringType(), True),
    StructField("id_cliente", StringType(), True),
    StructField("fecha_emision", DateType(), True),
    StructField("monto", DoubleType(), True),
    StructField("estado_pago", StringType(), True),
    StructField("reclamo_presentado", BooleanType(), True)
])

schema_social_media = StructType([
    StructField("id_comentario", StringType(), True),
    StructField("id_cliente", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("red_social", StringType(), True),
    StructField("texto_comentario", StringType(), True)
])

schema_clients = StructType([
    StructField("id_cliente", StringType(), True),
    StructField("nombre_completo", StringType(), True),
    StructField("fecha_alta", DateType(), True),
    StructField("id_plan", StringType(), True),
    StructField("region", StringType(), True),
    StructField("churn_futuro", BooleanType(), True)
])

In [4]:
# Definir rutas HDFS
base_path = "hdfs://localhost:9000/chex/BigData_UPAO/bigdata_env/churn_project/raw"

path_llamadas = f"{base_path}/json/llamadas"
path_facturacion = f"{base_path}/json/facturacion"
path_social_media = f"{base_path}/parquet/social_media"
path_clients = f"{base_path}/csv/clients"

# Cargar los DataFrames
print("Cargando datos...")

df_llamadas = spark.read.option("multiline", "true").schema(schema_llamadas).json(path_llamadas)
df_facturacion = spark.read.option("multiline", "true").schema(schema_facturacion).json(path_facturacion)
df_social_media = spark.read.schema(schema_social_media).parquet(path_social_media)
df_clients = spark.read.csv(
    path_clients,
    header=True,
    schema=schema_clients,
    dateFormat="yyyy-MM-dd"
)

print("Carga completa.")

# Mostrar 5 registros
print("\n--- DATOS: LLAMADAS ---")
df_llamadas.show(5)

print("\n--- DATOS: FACTURACION ---")
df_facturacion.show(5)

print("\n--- DATOS: SOCIAL MEDIA ---")
df_social_media.show(5)

print("\n--- DATOS: CLIENTS ---")
df_clients.show(5)

Cargando datos...
Carga completa.

--- DATOS: LLAMADAS ---


                                                                                

+--------------+--------------+-------------------+-----------------+-------------------+-----------+
|   id_registro|    id_cliente|          timestamp|duracion_segundos|datos_mb_consumidos|calidad_red|
+--------------+--------------+-------------------+-----------------+-------------------+-----------+
|CDR-0000000001|ENTEL-00000000|2025-09-29 21:22:22|             18.0|               82.0|          5|
|CDR-0000000002|ENTEL-00000000|2025-09-29 19:41:28|             26.0|               28.0|          3|
|CDR-0000000003|ENTEL-00000000|2025-09-29 06:36:54|            396.0|               13.0|          4|
|CDR-0000000004|ENTEL-00000000|2025-09-29 15:10:49|             16.0|               44.0|          4|
|CDR-0000000005|ENTEL-00000000|2025-09-29 07:27:50|            328.0|                0.0|          4|
+--------------+--------------+-------------------+-----------------+-------------------+-----------+
only showing top 5 rows


--- DATOS: FACTURACION ---
+-------------+--------------

                                                                                

+-------------+--------------+-------------------+----------+--------------------+
|id_comentario|    id_cliente|          timestamp|red_social|    texto_comentario|
+-------------+--------------+-------------------+----------+--------------------+
|  SOC-0000001|ENTEL-00000004|2025-08-17 15:38:06|   Twitter|Llevo 3 días sin ...|
|  SOC-0000002|ENTEL-00000004|2025-09-03 17:38:06|   Twitter|Mi factura del pl...|
|  SOC-0000003|ENTEL-00000004|2025-08-30 19:38:06|   Twitter|Mi factura del pl...|
|  SOC-0000004|ENTEL-00000004|2025-02-07 04:38:06|   Twitter|La velocidad de m...|
|  SOC-0000005|ENTEL-00000008|2025-09-09 03:38:06|  Facebook|¿Alguien sabe cóm...|
+-------------+--------------+-------------------+----------+--------------------+
only showing top 5 rows


--- DATOS: CLIENTS ---
+--------------+--------------------+----------+---------------+-----------+------------+
|    id_cliente|     nombre_completo|fecha_alta|        id_plan|     region|churn_futuro|
+--------------+--------

### Fase: Transform

Importaciones y Corrección de Tipos

Primero, importamos las funciones que usaremos y corregimos el timestamp de social_media que estaba como String.

In [6]:
from pyspark.sql.functions import (
    col, to_timestamp, datediff, current_date, avg, sum, count,
    when, isnan, isnull, lit
)
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Imputer

# Corregir el tipo de dato
df_social_media = df_social_media.withColumn(
    "timestamp_dt", 
    to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss")
)

# Convertir label a numérico
df_clients = df_clients.withColumn(
    "label", 
    col("churn_futuro").cast("double")
)

print("Tipos de datos corregidos.")

Tipos de datos corregidos.


Feature Engineering

Agregamos las tablas transaccionales a nivel id_cliente

In [7]:
# Agregación de LLAMADAS
agg_llamadas = df_llamadas.groupBy("id_cliente").agg(
    count("id_registro").alias("total_llamadas"),
    sum("duracion_segundos").alias("total_segundos_llamada"),
    sum("datos_mb_consumidos").alias("total_mb_consumidos"),
    avg("calidad_red").alias("promedio_calidad_red")
)

# Agregación de FACTURACION
agg_facturacion = df_facturacion.groupBy("id_cliente").agg(
    count("id_factura").alias("total_facturas"),
    sum("monto").alias("monto_total_facturado"),
    avg("monto").alias("monto_promedio_factura"),
    # Contar reclamos (True/False) y pagos pendientes
    sum(when(col("reclamo_presentado") == True, 1).otherwise(0)).alias("total_reclamos"),
    sum(when(col("estado_pago") != "PAGADO", 1).otherwise(0)).alias("total_facturas_pendientes")
)

# Agregación de SOCIAL MEDIA
agg_social_media = df_social_media.groupBy("id_cliente").agg(
    count("id_comentario").alias("total_comentarios_social")
)

print("Feature Engineering completado.")
agg_llamadas.show(3)
agg_facturacion.show(3)

Feature Engineering completado.


                                                                                

+--------------+--------------+----------------------+-------------------+--------------------+
|    id_cliente|total_llamadas|total_segundos_llamada|total_mb_consumidos|promedio_calidad_red|
+--------------+--------------+----------------------+-------------------+--------------------+
|ENTEL-00000053|          1625|              239957.0|            94217.0|   3.673230769230769|
|ENTEL-00000057|          1619|              236804.0|            97006.0|  3.7492279184681903|
|ENTEL-00000099|          1545|              209613.0|            91961.0|  3.7171521035598705|
+--------------+--------------+----------------------+-------------------+--------------------+
only showing top 3 rows

+--------------+--------------+---------------------+----------------------+--------------+-------------------------+
|    id_cliente|total_facturas|monto_total_facturado|monto_promedio_factura|total_reclamos|total_facturas_pendientes|
+--------------+--------------+---------------------+--------------

Unificación de datos con Join

Empezamos con df_clients y hacemos left join con las tablas agregadas.

In [8]:
# Empezamos con la tabla de clientes
df_master = df_clients

# Hacemos left join porque un cliente puede no tener llamadas o reclamos
df_master = df_master.join(agg_llamadas, on="id_cliente", how="left")
df_master = df_master.join(agg_facturacion, on="id_cliente", how="left")
df_master = df_master.join(agg_social_media, on="id_cliente", how="left")

print("Joins completados.")
df_master.printSchema()

Joins completados.
root
 |-- id_cliente: string (nullable = true)
 |-- nombre_completo: string (nullable = true)
 |-- fecha_alta: date (nullable = true)
 |-- id_plan: string (nullable = true)
 |-- region: string (nullable = true)
 |-- churn_futuro: boolean (nullable = true)
 |-- label: double (nullable = true)
 |-- total_llamadas: long (nullable = true)
 |-- total_segundos_llamada: double (nullable = true)
 |-- total_mb_consumidos: double (nullable = true)
 |-- promedio_calidad_red: double (nullable = true)
 |-- total_facturas: long (nullable = true)
 |-- monto_total_facturado: double (nullable = true)
 |-- monto_promedio_factura: double (nullable = true)
 |-- total_reclamos: long (nullable = true)
 |-- total_facturas_pendientes: long (nullable = true)
 |-- total_comentarios_social: long (nullable = true)



Limpieza Post-Join y últimos features

Después del left join, los clientes sin llamadas tendrán null en total_llamadas. Esto no es un "nulo" (desconocido), sino un 0 (cero llamadas).

In [9]:
# Eliminar columnas irrelevantes para el modelo
columnas_a_eliminar = [
    "nombre_completo", "churn_futuro", 
    "id_factura", "id_registro", "id_comentario"
]
# Filtramos las columnas
columnas_existentes_para_eliminar = [c for c in columnas_a_eliminar if c in df_master.columns]
if columnas_existentes_para_eliminar:
    df_master = df_master.drop(*columnas_existentes_para_eliminar)

# Crear 'antiguedad_dias'
df_master = df_master.withColumn(
    "antiguedad_dias", 
    datediff(current_date(), col("fecha_alta"))
)
df_master = df_master.drop("fecha_alta")


# Manejar Nulos (Imputación de columnas que significan 0)
columnas_agregadas_numericas = [
    "total_llamadas", "total_segundos_llamada", "total_mb_consumidos", 
    "promedio_calidad_red", "total_facturas", "monto_total_facturado", 
    "monto_promedio_factura", "total_reclamos", "total_facturas_pendientes", 
    "total_comentarios_social"
]
# Filtramos
columnas_a_imputar = [c for c in columnas_agregadas_numericas if c in df_master.columns]
df_master = df_master.fillna(0, subset=columnas_a_imputar)

print("Limpieza Post-Join completada.")
df_master.show(5)

Limpieza Post-Join completada.


                                                                                

+--------------+---------------+-----------+-----+--------------+----------------------+-------------------+--------------------+--------------+---------------------+----------------------+--------------+-------------------------+------------------------+---------------+
|    id_cliente|        id_plan|     region|label|total_llamadas|total_segundos_llamada|total_mb_consumidos|promedio_calidad_red|total_facturas|monto_total_facturado|monto_promedio_factura|total_reclamos|total_facturas_pendientes|total_comentarios_social|antiguedad_dias|
+--------------+---------------+-----------+-----+--------------+----------------------+-------------------+--------------------+--------------+---------------------+----------------------+--------------+-------------------------+------------------------+---------------+
|ENTEL-00000004|Prepago Chip 10|        Ica|  1.0|          1602|              225410.4|            92921.9|  2.2808988764044944|            12|                 60.0|                  

Pipeline de Vectorización

Paso final de pre-procesamiento.

In [10]:
# Identificar columnas categóricas vs numéricas
categorical_cols = ["id_plan", "region"]
numeric_cols = [
    "antiguedad_dias", "total_llamadas", "total_segundos_llamada", 
    "total_mb_consumidos", "promedio_calidad_red", "total_facturas", 
    "monto_total_facturado", "monto_promedio_factura", "total_reclamos", 
    "total_facturas_pendientes", "total_comentarios_social"
]

# Crear las etapas del Pipeline
stages = []

# StringIndexer y OneHotEncoder para cada columna categórica
for c in categorical_cols:
    indexer = StringIndexer(inputCol=c, outputCol=f"{c}_index", handleInvalid="keep")
    encoder = OneHotEncoder(inputCols=[indexer.getOutputCol()], outputCols=[f"{c}_vec"])
    stages += [indexer, encoder]

# Juntamos las columnas numéricas originales + los nuevos vectores OHE
feature_cols = numeric_cols + [f"{c}_vec" for c in categorical_cols]

assembler = VectorAssembler(
    inputCols=feature_cols, 
    outputCol="features"
)
stages += [assembler]

pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(df_master)
df_model_ready = pipeline_model.transform(df_master)

print("Pipeline de vectorización completado.")

                                                                                

Pipeline de vectorización completado.


Dataframe final

In [11]:
print("DataFrame listo:")
df_model_ready.select("id_cliente", "label", "features").show(5, truncate=False)

print("\nEsquema final:")
df_model_ready.printSchema()

DataFrame listo:


25/11/20 19:49:57 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+--------------+-----+-----------------------------------------------------------------------------------------------------------------------------------+
|id_cliente    |label|features                                                                                                                           |
+--------------+-----+-----------------------------------------------------------------------------------------------------------------------------------+
|ENTEL-00000053|0.0  |(23,[0,1,2,3,4,5,6,7,14,18],[1943.0,1625.0,239957.0,94217.0,3.673230769230769,12.0,1558.8000000000004,129.90000000000003,1.0,1.0]) |
|ENTEL-00000057|0.0  |(23,[0,1,2,3,4,5,6,7,12,18],[1137.0,1619.0,236804.0,97006.0,3.7492279184681903,12.0,718.7999999999998,59.899999999999984,1.0,1.0]) |
|ENTEL-00000099|0.0  |(23,[0,1,2,3,4,5,6,7,14,17],[1006.0,1545.0,209613.0,91961.0,3.7171521035598705,12.0,1558.8000000000004,129.90000000000003,1.0,1.0])|
|ENTEL-00000070|0.0  |(23,[0,1,2,3,4,5,6,7,9,11,20],[1414.0,1628.0,236

### Fase: Load

In [12]:
# Definir la ruta de destino en HDFS
save_path_full = "hdfs://localhost:9000/chex/BigData_UPAO/bigdata_env/churn_project/processed"

# Guardamos el DataFrame 'df_model_ready'
print(f"DataFrame guardado:")
df_model_ready.printSchema()

# Guardar en formato Parquet
print(f"\nGuardando datos completos en: {save_path_full}")

df_model_ready.write \
    .mode("overwrite") \
    .parquet(save_path_full)

print("Fase Load completada")
print(f"Tus datos listos en {save_path_full}")

DataFrame guardado:
root
 |-- id_cliente: string (nullable = true)
 |-- id_plan: string (nullable = true)
 |-- region: string (nullable = true)
 |-- label: double (nullable = true)
 |-- total_llamadas: long (nullable = true)
 |-- total_segundos_llamada: double (nullable = false)
 |-- total_mb_consumidos: double (nullable = false)
 |-- promedio_calidad_red: double (nullable = false)
 |-- total_facturas: long (nullable = true)
 |-- monto_total_facturado: double (nullable = false)
 |-- monto_promedio_factura: double (nullable = false)
 |-- total_reclamos: long (nullable = true)
 |-- total_facturas_pendientes: long (nullable = true)
 |-- total_comentarios_social: long (nullable = true)
 |-- antiguedad_dias: integer (nullable = true)
 |-- id_plan_index: double (nullable = false)
 |-- id_plan_vec: vector (nullable = true)
 |-- region_index: double (nullable = false)
 |-- region_vec: vector (nullable = true)
 |-- features: vector (nullable = true)


Guardando datos completos en: hdfs://localh

[Stage 51:>                                                         (0 + 1) / 1]

Fase Load completada
Tus datos listos en hdfs://localhost:9000/chex/BigData_UPAO/bigdata_env/churn_project/processed


                                                                                

In [None]:
Ahora podemos ver nuestro archivo final:

In [19]:
local_file_path = "part-00000-32009aa4-be46-4705-b928-94c4709346eb-c000.snappy.parquet"

# Leer el Parquet.
df_comentario123 = spark.read.parquet(local_file_path)

# Mostrar los datos
print("Lectura de Parquet local exitosa:")
df_comentario123.show(5)
print("\nEsquema del DataFrame:")
df_comentario123.printSchema()

Lectura de Parquet local exitosa:
+--------------+---------------+-----------+-----+--------------+----------------------+-------------------+--------------------+--------------+---------------------+----------------------+--------------+-------------------------+------------------------+---------------+-------------+-------------+------------+-------------+--------------------+
|    id_cliente|        id_plan|     region|label|total_llamadas|total_segundos_llamada|total_mb_consumidos|promedio_calidad_red|total_facturas|monto_total_facturado|monto_promedio_factura|total_reclamos|total_facturas_pendientes|total_comentarios_social|antiguedad_dias|id_plan_index|  id_plan_vec|region_index|   region_vec|            features|
+--------------+---------------+-----------+-----+--------------+----------------------+-------------------+--------------------+--------------+---------------------+----------------------+--------------+-------------------------+------------------------+---------------

In [18]:
from pyspark.ml.classification import GBTClassificationModel
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

ruta_datos = "hdfs://localhost:9000/chex/BigData_UPAO/bigdata_env/churn_project/processed"
df_input = spark.read.parquet(ruta_datos)

print("Datos cargados. Columnas disponibles:")
print(df_input.columns) 

ruta_modelo = "/home/chex/BigData_UPAO/bigdata_env/modelo_churn/"
model = GBTClassificationModel.load(ruta_modelo)

df_predicciones = model.transform(df_input)

df_resultado_final = df_predicciones.select(
    "id_cliente",
    "region",
    "id_plan",
    "antiguedad_dias",
    "monto_total_facturado",
    "total_facturas_pendientes",
    "promedio_calidad_red",
    "total_comentarios_social",
    "probability",
    "prediction"
)

df_resultado_final = df_resultado_final.withColumn(
    "prob_churn", 
    vector_to_array(col("probability"))[1]
).drop("probability")

print("¡Predicción completada! Muestra:")
df_resultado_final.show(5)

ruta_salida = "/home/chex/BigData_UPAO/output_modelo"
df_resultado_final.write.mode("overwrite").parquet(ruta_salida)

print(f"Datos con predicción guardados en: {ruta_salida}")

Datos cargados. Columnas disponibles:
['id_cliente', 'id_plan', 'region', 'label', 'total_llamadas', 'total_segundos_llamada', 'total_mb_consumidos', 'promedio_calidad_red', 'total_facturas', 'monto_total_facturado', 'monto_promedio_factura', 'total_reclamos', 'total_facturas_pendientes', 'total_comentarios_social', 'antiguedad_dias', 'id_plan_index', 'id_plan_vec', 'region_index', 'region_vec', 'features']
¡Predicción completada! Muestra:
+--------------+-----------+---------------+---------------+---------------------+-------------------------+--------------------+------------------------+----------+-------------------+
|    id_cliente|     region|        id_plan|antiguedad_dias|monto_total_facturado|total_facturas_pendientes|promedio_calidad_red|total_comentarios_social|prediction|         prob_churn|
+--------------+-----------+---------------+---------------+---------------------+-------------------------+--------------------+------------------------+----------+-------------------

### Detenemos la sesion:

In [20]:
spark.stop()