In [1]:
import warnings
warnings.simplefilter("ignore")

# Monografía - Dataton bancolombia 2020

In [2]:
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import col, udf, when, lower
from pyspark.sql import SparkSession, SQLContext

spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = SparkContext.getOrCreate()
sqlCtx = SQLContext(sc)

21/10/28 22:51:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Carga del dataset en formato parquet

En esta primera parte se cargara el dataset en formato parquet y se sacaran el total de registros

In [3]:
df = sqlCtx.read.parquet("tmp/dataton")
print(f"Total de los registros en bruto de la tabla: {df.count()}")

                                                                                

Total de los registros en bruto de la tabla: 20988748


## Limpieza de las variables categoricas

En esta sección se hará una limpieza sobre las variables categoricas a utilizar en la monografía

In [4]:
df.dtypes

[('periodo', 'int'),
 ('id_cli', 'bigint'),
 ('fecha_nacimiento', 'bigint'),
 ('edad', 'double'),
 ('genero', 'string'),
 ('estado_civil', 'string'),
 ('nivel_academico', 'string'),
 ('profesion', 'string'),
 ('ocupacion', 'string'),
 ('tipo_vivienda', 'string'),
 ('ult_actual', 'bigint'),
 ('categoria', 'smallint'),
 ('codigo_ciiu', 'bigint'),
 ('ind_mora_vigente', 'string'),
 ('cartera_castigada', 'string'),
 ('ciudad_residencia', 'string'),
 ('departamento_residencia', 'string'),
 ('ciudad_laboral', 'string'),
 ('departamento_laboral', 'string'),
 ('rechazo_credito', 'string'),
 ('mora_max', 'bigint'),
 ('cant_moras_30_ult_12_meses', 'bigint'),
 ('cant_moras_60_ult_12_meses', 'bigint'),
 ('cant_moras_90_ult_12_meses', 'bigint'),
 ('cupo_total_tc', 'double'),
 ('tenencia_tc', 'string'),
 ('cuota_tc_bancolombia', 'double'),
 ('tiene_consumo', 'string'),
 ('tiene_crediagil', 'string'),
 ('nro_tot_cuentas', 'bigint'),
 ('ctas_activas', 'bigint'),
 ('tiene_ctas_activas', 'string'),
 ('ct

Como primera instancia se eliminaran las columnas que desde ya no aportan información para entrenar un modelo, además se eliminaran ciertas variables que son dificiles de tratar al tener demasiadas categorias (profesion, departamento laboral y residencial)

In [5]:
clean_df = df.drop("periodo","id_cli","fecha_nacimiento","ult_actual","codigo_ciiu", "profesion",
                   "departamento_residencia", "departamento_laboral", "convenio_lib")

1. En la variable llamada rep_calif_cred (grupo de riesgo) se eliminaron los valores iguales a "SIN INFO" dado que no hay forma de imputar estos valores ya que es una calificación que da el banco al cliente.
2. En la variable llamada genero, se eliminaron los valores nulos.
3. En la variable edad se eliminaron los valores nulos ya que más adelante esta variable será importante para hacer una categorización.

In [None]:
clean_df = clean_df.filter(col("rep_calif_cred") != "SIN INFO")
clean_df = clean_df.filter(col("genero") != " ")

Aqui se realizan limpiezas generales sobre las variables y se convirtieron algunas variables categoricas en variables binarias para usarse en los modelos

In [None]:
clean_df = clean_df.select("*",
                       when((col("estado_civil") == "NO INFORMA") | (col("estado_civil") == "\\N"), "otro") \
                               .otherwise(lower(col("estado_civil"))).alias("civil_status"),
                       when(col("nivel_academico") == "SIN INFORMACION", "no informa") \
                               .otherwise(lower(col("nivel_academico"))).alias("academic_level"),
                       when(col("tipo_vivienda") == '\\N', "no informa") \
                               .otherwise(lower(col("tipo_vivienda"))).alias("house_kind"),
                       when((col("ind_mora_vigente") == '\\N') | (col("ind_mora_vigente") == "N"), 0) \
                               .otherwise(1).alias("pending_arrears"),
                       when((col("cartera_castigada") == '\\N') | (col("cartera_castigada") == "N"), 0) \
                               .otherwise(1).alias("punished_wallet"),
                       when((col("tenencia_tc") == 'NO'), 0) \
                               .otherwise(1).alias("have_tc"),
                       when((col("tiene_consumo") == '\\N'), 0) \
                               .otherwise(1).alias("have_consumption"),
                       when((col("tiene_crediagil") == '\\N'), 0) \
                               .otherwise(1).alias("have_crediagil"),
                       when((col("tiene_ctas_activas") == '\\N'), 0) \
                               .otherwise(1).alias("active_accounts"),
                       when((col("tiene_ctas_embargadas") == '\\N'), 0) \
                               .otherwise(1).alias("seized_accounts"),
                       when((col("pension_fopep") == '\\N'), 0) \
                               .otherwise(1).alias("fopep_pension"),
                       when((col("tiene_cred_hipo_1") == 'X'), 1) \
                               .otherwise(0).alias("have_cred_hipo_1"),
                       when((col("tiene_cred_hipo_2") == 'X'), 1) \
                               .otherwise(0).alias("have_cred_hipo_2"),
                       when((col("genero") == "M"), 1) \
                               .otherwise(0).alias("genre"),
                       when((col("ocupacion") == "\\N") | (col("ocupacion") == "Sin Ocupacion Asignada") 
                            | (col("ocupacion") == "Vacío"), "otro") \
                               .otherwise(lower(col("ocupacion"))).alias("occupation"),
                       when((col("rechazo_credito") == "\\N"), 0) \
                               .otherwise(1).alias("credit_rejected"),
                       when((col("cat_ingreso") == "\\N"), "NINGUNO") \
                               .otherwise(col("cat_ingreso")).alias("income_cat"),
                       
                          ).drop("estado_civil","nivel_academico","tipo_vivienda",
                                 "ind_mora_vigente", "cartera_castigada", "tenencia_tc",
                                 "tiene_consumo", "tiene_crediagil", "tiene_ctas_activas",
                                 "tiene_ctas_embargadas", "pension_fopep", "tiene_cred_hipo_1",
                                 "tiene_cred_hipo_2", "genero", "ocupacion", "rechazo_credito",
                                 "cat_ingreso",
                                )

Por último, se realizará una limpieza de nulos en la varibale categorica ordenal de categoria.

In [None]:
clean_df = clean_df.filter(col("categoria").isNotNull())

## Limpieza de variables númericas

In [None]:
num_col = [i for i,j in clean_df.dtypes if j != "string" and j != "int"]
print("Variables numericas a limpiar")
num_col

Primero, la variable edad se trabajara categorizada. Se realizaran las categorias utilizando el común de los bancos "las generaciones".

Generación Z -> 11 - 27

Generación Y -> 28 - 40

Generación X -> 41 - 52

Baby Boom [BB]-> mayor que 52

para realizar la categorización se utilizara una udf, pero antes, se realizara limpieza de valores nulos.

In [None]:
clean_df = clean_df.filter(col("edad").isNotNull())

In [None]:
def age_categorization(x):
    if x <= 27: return "Z"
    elif 27 < x <= 40: return "Y"
    elif 40 < x <= 52: return "X"
    else: return "BB"

udf_age_categorization = udf(age_categorization, StringType())

Las variables de las moras cada 30, 60 y 90 dias, serán categorizadas ya que asi aportaran más información que de manera númerica. Si ha tenido almenos una mora, se marcara con 1 y si no, con 0

El siguiente bloque de codigo realiza todas las limpiezas

In [None]:
clean_df = clean_df.select('*',
                      udf_age_categorization(col("edad")).alias("age_cat"),
                      when((col("cant_moras_30_ult_12_meses") > 0), 1) \
                               .otherwise(0).alias("mora_30_12"),
                      when((col("cant_moras_60_ult_12_meses") > 0), 1) \
                               .otherwise(0).alias("mora_60_12"),
                      when((col("cant_moras_90_ult_12_meses") > 0), 1) \
                               .otherwise(0).alias("mora_90_12"),
                      when((col("cant_mora_30_tdc_ult_3m_sf") == 0.0) 
                           | (col("cant_mora_30_tdc_ult_3m_sf") == None), 0) \
                               .otherwise(1).alias("mora_30_3_tc"),
                      when((col("cant_mora_30_consum_ult_3m_sf") == 0.0) 
                           | (col("cant_mora_30_consum_ult_3m_sf") == None), 0) \
                               .otherwise(1).alias("mora_30_3_cons"),
                      ).drop("edad", "cant_moras_30_ult_12_meses", "cant_moras_60_ult_12_meses",
                             "cant_moras_90_ult_12_meses", "cant_mora_30_tdc_ult_3m_sf",
                             "cant_mora_30_consum_ult_3m_sf")

Variables que no aportan información a la variable objetivo de manera númerica, estas variables no se utilizaran ya que sesgarian en gran medida las predicciones al tener principalmente valores nulos que no corresponen como tal a datos sucios, si no, a clientes que no presentan cuentas activas o que no tienen cuentas embargadas. Ya hay un flag binarizado para trabajar con estas variables.

In [None]:
clean_df = clean_df.drop("nro_tot_cuentas", "ctas_embargadas", "ctas_activas", "ingreso_nompen",
                         "cupo_tc_mdo", "cant_cast_ult_12m_sr", "ingreso_nomina", "ingreso_segurida_social",
                         "pol_centr_ext")

Utilizando la definición del ind (ingreso neto disponible) que nos brinda bancolombia, se van a eliminar ciertas variables que indican relación directa con otras

Ingreso neto disponible calculado para el cliente
Ingreso final – Gasto familiar – Cuotas pagadas + Cuotas pagadas de la línea de crédito Libranza.

Cuotas Pagadas = Cuota de vivienda (CUOTA DE VIVIENDA) + cuota de consumo (CUOTA DE CONSUMO) + cuota rotativos (CUOTA ROTATIVOS) + cuota comercial (CUOTA COMERCIAL) + cuota de microcrédito (CUOTA DE MICROCRÉDITO) + cuota de TDC (CUOTA TARJETA DE CREDITO) + cuota de sector solidario (CUOTA DE SECTOR SOLIDARIO) + cuota sector real comercio (CUOTA SECTOR REAL COMERCIO).

In [None]:
clean_df = clean_df.withColumn("fees_paid", col("cuota_de_vivienda") + col("cuota_de_consumo") 
                               + col("cuota_rotativos") + col("cuota_tarjeta_de_credito")
                               + col("cuota_de_sector_solidario") + col("cuota_sector_real_comercio")
                               + col("cuota_tc_mdo")) \
                   .drop("cuota_de_vivienda", "cuota_de_consumo", "cuota_rotativos",
                         "cuota_tarjeta_de_credito", "cuota_de_sector_solidario",
                         "cuota_sector_real_comercio", "cuota_tc_mdo")

In [None]:
features = len(clean_df.dtypes)
print(f"Total de caracteristicas en el dataset: {features}")
print(f"Total de registros en el dataset: {clean_df.count()}")

In [None]:
clean_df.dtypes

## Se guarda el nuevo dataset generado

In [None]:
clean_df.select("rep_calif_cred").groupBy("rep_calif_cred").count().collect()

In [None]:
sample = clean_df.sampleBy("rep_calif_cred",
                           fractions = {"A": 0.12,
                                        "B": 0.12,
                                        "C": 0.12,
                                        "D": 0.12,
                                        "E": 0.12,
                                        "F": 0.12,
                                        "G": 0.12,
                                        "H": 0.12},
                           seed = 13452)

In [None]:
df.repartition(5).write.parquet('tmp/clean_data')

In [None]:
#sample.select("categoria").count()

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/romuz/monografia/bigData/lib/python3.8/site-packages/py4j/clientserver.py", line 480, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/romuz/monografia/bigData/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/romuz/monografia/bigData/lib/python3.8/site-packages/py4j/clientserver.py", line 503, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
