In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import asc, col, isnan, when, count, median, udf, concat, month, year, substring, lit
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier, OneVsRest
from pyspark.ml import Pipeline
import os
import pyarrow

In [None]:
conf = SparkConf() \
    .setAppName("data_cleaning") \
    .set("spark.driver.memory", "15g")\
    .set("spark.executor.cores","8") \
    .set("spark.sql.execution.arrow.pyspark.enabled","true")
spark = SparkSession.builder.config(conf=conf).getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/21 07:39:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 53404)
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/local/lib/python3.9/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/usr/local/lib/python3.9/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/local/lib/python3.9/socketserver.py", line 747, in __init__
    self.handle()
  File "/usr/local/lib/python3.9/site-packages/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/usr/local/lib/python3.9/site-packages/pyspark/accumulators.py", line 267, in poll
    if self.rfile in r and func():
  File "/usr/local/lib/python3.9/site-packages/pyspark/accumulators.py", line 271, in accum_updates
    num_up

In [4]:
csv_file_path ="/workspace/data.csv"
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)
df.orderBy(asc("fecha_dato"))

                                                                                

DataFrame[fecha_dato: date, ncodpers: double, ind_empleado: string, pais_residencia: string, sexo: string, age: string, fecha_alta: date, ind_nuevo: string, antiguedad: string, indrel: string, ult_fec_cli_1t: date, indrel_1mes: string, tiprel_1mes: string, indresi: string, indext: string, conyuemp: string, canal_entrada: string, indfall: string, tipodom: string, cod_prov: string, nomprov: string, ind_actividad_cliente: string, renta: double, segmento: string, ind_ahor_fin_ult1: int, ind_aval_fin_ult1: int, ind_cco_fin_ult1: int, ind_cder_fin_ult1: int, ind_cno_fin_ult1: int, ind_ctju_fin_ult1: int, ind_ctma_fin_ult1: int, ind_ctop_fin_ult1: int, ind_ctpp_fin_ult1: int, ind_deco_fin_ult1: int, ind_deme_fin_ult1: int, ind_dela_fin_ult1: int, ind_ecue_fin_ult1: int, ind_fond_fin_ult1: int, ind_hip_fin_ult1: int, ind_plan_fin_ult1: int, ind_pres_fin_ult1: int, ind_reca_fin_ult1: int, ind_tjcr_fin_ult1: int, ind_valo_fin_ult1: int, ind_viv_fin_ult1: int, ind_nomina_ult1: string, ind_nom_pen

In [5]:
#Fill NA with overall mean
mean_age_18_to_30 = df.filter((F.col("age") >= 18) & (F.col("age") <= 30)).select(F.mean("age")).collect()[0][0]
mean_age_30_to_100 = df.filter((F.col("age") >= 30) & (F.col("age") <= 100)).select(F.mean("age")).collect()[0][0]
overall_mean_age = df.select(F.mean("age")).collect()[0][0]

df = df.withColumn(
    "age",
    F.when(F.col("age") < 18, mean_age_18_to_30)
     .when(F.col("age") > 100, mean_age_30_to_100)
     .otherwise(F.col("age"))
)

df = df.fillna({"age": overall_mean_age})
df = df.withColumn("age", F.col("age").cast("int"))

df = df.withColumn(
    "age",
    F.when(F.col("age").isNull(), overall_mean_age).otherwise(F.col("age"))
)

                                                                                

In [6]:
# #Filling missing falue with the most common status
df = df.fillna({"ind_nuevo": 1})
df = df.fillna({"indrel": 1})
df = df.fillna({"ind_nomina_ult1": 0})
df = df.fillna({"ind_nom_pens_ult1": 0})
df = df.fillna({"indfall": "N"})
df = df.fillna({"tiprel_1mes": "A"})

In [7]:
# List of columns to cast
columns_to_cast = [
    "ind_nuevo",
    "indrel",
    "ind_ahor_fin_ult1",
    "ind_aval_fin_ult1",
    "ind_cco_fin_ult1",
    "ind_cder_fin_ult1",
    "ind_cno_fin_ult1",
    "ind_ctju_fin_ult1",
    "ind_ctma_fin_ult1",
    "ind_ctop_fin_ult1",
    "ind_ctpp_fin_ult1",
    "ind_deco_fin_ult1",
    "ind_deme_fin_ult1",
    "ind_dela_fin_ult1",
    "ind_ecue_fin_ult1",
    "ind_fond_fin_ult1",
    "ind_hip_fin_ult1",
    "ind_plan_fin_ult1",
    "ind_pres_fin_ult1",
    "ind_reca_fin_ult1",
    "ind_tjcr_fin_ult1",
    "ind_valo_fin_ult1",
    "ind_viv_fin_ult1",
    "ind_nomina_ult1",
    "ind_nom_pens_ult1",
    "ind_recibo_ult1",
    "renta"
]

# Cast each column to IntegerType
for column in columns_to_cast:
    df = df.withColumn(column, df[column].cast(IntegerType()))

In [8]:
window_spec = Window.orderBy(F.col("fecha_alta"))
dates = df.select(
    "fecha_alta",
    F.row_number().over(window_spec).alias("index")
)
total_rows = dates.count()
median_index = (total_rows // 2) + 1 
median_value = dates.filter(F.col("index") == median_index).select("fecha_alta").collect()[0][0]
df = df.withColumn(
    "fecha_alta",
    F.when(F.col("fecha_alta").isNull(), median_value).otherwise(F.col("fecha_alta"))
)

25/02/21 07:40:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/02/21 07:40:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/02/21 07:40:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/02/21 07:41:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/02/21 07:41:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

In [9]:
#Replace null values with median
median_value = df.select(F.median("ind_actividad_cliente")).collect()[0][0]

df = df.withColumn(
    "ind_actividad_cliente",
    F.when(F.col("ind_actividad_cliente").isNull(), median_value).otherwise(F.col("ind_actividad_cliente"))
)

In [10]:
df = df.na.drop(subset=["nomprov"]) 

grouped = df.groupBy("nomprov").agg(median("renta").alias("renta_median"))

df = df.join(grouped, "nomprov", "left")

df = df.withColumn(
    "renta",
    F.when(F.col("renta").isNull(), col("renta_median")).otherwise(F.col("renta"))
)

In [11]:
df = df.withColumn(
    "conyuemp",
    F.when(F.col("conyuemp").isNull(), -1).otherwise(F.col("conyuemp"))
)

In [12]:
# Dict_Null = {col:df.filter(df[col].isNull()).count() for col in df.columns}
# Dict_Null

In [13]:
canal_dict = {'KAI': 35,'KBG': 17,'KGU': 149,'KDE': 47,'KAJ': 41,'KCG': 59,
 'KHM': 12,'KAL': 74,'KFH': 140,'KCT': 112,'KBJ': 133,'KBL': 88,'KHQ': 157,'KFB': 146,'KFV': 48,'KFC': 4,
 'KCK': 52,'KAN': 110,'KES': 68,'KCB': 78,'KBS': 118,'KDP': 103,'KDD': 113,'KBX': 116,'KCM': 82,
 'KAE': 30,'KAB': 28,'KFG': 27,'KDA': 63,'KBV': 100,'KBD': 109,'KBW': 114,'KGN': 11,
 'KCP': 129,'KAK': 51,'KAR': 32,'KHK': 10,'KDS': 124,'KEY': 93,'KFU': 36,'KBY': 111,
 'KEK': 145,'KCX': 120,'KDQ': 80,'K00': 50,'KCC': 29,'KCN': 81,'KDZ': 99,'KDR': 56,
 'KBE': 119,'KFN': 42,'KEC': 66,'KDM': 130,'KBP': 121,'KAU': 142,'KDU': 79,
 'KCH': 84,'KHF': 19,'KCR': 153,'KBH': 90,'KEA': 89,'KEM': 155,'KGY': 44,'KBM': 135,
 'KEW': 98,'KDB': 117,'KHD': 2,'RED': 8,'KBN': 122,'KDY': 61,'KDI': 150,'KEU': 72,
 'KCA': 73,'KAH': 31,'KAO': 94,'KAZ': 7,'004': 83,'KEJ': 95,'KBQ': 62,'KEZ': 108,
 'KCI': 65,'KGW': 147,'KFJ': 33,'KCF': 105,'KFT': 92,'KED': 143,'KAT': 5,'KDL': 158,
 'KFA': 3,'KCO': 104,'KEO': 96,'KBZ': 67,'KHA': 22,'KDX': 69,'KDO': 60,'KAF': 23,'KAW': 76,
 'KAG': 26,'KAM': 107,'KEL': 125,'KEH': 15,'KAQ': 37,'KFD': 25,'KEQ': 138,'KEN': 137,
 'KFS': 38,'KBB': 131,'KCE': 86,'KAP': 46,'KAC': 57,'KBO': 64,'KHR': 161,'KFF': 45,
 'KEE': 152,'KHL': 0,'007': 71,'KDG': 126,'025': 159,'KGX': 24,'KEI': 97,'KBF': 102,
 'KEG': 136,'KFP': 40,'KDF': 127,'KCJ': 156,'KFR': 144,'KDW': 132,-1: 6,'KAD': 16,
 'KBU': 55,'KCU': 115,'KAA': 39,'KEF': 128,'KAY': 54,'KGC': 18,'KAV': 139,'KDN': 151,
 'KCV': 106,'KCL': 53,'013': 49,'KDV': 91,'KFE': 148,'KCQ': 154,'KDH': 14,'KHN': 21,
 'KDT': 58,'KBR': 101,'KEB': 123,'KAS': 70,'KCD': 85,'KFL': 34,'KCS': 77,'KHO': 13,
 'KEV': 87,'KHE': 1,'KHC': 9,'KFK': 20,'KDC': 75,'KFM': 141,'KHP': 160,'KHS': 162,
 'KFI': 134,'KGV': 43}


pais_dict = {'LV': 102,'CA': 2,'GB': 9,'EC': 19,'BY': 64,'ML': 104,'MT': 118,
 'LU': 59,'GR': 39,'NI': 33,'BZ': 113,'QA': 58,'DE': 10,'AU': 63,'IN': 31,
 'GN': 98,'KE': 65,'HN': 22,'JM': 116,'SV': 53,'TH': 79,'IE': 5,'TN': 85,
 'PH': 91,'ET': 54,'AR': 13,'KR': 87,'GA': 45,'FR': 8,'SG': 66,'LB': 81,
 'MA': 38,'NZ': 93,'SK': 69,'CN': 28,'GI': 96,'PY': 51,'SA': 56,'PL': 30,
 'PE': 20,'GE': 78,'HR': 67,'CD': 112,'MM': 94,'MR': 48,'NG': 83,'HU': 106,
 'AO': 71,'NL': 7,'GM': 110,'DJ': 115,'ZA': 75,'OM': 100,'LT': 103,'MZ': 27,
 'VE': 14,'EE': 52,'CF': 109,'CL': 4,'SL': 97,'DO': 11,'PT': 26,'ES': 0,
 'CZ': 36,'AD': 35,'RO': 41,'TW': 29,'BA': 61,'IS': 107,'AT': 6,'ZW': 114,
 'TR': 70,'CO': 21,'PK': 84,'SE': 24,'AL': 25,'CU': 72,'UY': 77,'EG': 74,'CR': 32,
 'GQ': 73,'MK': 105,'KW': 92,'GT': 44,'CM': 55,'SN': 47,'KZ': 111,'DK': 76,
 'LY': 108,'AE': 37,'PA': 60,'UA': 49,'GW': 99,'TG': 86,'MX': 16,'KH': 95,
 'FI': 23,'NO': 46,'IT': 18,'GH': 88, 'JP': 82,'RU': 43,'PR': 40,'RS': 89,
 'DZ': 80,'MD': 68,-1: 1,'BG': 50,'CI': 57,'IL': 42,'VN': 90,'CH': 3,'US': 15,'HK': 34,
 'CG': 101,'BO': 62,'BR': 17,'BE': 12,'BM': 117}

emp_dict = {'N':0,-1:-1,'A':1,'B':2,'F':3,'S':4}
indfall_dict = {'N':0,-1:-1,'S':1}
sexo_dict = {'V':0,'H':1,-1:-1}
tiprel_dict = {'A':0,-1:-1,'I':1,'P':2,'N':3,'R':4}
indresi_dict = {'N':0,-1:-1,'S':1}
indext_dict = {'N':0,-1:-1,'S':1}
conyuemp_dict = {'N':0,-1:-1,'S':1}
segmento_dict = {-1:-1,'01 - TOP':1,'02 - PARTICULARES':2,'03 - UNIVERSITARIO':3}

In [14]:
# def map_emp_status(status):
#     return emp_dict.get(status, -1)  # Use .get() to handle missing keys

# # Create a UDF from the Python function
# emp_udf = udf(map_emp_status, IntegerType())

# # Apply the UDF to the DataFrame
# spark_df = df.withColumn("ind_empleado", emp_udf(col("ind_empleado")))

In [15]:
spark_df = df

In [16]:
def create_mapping_udf(mapping_dict):
    def map_value(value):
        return mapping_dict.get(value, -1)  # Default to -1 if not found
    return udf(map_value, IntegerType())

canal_udf = create_mapping_udf(canal_dict)
pais_udf = create_mapping_udf(pais_dict)
indfall_udf = create_mapping_udf(indfall_dict)
sexo_udf = create_mapping_udf(sexo_dict)
tiprel_udf = create_mapping_udf(tiprel_dict)
indresi_udf = create_mapping_udf(indresi_dict)
indext_udf = create_mapping_udf(indext_dict)
conyuemp_udf = create_mapping_udf(conyuemp_dict)
segmento_udf = create_mapping_udf(segmento_dict)
emp_udf = create_mapping_udf(emp_dict)

def apply_udfs(spark_df):
    spark_df = spark_df.withColumn("canal_entrada", canal_udf(col("canal_entrada")).cast(IntegerType()))
    spark_df = spark_df.withColumn("pais_residencia", pais_udf(col("pais_residencia")).cast(IntegerType()))
    spark_df = spark_df.withColumn("indfall", indfall_udf(col("indfall")).cast(IntegerType()))
    spark_df = spark_df.withColumn("sexo", sexo_udf(col("sexo")).cast(IntegerType()))
    spark_df = spark_df.withColumn("tiprel_1mes", tiprel_udf(col("tiprel_1mes")).cast(IntegerType()))
    spark_df = spark_df.withColumn("indresi", indresi_udf(col("indresi")).cast(IntegerType()))
    spark_df = spark_df.withColumn("indext", indext_udf(col("indext")).cast(IntegerType()))
    spark_df = spark_df.withColumn("conyuemp", conyuemp_udf(col("conyuemp")).cast(IntegerType()))
    spark_df = spark_df.withColumn("segmento", segmento_udf(col("segmento")).cast(IntegerType()))
    spark_df = spark_df.withColumn("ind_empleado", segmento_udf(col("ind_empleado")).cast(IntegerType()))
    
    return spark_df


In [17]:
spark_df = apply_udfs(spark_df)

In [18]:
spark_df = spark_df.fillna({"ult_fec_cli_1t" : '2020-01-01'})

In [19]:
spark_df = spark_df.withColumn("fecha_dato_month", substring("fecha_dato", 6, 2).cast(IntegerType()))
spark_df = spark_df.withColumn("fecha_dato_year", (substring("fecha_dato", 1, 4).cast(IntegerType()) - 2015))
spark_df = spark_df.withColumn("month_int", (col("fecha_dato_month") + 12 * col("fecha_dato_year")).cast(IntegerType()))
spark_df = spark_df.withColumn("fecha_dato_day", substring("fecha_dato", 9, 2).cast(IntegerType()))
for col_name in ["fecha_dato_month", "fecha_dato_year", "month_int", "fecha_dato_day"]:
    spark_df = spark_df.withColumn(col_name, \
                        when(col(col_name).isNull(), lit(-1)) \
                        .otherwise(col(col_name)))

# Drop the original column
spark_df = spark_df.drop("fecha_dato")

In [20]:
spark_df = spark_df.withColumn("ult_fec_cli_1t_month", substring("ult_fec_cli_1t", 6, 2).cast(IntegerType()))
spark_df = spark_df.withColumn("ult_fec_cli_1t_year", (substring("ult_fec_cli_1t", 1, 4).cast(IntegerType()) - 2015))
spark_df = spark_df.withColumn("ult_fec_cli_1t_day", substring("ult_fec_cli_1t", 9, 2).cast(IntegerType()))
spark_df = spark_df.withColumn("ult_fec_cli_1t_month_int", (col("ult_fec_cli_1t_month") + 12 * col("ult_fec_cli_1t_year")))
    #Check if any value is null, then fill with -1
for col_name in ["ult_fec_cli_1t_month", "ult_fec_cli_1t_year", "ult_fec_cli_1t_day", "ult_fec_cli_1t_month_int"]:
    spark_df = spark_df.withColumn(col_name, \
                       when(col(col_name).isNull(), -1) \
                       .otherwise(col(col_name)))
spark_df = spark_df.drop("ult_fec_cli_1t")

In [21]:
spark_df = spark_df.withColumn("fecha_alta_month", substring("fecha_alta", 6, 2).cast(IntegerType()))
spark_df = spark_df.withColumn("fecha_alta_year", (substring("fecha_alta", 1, 4).cast(IntegerType()) - 1995))
spark_df = spark_df.withColumn("fecha_alta_day", substring("fecha_alta", 9, 2).cast(IntegerType()))
spark_df = spark_df.withColumn("fecha_alta_month_int", (col("fecha_alta_month") + 12 * col("fecha_alta_year")).cast(IntegerType()))
spark_df = spark_df.withColumn("fecha_alta_day_int", (col("fecha_alta_day") + 30 * col("fecha_alta_month") + 365 * col("fecha_alta_year")).cast(IntegerType()))

# Drop the original column
spark_df = spark_df.drop("fecha_alta")

In [22]:
for col_name in ["fecha_dato_month", "fecha_dato_year", "month_int"]:
    spark_df = spark_df.withColumn(col_name, \
                        when(col(col_name).isNull(), -1) \
                        .otherwise(col(col_name)))

In [23]:
spark_df = spark_df.fillna({"ind_nomina_ult1": -1})
spark_df = spark_df.fillna({"ind_nom_pens_ult1": -1})

In [24]:
spark_df = spark_df.withColumn("indrel_1mes",when(col("indrel_1mes") == "P", -2).otherwise(col("indrel_1mes")))
spark_df = spark_df.fillna({"indrel_1mes": -1})
spark_df = spark_df.withColumn("ind_actividad_cliente", when(col("ind_actividad_cliente").isNull(), lit(-1)).otherwise(col("ind_actividad_cliente").cast(IntegerType())))
spark_df = spark_df.withColumn("indrel_1mes", col("indrel_1mes").cast(IntegerType()))

In [25]:
# #Filling missing falue with the most common status
spark_df = spark_df.fillna({"ind_nuevo": 1})
spark_df = spark_df.fillna({"indrel": 1})
spark_df = spark_df.fillna({"ind_nomina_ult1": 0})
spark_df = spark_df.fillna({"ind_nom_pens_ult1": 0})
spark_df = spark_df.fillna({"indfall": "N"})
spark_df = spark_df.fillna({"tiprel_1mes": "A"})

In [26]:
spark_df = spark_df.withColumn("tipodom", when(col("tipodom").isNull(), lit(-1)).otherwise(col("tipodom").cast(IntegerType())))
spark_df = spark_df.withColumn("cod_prov", when(col("cod_prov").isNull(), lit(-1)).otherwise(col("cod_prov").cast(IntegerType())))
spark_df = spark_df.withColumn("antiguedad",when(col("antiguedad").isNull(), lit(-1)).otherwise(col("antiguedad")))
spark_df = spark_df.withColumn("antiguedad", col("antiguedad").cast(IntegerType()))
spark_df = spark_df.drop("nomprov")

In [27]:
# Dict_Null = {col:spark_df.filter(spark_df[col] == -1).count() for col in spark_df.columns}
# Dict_Null

In [28]:
# Count number of NULLs
# age_null = {df.filter(df['age'].isNull()).count()}
# age_null

In [29]:
age_null = {spark_df.filter(spark_df['sexo'] == -1 ).count()}
age_null

                                                                                

{70}

In [30]:
target_cols = [ "ind_ahor_fin_ult1",
    "ind_aval_fin_ult1",
    "ind_cco_fin_ult1",
    "ind_cder_fin_ult1",
    "ind_cno_fin_ult1",
    "ind_ctju_fin_ult1",
    "ind_ctma_fin_ult1",
    "ind_ctop_fin_ult1",
    "ind_ctpp_fin_ult1",
    "ind_deco_fin_ult1",
    "ind_deme_fin_ult1",
    "ind_dela_fin_ult1",
    "ind_ecue_fin_ult1",
    "ind_fond_fin_ult1",
    "ind_hip_fin_ult1",
    "ind_plan_fin_ult1",
    "ind_pres_fin_ult1",
    "ind_reca_fin_ult1",
    "ind_tjcr_fin_ult1",
    "ind_valo_fin_ult1",
    "ind_viv_fin_ult1",
    "ind_nomina_ult1",
    "ind_nom_pens_ult1",
    "ind_recibo_ult1"]

w = Window.partitionBy("ncodpers").orderBy("month_int")

lag_months =[1,2,3]
for lag in lag_months:
    for col in target_cols:
        spark_df = spark_df.withColumn(f"lag_{lag}_{col}", F.lag(F.col(col),lag).over(w))

In [31]:
spark_df.printSchema()

root
 |-- ncodpers: double (nullable = true)
 |-- ind_empleado: integer (nullable = true)
 |-- pais_residencia: integer (nullable = true)
 |-- sexo: integer (nullable = true)
 |-- age: double (nullable = true)
 |-- ind_nuevo: integer (nullable = false)
 |-- antiguedad: integer (nullable = true)
 |-- indrel: integer (nullable = false)
 |-- indrel_1mes: integer (nullable = true)
 |-- tiprel_1mes: integer (nullable = true)
 |-- indresi: integer (nullable = true)
 |-- indext: integer (nullable = true)
 |-- conyuemp: integer (nullable = true)
 |-- canal_entrada: integer (nullable = true)
 |-- indfall: integer (nullable = true)
 |-- tipodom: integer (nullable = true)
 |-- cod_prov: integer (nullable = true)
 |-- ind_actividad_cliente: integer (nullable = true)
 |-- renta: double (nullable = true)
 |-- segmento: integer (nullable = true)
 |-- ind_ahor_fin_ult1: integer (nullable = true)
 |-- ind_aval_fin_ult1: integer (nullable = true)
 |-- ind_cco_fin_ult1: integer (nullable = true)
 |-- ind

In [32]:
spark_df.write.parquet("train_test.parquet")

25/02/21 07:42:15 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [34]:
# Python
print(", ".join(f'"{col}"' for col in spark_df.columns))

"ncodpers", "ind_empleado", "pais_residencia", "sexo", "age", "ind_nuevo", "antiguedad", "indrel", "indrel_1mes", "tiprel_1mes", "indresi", "indext", "conyuemp", "canal_entrada", "indfall", "tipodom", "cod_prov", "ind_actividad_cliente", "renta", "segmento", "ind_ahor_fin_ult1", "ind_aval_fin_ult1", "ind_cco_fin_ult1", "ind_cder_fin_ult1", "ind_cno_fin_ult1", "ind_ctju_fin_ult1", "ind_ctma_fin_ult1", "ind_ctop_fin_ult1", "ind_ctpp_fin_ult1", "ind_deco_fin_ult1", "ind_deme_fin_ult1", "ind_dela_fin_ult1", "ind_ecue_fin_ult1", "ind_fond_fin_ult1", "ind_hip_fin_ult1", "ind_plan_fin_ult1", "ind_pres_fin_ult1", "ind_reca_fin_ult1", "ind_tjcr_fin_ult1", "ind_valo_fin_ult1", "ind_viv_fin_ult1", "ind_nomina_ult1", "ind_nom_pens_ult1", "ind_recibo_ult1", "renta_median", "fecha_dato_month", "fecha_dato_year", "month_int", "fecha_dato_day", "ult_fec_cli_1t_month", "ult_fec_cli_1t_year", "ult_fec_cli_1t_day", "ult_fec_cli_1t_month_int", "fecha_alta_month", "fecha_alta_year", "fecha_alta_day", "fech

In [1]:
# Define a window partitioned by customer and ordered by date
w = Window.partitionBy("ncodpers").orderBy("fecha_dato")

# Compute the previous month value to calculate the gap
spark_df = spark_df.withColumn("prev_month", F.lag("month_int").over(w))
spark_df = spark_df.withColumn("month_diff", 
                   F.when(F.col("prev_month").isNotNull(), 
                          F.col("month_int") - F.col("prev_month"))
                    .otherwise(0))

# Flag a gap if the difference is not equal to 1 (and ignore the first record)
spark_df = spark_df.withColumn("gap_flag", 
                   F.when((F.col("month_diff") != 1) & F.col("prev_month").isNotNull(), 1)
                    .otherwise(0))

NameError: name 'Window' is not defined