In [0]:
dbutils.fs.ls("/Volumes/workspace/default/raw_data")

### **ETL datos de clientes y sus suscripciones a una plataforma digital.**
El objetivo es generar métricas limpias y confiables para análisis financiero, construyendo un dataset agregado con:
- país
- tipo_suscripcion
- ingreso_promedio_anual
- total_clientes

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count


# Spark Session
spark = (
    SparkSession.builder
    .appName("digital_platform")
    .getOrCreate()
)


# Carga de datos
df_clientes = (
    spark.read.csv(
        "/Volumes/workspace/default/raw_data/clientes_proyecto.csv",
        header=True,
        inferSchema=True
    )
)

df_suscripciones = (
    spark.read.csv(
        "/Volumes/workspace/default/raw_data/suscripciones_proyecto.csv",
        header=True,
        inferSchema=True
    )
)


# Unión de datasets
df = (
    df_clientes
    .join(
        df_suscripciones,
        on="cliente_id",
        how="inner"
    )
)

df.show(5)


# Verificación inicial de valores nulos
for columna in df.columns:
    print(
        f"Nulos en {columna}: ",
        df.filter(col(columna).isNull()).count()
    )


# Limpieza e imputación de datos
print('\nValores nulos en columnas tipo_suscripcion y estado_suscripcion deben ser eliminados.')
print('Imputamos columnas de edad y pago_mensual con el valor medio.')
print('Imputamos columna pais con la palabra: "Desconocido".')

df = df.dropna(
    subset=["tipo_suscripcion", "estado_suscripcion"]
)

promedio = (
    df
    .select(
        avg(col("edad")).alias("promedio_edad"),
        avg(col("pago_mensual")).alias("promedio_pago")
    )
    .first()
)

df = df.fillna({
    "edad": promedio["promedio_edad"],
    "pago_mensual": promedio["promedio_pago"],
    "pais": "Desconocido"
})


# Segunda verificación de valores nulos
print('\nSegunda verificación de nulos:')
for columna in df.columns:
    print(
        f"Nulos en {columna}: ",
        df.filter(col(columna).isNull()).count()
    )


# Feature engineering y filtrado
df = df.withColumn(
    "ingreso_anual",
    col("pago_mensual") * 12
)

df_an = df.filter(
    (col("edad") >= 21) &
    (col("pago_mensual") > 0) &
    (col("estado_suscripcion") == "Activa")
)


# Agregaciones y análisis final
df_an = (
    df_an
    .groupBy("pais", "tipo_suscripcion")
    .agg(
        avg(col("ingreso_anual")).alias("ingreso_promedio_anual"),
        count(col("cliente_id")).alias("total_clientes")
    )
)

df_an = df_an.filter(
    (col("ingreso_promedio_anual") > 500) &
    (col("total_clientes") >= 10)
)

df_an = df_an.orderBy(
    col("ingreso_promedio_anual").desc()
)

df_an.show(5)

