# Lab 11

- Joaquín Puente 22296
- José Mérida

## 1. PySpark Initialization

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, count, when, isnan, isnull, mean, stddev, 
    min as spark_min, max as spark_max, round as spark_round
)
from pyspark.sql.types import DoubleType, IntegerType
import os

spark = SparkSession.builder \
    .appName("Customer Churn Analysis - Lab 11") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

print("Spark Session iniciada exitosamente!")
print(f"Versión de Spark: {spark.version}")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/10/24 20:39:46 WARN Utils: Your hostname, ArchJapo, resolves to a loopback address: 127.0.1.1; using 10.0.0.36 instead (on interface wlp0s20f3)
25/10/24 20:39:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/24 20:39:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark Session iniciada exitosamente!
Versión de Spark: 4.0.1


## 2. Data Loading

In [2]:
abandono_path = "abandono_clientes.csv"
nuevos_path = "clientes_nuevos.csv"

df_abandono = spark.read.csv(abandono_path, header=True, inferSchema=True)
df_nuevos = spark.read.csv(nuevos_path, header=True, inferSchema=True)

print(f"Dataset 'abandono_clientes' cargado: {df_abandono.count()} filas")
print(f"Dataset 'clientes_nuevos' cargado: {df_nuevos.count()} filas")

Dataset 'abandono_clientes' cargado: 900 filas
Dataset 'clientes_nuevos' cargado: 6 filas


## 3. Exploratory Data Analysis (EDA) - Customer Churn

### 3.1 Dataset Structure

In [3]:
df_abandono.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



### 3.2 First Rows of the Dataset

In [4]:
df_abandono.show(10, truncate=True)

+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|Churn|
+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|
|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|
|     Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|
|   Phillip White|42.0|       8010.76|              0| 6.71|     10.0|2014-04-22 12:43:12|13120 Daniel Moun...|           Smith Inc|    1|
|  Cynthia Norton|37.0|    

### 3.3 Descriptive Statistics

In [5]:
df_abandono.describe().show()

+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|summary|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|            Location|             Company|              Churn|
+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|  count|          900|              900|              900|               900|              900|               900|                 900|                 900|                900|
|   mean|         NULL|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|                NULL|                NULL|0.16666666666666666|
| stddev|         NULL|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.764835592035

### 3.4 Null Value Analysis

In [6]:
null_counts = df_abandono.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in df_abandono.columns
])
null_counts.show()

+-----+---+--------------+---------------+-----+---------+------------+--------+-------+-----+
|Names|Age|Total_Purchase|Account_Manager|Years|Num_Sites|Onboard_date|Location|Company|Churn|
+-----+---+--------------+---------------+-----+---------+------------+--------+-------+-----+
|    0|  0|             0|              0|    0|        0|           0|       0|      0|    0|
+-----+---+--------------+---------------+-----+---------+------------+--------+-------+-----+
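The counting expression works because `when` without an `otherwise` branch yields NULL for rows that do not match, and `count` skips NULLs. Note that `isNull()` does not flag NaN values in double columns; Spark treats NaN and NULL as different things, and the `isnan` function imported in the first cell is the usual tool for the former. As a plain-Python analogue of that distinction (the helper `count_missing` and the sample values are hypothetical, not taken from the dataset):

```python
import math

def count_missing(values):
    """Count None and NaN entries separately in a list of floats."""
    nulls = sum(1 for v in values if v is None)
    nans = sum(1 for v in values if v is not None and math.isnan(v))
    return nulls, nans

# Hypothetical column values, for illustration only (not from the dataset).
sample = [42.0, None, float("nan"), 37.5]
nulls, nans = count_missing(sample)
print(f"null: {nulls}, NaN: {nans}")
```

A null-only check would report one missing value for this sample and miss the NaN entry.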



### 3.5 Distribution of the Target Variable (Churn)

In [7]:
churn_dist = df_abandono.groupBy("Churn").count().orderBy("Churn")
churn_dist.show()

churn_stats = churn_dist.collect()
total = sum([row['count'] for row in churn_stats])
for row in churn_stats:
    percentage = (row['count'] / total) * 100
    label = "No Abandono" if row['Churn'] == 0 else "Abandono"
    print(f"{label} (Churn={row['Churn']}): {row['count']} ({percentage:.2f}%)")

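# Look up the churned class by label rather than by its position in the result.
churned = next((row['count'] for row in churn_stats if row['Churn'] == 1), 0)
churn_rate = churned / total * 100 if total else 0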
print(f"\nTasa de Abandono: {churn_rate:.2f}%")

+-----+-----+
|Churn|count|
+-----+-----+
|    0|  750|
|    1|  150|
+-----+-----+

No Abandono (Churn=0): 750 (83.33%)
Abandono (Churn=1): 150 (16.67%)

Tasa de Abandono: 16.67%
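The percentages above follow directly from the two group counts. As a quick check in plain Python (no Spark needed), using the counts printed by the cell; the helper name `class_shares` is illustrative and not part of the notebook:

```python
def class_shares(counts):
    """Return each class's share of the total, in percent."""
    total = sum(counts.values())
    return {label: 100 * n / total for label, n in counts.items()}

# Counts taken from the groupBy("Churn") output above.
churn_counts = {0: 750, 1: 150}
shares = class_shares(churn_counts)

for label, pct in shares.items():
    print(f"Churn={label}: {pct:.2f}%")

# Ratio of retained to churned customers (majority class / minority class).
imbalance_ratio = churn_counts[0] / churn_counts[1]
print(f"Retained per churned customer: {imbalance_ratio:.1f}")
```

In other words, there are five retained customers for every customer that churned.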


### 3.6 Detailed Statistics for Numeric Columns

In [8]:
numeric_cols = ['Age', 'Total_Purchase', 'Years', 'Num_Sites']

print("=" * 80)
print("ESTADÍSTICAS DETALLADAS - COLUMNAS NUMÉRICAS")
print("=" * 80)

for col_name in numeric_cols:
    print(f"\n{col_name}:")
    print("-" * 40)
    
    stats = df_abandono.select(
        spark_min(col_name).alias("min"),
        spark_max(col_name).alias("max"),
        mean(col_name).alias("mean"),
        stddev(col_name).alias("stddev")
    ).collect()[0]
    
    print(f"  Mínimo:     {stats['min']}")
    print(f"  Máximo:     {stats['max']}")
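    # Compare against None explicitly so that a legitimate value of 0.0 is still printed.
    print(f"  Media:      {stats['mean']:.2f}" if stats['mean'] is not None else "  Media:      None")
    print(f"  Desv. Est.: {stats['stddev']:.2f}" if stats['stddev'] is not None else "  Desv. Est.: None")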

ESTADÍSTICAS DETALLADAS - COLUMNAS NUMÉRICAS

Age:
----------------------------------------
  Mínimo:     22.0
  Máximo:     65.0
  Media:      41.82
  Desv. Est.: 6.13

Total_Purchase:
----------------------------------------
  Mínimo:     100.0
  Máximo:     18026.01
  Media:      10062.82
  Desv. Est.: 2408.64

Years:
----------------------------------------
  Mínimo:     1.0
  Máximo:     9.15
  Media:      5.27
  Desv. Est.: 1.27

Num_Sites:
----------------------------------------
  Mínimo:     3.0
  Máximo:     14.0
  Media:      8.59
  Desv. Est.: 1.76


### 3.7 Account Manager Distribution

In [9]:
df_abandono.groupBy("Account_Manager").count().orderBy("Account_Manager").show()

am_stats = df_abandono.groupBy("Account_Manager").count().collect()
total = sum([row['count'] for row in am_stats])
for row in am_stats:
    percentage = (row['count'] / total) * 100
    label = "Con Account Manager" if row['Account_Manager'] == 1 else "Sin Account Manager"
    print(f"{label}: {row['count']} ({percentage:.2f}%)")

+---------------+-----+
|Account_Manager|count|
+---------------+-----+
|              0|  467|
|              1|  433|
+---------------+-----+

Con Account Manager: 433 (48.11%)
Sin Account Manager: 467 (51.89%)


## 4. Exploratory Data Analysis (EDA) - New Customers

### 4.1 Dataset Structure

In [10]:
df_nuevos.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)



### 4.2 First Rows and Statistics

In [11]:
df_nuevos.show(10, truncate=True)
df_nuevos.describe().show()

+--------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------------+
|         Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|         Company|
+--------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------------+
| Andrew Mccall|37.0|       9935.53|              1| 7.71|      8.0|2011-08-29 18:37:54|38612 Johnny Stra...|        King Ltd|
|Michele Wright|23.0|       7526.94|              1| 9.28|     15.0|2013-07-22 18:19:54|21083 Nicole Junc...|   Cannon-Benson|
|  Jeremy Chang|65.0|         100.0|              1|  1.0|     15.0|2006-12-11 07:48:13|085 Austin Views ...|Barron-Robertson|
|Megan Ferguson|32.0|        6487.5|              0|  9.4|     14.0|2016-10-28 05:32:13|922 Wright Branch...|   Sexton-Golden|
|  Taylor Young|32.0|      13147.71|              1| 10.0|      8.0|2012-03-20 00:36:46|Unit 0789 Box 073...|  

### 4.3 Null Value Analysis

In [12]:
null_counts_nuevos = df_nuevos.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in df_nuevos.columns
])
null_counts_nuevos.show()

+-----+---+--------------+---------------+-----+---------+------------+--------+-------+
|Names|Age|Total_Purchase|Account_Manager|Years|Num_Sites|Onboard_date|Location|Company|
+-----+---+--------------+---------------+-----+---------+------------+--------+-------+
|    0|  0|             0|              0|    0|        0|           0|       0|      0|
+-----+---+--------------+---------------+-----+---------+------------+--------+-------+



## 5. Identifying Data Quality Issues

### 5.1 Duplicate Check

In [13]:
abandono_total = df_abandono.count()
abandono_distinct = df_abandono.distinct().count()
print(f"Abandono Clientes - Duplicados: {abandono_total - abandono_distinct}")

nuevos_total = df_nuevos.count()
nuevos_distinct = df_nuevos.distinct().count()
print(f"Clientes Nuevos - Duplicados: {nuevos_total - nuevos_distinct}")

Abandono Clientes - Duplicados: 0
Clientes Nuevos - Duplicados: 0


### 5.2 Invalid (Negative) Value Check

In [14]:
print("Abandono Clientes - Valores Negativos:")
for col_name in numeric_cols:
    negative_count = df_abandono.filter(col(col_name) < 0).count()
    print(f"  {col_name}: {negative_count}")

print("\nClientes Nuevos - Valores Negativos:")
for col_name in numeric_cols:
    negative_count = df_nuevos.filter(col(col_name) < 0).count()
    print(f"  {col_name}: {negative_count}")

Abandono Clientes - Valores Negativos:
  Age: 0
  Total_Purchase: 0
  Years: 0
  Num_Sites: 0

Clientes Nuevos - Valores Negativos:
  Age: 0
  Total_Purchase: 0
  Years: 0
  Num_Sites: 0


### 5.3 Unrealistic Age Analysis

In [15]:
unrealistic_ages_abandono = df_abandono.filter((col("Age") < 18) | (col("Age") > 100))
print(f"Abandono Clientes - Edades fuera de rango (< 18 o > 100): {unrealistic_ages_abandono.count()}")

unrealistic_ages_nuevos = df_nuevos.filter((col("Age") < 18) | (col("Age") > 100))
print(f"Clientes Nuevos - Edades fuera de rango (< 18 o > 100): {unrealistic_ages_nuevos.count()}")

Abandono Clientes - Edades fuera de rango (< 18 o > 100): 0
Clientes Nuevos - Edades fuera de rango (< 18 o > 100): 0


### 5.4 Very Low or Zero Purchases

In [16]:
low_purchase_abandono = df_abandono.filter(col("Total_Purchase") < 1000).count()
print(f"Abandono Clientes con Total_Purchase < 1000: {low_purchase_abandono}")

low_purchase_nuevos = df_nuevos.filter(col("Total_Purchase") < 1000).count()
print(f"Clientes Nuevos con Total_Purchase < 1000: {low_purchase_nuevos}")

df_abandono.filter(col("Total_Purchase") < 1000).select("Names", "Total_Purchase", "Churn").show()

Abandono Clientes con Total_Purchase < 1000: 1
Clientes Nuevos con Total_Purchase < 1000: 1
+------------+--------------+-----+
|       Names|Total_Purchase|Churn|
+------------+--------------+-----+
|Kayla Reeves|         100.0|    0|
+------------+--------------+-----+
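To put the 100.0 purchase in context, one option is to express it as a z-score against the Total_Purchase mean and standard deviation reported in section 3.6. This is only a rough sketch of how unusual the value is; the helper `z_score` is illustrative and not part of the notebook:

```python
def z_score(value, mean, stddev):
    """Number of standard deviations between a value and the mean."""
    return (value - mean) / stddev

# Summary statistics for Total_Purchase as reported in section 3.6.
mean_purchase = 10062.82
std_purchase = 2408.64

z = z_score(100.0, mean_purchase, std_purchase)
print(f"z-score of a 100.0 purchase: {z:.2f}")
```

The value sits roughly four standard deviations below the mean, which is why it stands out even though it is not negative or null.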



## 6. Data Cleansing

### 6.1 Cleaning Function

In [17]:
def clean_dataset(df, dataset_name):
    """Deduplicate, handle nulls and negatives, and enforce column types.

    Returns the cleaned DataFrame and prints how many rows were removed.
    """
    print(f"Limpiando dataset: {dataset_name}")
    original_count = df.count()

    # Step 1: drop exact duplicate rows.
    df_clean = df.dropDuplicates()

    numeric_cols_to_clean = ['Age', 'Total_Purchase', 'Years', 'Num_Sites']

    # Step 2: fill numeric nulls with the column mean (drop the rows if no mean exists).
    for col_name in numeric_cols_to_clean:
        null_count = df_clean.filter(col(col_name).isNull()).count()
        if null_count > 0:
            mean_value = df_clean.select(mean(col(col_name))).collect()[0][0]
            if mean_value is not None:
                df_clean = df_clean.fillna({col_name: mean_value})
            else:
                df_clean = df_clean.filter(col(col_name).isNotNull())

    # Step 3: fill Account_Manager nulls with the most frequent value (the mode).
    if 'Account_Manager' in df_clean.columns:
        null_count = df_clean.filter(col("Account_Manager").isNull()).count()
        if null_count > 0:
            mode_row = df_clean.groupBy("Account_Manager").count().orderBy(col("count").desc()).first()
            if mode_row and mode_row['Account_Manager'] is not None:
                mode_value = mode_row['Account_Manager']
                df_clean = df_clean.fillna({"Account_Manager": mode_value})
            else:
                df_clean = df_clean.filter(col("Account_Manager").isNotNull())

    # Step 4: for text columns, drop rows when under 5% are null, otherwise fill with "Unknown".
    string_cols = ['Names', 'Location', 'Company']
    for col_name in string_cols:
        if col_name in df_clean.columns:
            null_count = df_clean.filter(col(col_name).isNull()).count()
            if null_count > 0:
                percentage = (null_count / df_clean.count()) * 100
                if percentage < 5:
                    df_clean = df_clean.filter(col(col_name).isNotNull())
                else:
                    df_clean = df_clean.fillna({col_name: "Unknown"})

    # Step 5: remove rows with negative values in any numeric column.
    for col_name in numeric_cols_to_clean:
        negative_count = df_clean.filter(col(col_name) < 0).count()
        if negative_count > 0:
            df_clean = df_clean.filter(col(col_name) >= 0)

    # Step 6: cast every column to its expected type.
    df_clean = df_clean.withColumn("Age", col("Age").cast(DoubleType())) \
                       .withColumn("Total_Purchase", col("Total_Purchase").cast(DoubleType())) \
                       .withColumn("Account_Manager", col("Account_Manager").cast(IntegerType())) \
                       .withColumn("Years", col("Years").cast(DoubleType())) \
                       .withColumn("Num_Sites", col("Num_Sites").cast(DoubleType()))

    # Churn exists only in the historical dataset, so cast it conditionally.
    if 'Churn' in df_clean.columns:
        df_clean = df_clean.withColumn("Churn", col("Churn").cast(IntegerType()))

    final_count = df_clean.count()
    print(f"Filas procesadas: {original_count} -> {final_count} (eliminadas: {original_count - final_count})")

    return df_clean
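
Since neither dataset contains nulls, the imputation branches of `clean_dataset` never run on this data. The two decision rules can be sketched in plain Python to see what they would do; the helper names, the `threshold_pct` parameter, and the sample inputs below are hypothetical and only mirror the logic of the function above:

```python
from statistics import mean

def impute_with_mean(values):
    """Replace None entries with the mean of the non-null entries."""
    present = [v for v in values if v is not None]
    if not present:
        # Nothing to average: mirror the notebook's fallback of dropping the null rows.
        return []
    fill = mean(present)
    return [fill if v is None else v for v in values]

def string_null_action(null_count, total_rows, threshold_pct=5):
    """Decide how nulls in a text column would be treated."""
    if null_count == 0:
        return "keep"
    pct = 100 * null_count / total_rows
    return "drop_rows" if pct < threshold_pct else "fill_unknown"

# Hypothetical inputs, for illustration only.
print(impute_with_mean([40.0, None, 44.0]))
print(string_null_action(10, 900))   # about 1.1% null
print(string_null_action(90, 900))   # 10% null
```

With these inputs the missing age is replaced by 42.0, the first text column would lose its 10 null rows, and the second would have its nulls filled with "Unknown".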

### 6.2 Apply Cleaning to Both Datasets

In [18]:
df_abandono_clean = clean_dataset(df_abandono, "ABANDONO CLIENTES")
df_nuevos_clean = clean_dataset(df_nuevos, "CLIENTES NUEVOS")

Limpiando dataset: ABANDONO CLIENTES
Filas procesadas: 900 -> 900 (eliminadas: 0)
Limpiando dataset: CLIENTES NUEVOS
Filas procesadas: 6 -> 6 (eliminadas: 0)


## 7. Verifying the Cleaned Data

### 7.1 Verify Cleaned Customer Churn Data

In [19]:
print(f"Total de filas: {df_abandono_clean.count()}")

df_abandono_clean.printSchema()

null_check = df_abandono_clean.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in df_abandono_clean.columns
])
null_check.show()

df_abandono_clean.describe().show()
df_abandono_clean.show(5, truncate=True)

Total de filas: 900
root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)

+-----+---+--------------+---------------+-----+---------+------------+--------+-------+-----+
|Names|Age|Total_Purchase|Account_Manager|Years|Num_Sites|Onboard_date|Location|Company|Churn|
+-----+---+--------------+---------------+-----+---------+------------+--------+-------+-----+
|    0|  0|             0|              0|    0|        0|           0|       0|      0|    0|
+-----+---+--------------+---------------+-----+---------+------------+--------+-------+-----+

+-------+-------------+-----------------+-----------------+------------------+------------

### 7.2 Verify Cleaned New Customers Data

In [20]:
print(f"Total de filas: {df_nuevos_clean.count()}")

df_nuevos_clean.printSchema()

null_check_nuevos = df_nuevos_clean.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in df_nuevos_clean.columns
])
null_check_nuevos.show()

df_nuevos_clean.describe().show()
df_nuevos_clean.show(5, truncate=True)

Total de filas: 6
root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)

+-----+---+--------------+---------------+-----+---------+------------+--------+-------+
|Names|Age|Total_Purchase|Account_Manager|Years|Num_Sites|Onboard_date|Location|Company|
+-----+---+--------------+---------------+-----+---------+------------+--------+-------+
|    0|  0|             0|              0|    0|        0|           0|       0|      0|
+-----+---+--------------+---------------+-----+---------+------------+--------+-------+

+-------+-------------+------------------+-----------------+------------------+------------------+------------------+--------------------+----------------+
|sum

## 8. Saving the Cleaned Data

In [21]:
output_abandono = "abandono_clientes_clean.csv"
output_nuevos = "clientes_nuevos_clean.csv"

# Note: Spark writes each output as a directory with this name that contains a part-*.csv file.
df_abandono_clean.coalesce(1).write.csv(output_abandono, header=True, mode='overwrite')
df_nuevos_clean.coalesce(1).write.csv(output_nuevos, header=True, mode='overwrite')

print(f"Datasets guardados en {output_abandono} y {output_nuevos}")

Datasets guardados en abandono_clientes_clean.csv y clientes_nuevos_clean.csv
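
Spark's CSV writer produces a directory for each output path, so `abandono_clientes_clean.csv` is a folder holding a single `part-*.csv` file (thanks to `coalesce(1)`) plus marker files such as `_SUCCESS`. If a standalone CSV file is needed, a small stdlib helper along these lines could copy the part file out. The helper name `export_single_csv` and the target file name in the usage comment are hypothetical:

```python
import glob
import os
import shutil

def export_single_csv(spark_output_dir, target_file):
    """Copy the lone part file from a Spark CSV output directory to target_file."""
    parts = glob.glob(os.path.join(spark_output_dir, "part-*.csv"))
    if len(parts) != 1:
        raise ValueError(f"Expected exactly one part file, found {len(parts)}")
    shutil.copyfile(parts[0], target_file)
    return target_file

# Example call (hypothetical target name):
# export_single_csv("abandono_clientes_clean.csv", "abandono_clientes_clean_single.csv")
```

The check on the number of part files guards against outputs that were written without `coalesce(1)` and therefore span several files.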


## 9. Analysis Summary

### Main EDA Findings:

**Customer Churn dataset:**
- Total: 900 historical customer records
- Target variable: Churn
  - No churn (0): 750 customers (83.33%)
  - Churn (1): 150 customers (16.67%)
- Account Manager distribution: 48.11% with an AM, 51.89% without

**New Customers dataset:**
- Total: 6 records without the target variable (to be used for prediction)

### Data Quality:

**The original data was already in excellent condition:**
- No null values in any column
- No duplicates
- No negative values
- No ages outside the 18-100 validity range (observed range in the churn dataset: 22-65 years)
- 1 customer with a very low purchase ($100) in each dataset

**Numeric features (Customer Churn):**
- Age: range 22-65 years, mean 41.82 years
- Total_Purchase: range $100-$18,026.01, mean $10,062.82
- Years: range 1.0-9.15 years, mean 5.27 years
- Num_Sites: range 3-14 sites, mean 8.59 sites

### Cleaning Process:

The clean_dataset() function implements 6 steps:
1. Removal of duplicates
2. Mean imputation of numeric nulls
3. Mode imputation of categorical nulls (Account_Manager)
4. Handling of nulls in text columns (rows are dropped when under 5% are null, otherwise filled with "Unknown")
5. Removal of rows with negative values
6. Casting to the correct data types

**Result:** Both datasets were already clean; 0 rows removed (0.00%)

### Next Steps:

The data is ready for:
- Feature engineering (extracting date features, creating derived features)
- Normalization/standardization of numeric variables
- Encoding of categorical variables
- Predictive modeling with PySpark MLlib (Logistic Regression, Random Forest, Gradient Boosting)
- Churn prediction for the new customers
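
As a small illustration of the feature engineering step, the sketch below derives calendar features from `Onboard_date` and one ratio feature from a single row, in plain Python. The feature names (`onboard_year`, `onboard_month`, `purchase_per_year`) are assumptions made for this example; the notebook does not define them, and a real pipeline would do the equivalent with Spark column expressions.

```python
from datetime import datetime

def derived_features(row):
    """Build simple date and ratio features from one customer record."""
    ts = datetime.strptime(row["Onboard_date"], "%Y-%m-%d %H:%M:%S")
    return {
        "onboard_year": ts.year,
        "onboard_month": ts.month,
        # Average purchase amount per year as a customer (Years is at least 1.0 in this data).
        "purchase_per_year": row["Total_Purchase"] / row["Years"],
    }

# Values from the first row shown in section 3.2.
first_row = {
    "Onboard_date": "2013-08-30 07:00:40",
    "Total_Purchase": 11066.8,
    "Years": 7.22,
}
features = derived_features(first_row)
print(features)
```

The ratio feature would need a guard against zero if it were applied to data where `Years` can be 0.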