In [0]:
import pandas as pd
import numpy as np
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, LongType, BooleanType, DoubleType, TimestampType, DateType
from pyspark.sql.functions import desc, length, col,isnan,when,count
import pyspark.sql.functions as F
from datetime import datetime, timedelta
import handyspark as hpys
import re
from unicodedata import normalize

In [0]:
# # Montar contenedores del Storage Account que contienen los datos de origen
# storage_account = "blobName"
# container = "data"
# blobKey = "blobKey"
# blobEndpoint = "wasbs://{1}@{0}.blob.core.windows.net/".format(storage_account, container)
 
# try:
#   dbutils.fs.mount(
#     source = blobEndpoint,
#     mount_point = "/mnt/data",
#     extra_configs = {"fs.azure.account.key.{0}.blob.core.windows.net".format(storage_account):blobKey})
# except:
#   print("Already mounted. Unmounting and trying to mount again")
#   dbutils.fs.unmount(mount_point='/mnt/data')
#   dbutils.fs.mount(
#     source = blobEndpoint,
#     mount_point = "/mnt/data",
#     extra_configs = {"fs.azure.account.key.{0}.blob.core.windows.net".format(storage_account):blobKey})

In [0]:
# storage_account = "blobName"
# container = "basecruzada"
# blobKey = "blobKey"
# blobEndpoint = "wasbs://{1}@{0}.blob.core.windows.net/".format(storage_account, container)

# try:
#   dbutils.fs.mount(
#     source = blobEndpoint,
#     mount_point = "/mnt/basecruzada",
#     extra_configs = {"fs.azure.account.key.{0}.blob.core.windows.net".format(storage_account):blobKey})
# except:
#   print("Already mounted. Unmounting and trying to mount again")
#   dbutils.fs.unmount(mount_point='/mnt/basecruzada')
#   dbutils.fs.mount(
#     source = blobEndpoint,
#     mount_point = "/mnt/basecruzada",
#     extra_configs = {"fs.azure.account.key.{0}.blob.core.windows.net".format(storage_account):blobKey})

In [0]:
# Leer archivo parquet
file = "/mnt/basecruzada/basecruzada.parquet"
DF_DENUNCIAS = spark.read.parquet(file)

In [0]:
# Ver dimensiones y columnas que trae la tabla importada
print((DF_DENUNCIAS.count(), len(DF_DENUNCIAS.columns)))
print(DF_DENUNCIAS.columns)

# Iniciar limpieza de base completa

## 1. Verificar campo de ID denominado "NumeroPeticion"

In [0]:
# Verificamos la cantidad de ID nulos
num_ID_nulos = DF_DENUNCIAS.filter(col("NumeroPeticion").isNull())
print("ID NULOS: ", num_ID_nulos.count())

#Verificamos cuántos registros tienen ID duplicados y cuál es el número de ID duplicados
ID_duplicados = DF_DENUNCIAS.groupby("NumeroPeticion").count().filter(col("count")>1).cache()
print("Número de registros con ID duplicados: ", ID_duplicados.agg({'count': 'sum'}).collect()[0][0], 
      "Numero de ID que están duplicados: ", ID_duplicados.count())

# Mostramos los primeros 10 ID más repetidos
print(ID_duplicados.sort("count", ascending=False).show(10))

Para revisar estos duplicados, verificamos si corresponde a registros diferentes (situaciones diferentes, con un mismo ID por errores de la base de datos) o registros duplicados exactos (la información de todos los campos es exactamente el mismo). En el primer caso, se deben eliminar todos los duplicados, ya que no sabemos cuál es la información correcta. En el segundo caso, se debe mantener solo uno de los registros duplicados.

Luego de hacer una verificación uno a uno de los 190 registros duplicados, se encuentra que todos corresponden a NNA que han tenido un PARD, que tuvieron también un reingreso y cuya información no fue registrada correctamente en el sistema de información. Debido a esto, se decide dejar únicamente el registro más reciente, pues es el que tiene la información más actualizada de la medida tomada con el NNA. Para saber cuál es el registro más reciente en cada duplicado, se usarán los campos "ANIO" y "MES" que trae la base de PARD ordenándolos de mayor a menor, y la función dropDuplicates() de Spark dejará solo la primera observación, es decir, la más reciente.

In [0]:
print("Número de registros antes de filtro de ID duplicados: ", DF_DENUNCIAS.count())
DF_DENUNCIAS = DF_DENUNCIAS.sort("NumeroPeticion", "ANIO", "MES").dropDuplicates(subset=["NumeroPeticion"])
print("Número de registros después de filtro de ID duplicados: ", DF_DENUNCIAS.count())

## 2. Casteo de variables

In [0]:
# Se crean listas de cómo debe ser casteada cada variable
columnas_longInt = ["NumeroPeticion"]
columnas_int = ['AnoRegistroPeticion',
                'CodDptoPeticionario', 'CodMunicipioPeticionario', 'EdadPeticionario',
                'CodDptoAfectado', 'CodMunicipioAfectado', 'EdadAfectado',
                'ANIO', 'MES']

columnas_float = []

columnas_date = ["fechaRegistroPeticion", "FechaNacAfectado", "PRD_160_FECHA_APERTURA", "PRD_760_CIERRE_PROCESO_RESTABLECIMIENTO",
                 "FECHA_ACTUACION_MAS_RECIENTE", "PRD_165", 'fecha_ingreso', 'FechaAEG_013', 'FechaPRD_500', 'FechaPRD_510', 
                 'FechaPRD_525', 'FechaPRD_045', 'FechaPRD_825', 'PrimeraFechaPRD_160', 'FechaPRD_165', 'FechaPRD_845', 'PRD_845_AD']

# Se define como columnas string todas aquellas que no están en ninguna de las anteriores
columnas_string = [x for x in DF_DENUNCIAS.columns if x not in columnas_int + columnas_float + columnas_date]

In [0]:
# Hay algunas columnas de fecha que traen las fechas en formato decimal de Excel (ej: 44256,8303819444 que representa 1/03/2021  19:55:45). Para ello se debe crear esta funcion convert_date_udf
def convert_date(x):
    mDt = datetime(1899, 12, 30) # Se genera fecha inicial que representa el valor 0 en Excel
    x = str(x)
    if x=="None":
      x = "1900-01-01 00:00:00"
    try:
      x = float(x)
      is_float = True
    except ValueError:
      is_float = False
    if is_float:
      dlt = mDt + timedelta(days=x)
      return dlt.strftime("%Y-%m-%d %H:%M:%S")
    else:
      return x
  
convert_date_udf = udf(lambda z: convert_date(z), StringType())

In [0]:
# Aquí vamos a castear todas las columnas de acuerdo con su tipo
for i in columnas_string:
  DF_DENUNCIAS = DF_DENUNCIAS.withColumn(i, DF_DENUNCIAS[i].cast(StringType()))
for i in columnas_longInt:
  DF_DENUNCIAS = DF_DENUNCIAS.withColumn(i, F.regexp_replace(i, ',', '.').cast(LongType()))
for i in columnas_int:
  DF_DENUNCIAS = DF_DENUNCIAS.withColumn(i, F.regexp_replace(i, ',', '.').cast(IntegerType()))
for i in columnas_float:
  DF_DENUNCIAS = DF_DENUNCIAS.withColumn(i, F.regexp_replace(i, ',', '.').cast(DoubleType()))
for i in columnas_date:
  # Aquí empleamos la función definida en la celda anterior para corregir fechas en formato numérico
  DF_DENUNCIAS = DF_DENUNCIAS.withColumn(i, convert_date_udf(i))
  # Ante posibles errores por variables de fecha, deben incluir el patrón de la fecha acá
  DF_DENUNCIAS = DF_DENUNCIAS.withColumn(i, \
                 when(DF_DENUNCIAS[i].like("_/_/__ __:__"), \
                      F.unix_timestamp(DF_DENUNCIAS[i], "d/M/yy H:mm").cast(TimestampType())). \
                 when(DF_DENUNCIAS[i].like("____-__-__ __:__:__"), \
                      F.unix_timestamp(DF_DENUNCIAS[i], "yyyy-MM-dd HH:mm:ss").cast(TimestampType())). \
                 when(DF_DENUNCIAS[i].like("____-__-__ __:__:__.___"), \
                      F.unix_timestamp(DF_DENUNCIAS[i], "yyyy-MM-dd HH:mm:ss.SSS").cast(TimestampType())). \
                 when(DF_DENUNCIAS[i].like("__/__/____ _:__"), \
                      F.unix_timestamp(DF_DENUNCIAS[i], "dd/MM/yyyy H:mm").cast(TimestampType())). \
                 otherwise( \
                      F.unix_timestamp(DF_DENUNCIAS[i], "dd/MM/yyyy HH:mm").cast(TimestampType())))
  # Fijar que cuando las fechas son del tipo "datetime.datetime(1900, 1, 1, 0, 0) - 1900-01-01 00:00:00" se debe poner como nulo
  DF_DENUNCIAS = DF_DENUNCIAS.withColumn(i, \
                 when(DF_DENUNCIAS[i]==datetime(1900, 1, 1, 0, 0), \
                      None). \
                 otherwise(DF_DENUNCIAS[i]))

In [0]:
# Como pueden ser variables importantes en el modelo, se extrae la hora y minuto de registro de la petición
DF_DENUNCIAS = DF_DENUNCIAS.withColumn("horaRegistroPeticion", F.hour(col("fechaRegistroPeticion")))
DF_DENUNCIAS = DF_DENUNCIAS.withColumn("minutoRegistroPeticion", F.minute(col("fechaRegistroPeticion")))

# Se convierte la variables de TimeStamp a Date. Así se eliminan las referencias a horas, minutos y segundos que tengan las variables y se ponen en un formato estándar para Spark
for i in columnas_date:
  DF_DENUNCIAS = DF_DENUNCIAS.withColumn(i, F.to_date(DF_DENUNCIAS[i]))

## 3. Limpieza de valores vacíos general para todas las variables

In [0]:
# Reemplazar todos los posibles valores nulos
def limpiar_nulos(df, debug=False):
    for contador, i in enumerate(list(df.columns)):
        if debug:
            print(contador, i)
        df = df.withColumn(i, when((col(i) == 'None') | \
            (col(i) == 'NULL') | \
            (col(i) == 'null') | \
            (col(i) == 'NaN') | \
            (F.trim(col(i)) == '') | \
            (col(i) == ' ' ) | \
            (col(i) == '#N/A' ) | \
            (col(i) == 'Sin Información' ) | \
            (col(i) == 'Sin Dato' ) | \
            (col(i) == '#¡REF!' ) | \
            (col(i) == 'VALOR NULO' ) | \
            (col(i) == 'SELECCIONE' ),
            None).otherwise(df[i]))
    return df
  
DF_DENUNCIAS_limpiovacios = limpiar_nulos(DF_DENUNCIAS, debug=False)

# 4. Creación de variable objetivo

A continuación se creara la variable objetivo del modelo con base a los campos "TipoPeticion", "CONSTATACION", "CategorizacionPRD_165" y "MEDIDA_TOMADA_ULTIMA". Las categorías de la variable objetivo serán: falsa, sindefinir_fallida, verdadera_nopard, verdadera_pard_noinst, verdadera_pard_inst

In [0]:
# Frecuencias conjuntas "TipoPeticion" , "CategorizacionPRD_165", "CONSTATACION"
DF_DENUNCIAS_limpiovacios.groupby("TipoPeticion", "CONSTATACION", "CategorizacionPRD_165").count().sort(["TipoPeticion", "CONSTATACION", "CategorizacionPRD_165"]).show(200, truncate=False)

In [0]:
# Se eliminan todos los registros que corresponden a Inobservancia de Derechos, es decir todos aquellos que en el campo de "TipoPetición" son "Inobservancia de derechos" o en el campo "CategorizacionPRD_165" son "ACTIVACION DE RUTA SNBF"
antes = DF_DENUNCIAS_limpiovacios.count()
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.filter(col("TipoPeticion")!="Inobservancia de derechos")
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.filter((col("CategorizacionPRD_165").isNull()) | (col("CategorizacionPRD_165")!="ACTIVACION DE RUTA SNBF"))
despues = DF_DENUNCIAS_limpiovacios.count()
print("# de registros iniciales: ", antes, "# de registros eliminados: ", antes - despues, "# de registros que permanecen: ", despues)

In [0]:
# Se encuentra que hay discrepancia entre los campos "CategorizacionPRD_165" y "CONSTATACION". Por ejemplo, en unos pocos casos en CONSTATACION aparece Fallida o Falsa, sin embargo en PRD_165 aparece como si se hubiera hecho verificación "ATENCION POR MEDIO DE ORIENTACION Y ASISTENCIA A LA FAMILIA". Por esta razón, en la construcción de la variable objetivo se dará prioridad al campo "CategorizacionPRD_165"

# Se crean lsitados de categorías que servirán para la clasificación en la próxima celda
PARD_INST = ["Adopción", "Adopción ", "Atenc especializ internado", "Ub At Especializada Casa Hogar ", "Ub M Fliar Hogar de paso", "Ub M Fliar Hogar Sustituto", "Ubicación en Centro de Emergencia", "Ub M fliar – Solidaridad Familiar", "Ub M Fliar Hogar Amigo"]
PARD_NOINST = ["Acciones policivas", "Acciones policivas, administrativas o judiciales ", "Amonestación con asistencia obligatoria a curso pedagógico. ", "Atenc especiali Intervenciòn de Apoyo", "Atenc especializ Externado", "Atenc especializ semiinternado", "Cualquiera otra medida que garantice la protección integral de los niños, niñas y adolescentes", "Por Definir", "Ub M fliar de origen o familia extensa", "Ub M Fliar Hogar Gestor"]

prd165_srd_pardinst = ["FALLECIMIENTO DEL NIÑO, NIÑA O ADOLESCENTE"]
prd165_srd_verdaderanopard = ["ATENCION POR MEDIO DE ORIENTACION Y ASISTENCIA A LA FAMILIA", "EVASION DEL NIÑO, NIÑA O ADOLESCENTE"]
prd165_srd_sindefinir = ["DATOS DE UBICACION ERRADOS", "CONFIRMACIÓN MAYORÍA DE EDAD", "PETICIONARIO NO APORTA DATOS DE UBICACION O DE CONTACTO"]
prd165_srd_falsa = ["DERECHOS GARANTIZADOS"]

constatacion_verdadera = ["Verdadera"]
constatacion_fallida = ["Dirección Errada", "Fallida", "Sin Resultado de Constatación"]
constatacion_falsa = ["Falsa"]

In [0]:
# Se crean las distintas categorías de la variable objetivo

DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn("VAR_OBJETIVO", \
              # Inicialmente definimos la petición verdadera PARD con medida institucional como aquella que cruzó con la base de PARD y en el campo MEDIDA_TOMADA_ULTIMA tiene registrada alguna de las categorías de la lista PARD_INST
              when((col("MEDIDA_TOMADA_ULTIMA").isin(PARD_INST)), \
                     "verdadera_pard_inst"). \
              # También tomaremos como PARD con medida institucional aquellos que en el campo CategorizacionPRD_165 se registran como "FALLECIMIENTO DEL NIÑO, NIÑA O ADOLESCENTE" debido a la gravedad de esta situación que amerita una priorización
              when((col("CategorizacionPRD_165").isin(prd165_srd_pardinst)), \
                     "verdadera_pard_inst"). \
              # Ahora los PARD con medida no institucional son aquellos que están en la lista PARD_NOINST
              when((col("MEDIDA_TOMADA_ULTIMA").isin(PARD_NOINST)), \
                     "verdadera_pard_noinst"). \
              # Ahora las peticiones verdaderas. Tomamos en primera medida todas aquellas (sin importar TipoPeticion) que en el campo CategorizacionPRD_165 estén en las siguientes dos categorías
              when((col("CategorizacionPRD_165").isin(prd165_srd_verdaderanopard)), \
                     "verdadera_nopard"). \
              # También se toman como verdaderas no PARD todas aquellas SRD que no tienen cruce con PARD y CategorizacionPRD_165 es un valor nulo
              when((col("TipoPeticion") == "Solicitud de Restablecimiento de Derechos (SRD)") & \
                   (col("MEDIDA_TOMADA_ULTIMA").isNull()) & \
                   (col("CategorizacionPRD_165").isNull()), \
                     "verdadera_nopard"). \
              # También se toman verdaderas no PARD aquellas RAVD que no cruzan con PARD y tienen resultado de CONSTATACION Verdadera
              when((col("TipoPeticion") == "Reporte de Amenaza o Vulneración de derechos") & \
                   (col("CONSTATACION").isin(constatacion_verdadera)) &\
                   (col("MEDIDA_TOMADA_ULTIMA").isNull()), \
                     "verdadera_nopard"). \
              # Ahora se categorizan las sindefinir_fallida. Estas en primera medida son las que en el campo CategorizacionPRD_165 tiene alguno de los valores de la lista prd165_srd_sindefinir
              when((col("CategorizacionPRD_165").isin(prd165_srd_sindefinir)), \
                     "sindefinir_fallida"). \
              # También aquellas RAVD que tienen en CONSTATACION alguna de las categorías de constatacion_fallida
              when((col("TipoPeticion") == "Reporte de Amenaza o Vulneración de derechos") & \
                   (col("CONSTATACION").isin(constatacion_fallida)) &\
                   (col("MEDIDA_TOMADA_ULTIMA").isNull()), \
                     "sindefinir_fallida"). \
              # Finalmente, las falsas son aquellas que en CategorizacionPRD_165 registra "DERECHOS GARANTIZADOS" o que son RAVD que en el campo CONSTATACION son "Falsa"
              when((col("CategorizacionPRD_165") == "DERECHOS GARANTIZADOS"), \
                     "falsa"). \
              when((col("TipoPeticion") == "Reporte de Amenaza o Vulneración de derechos") & \
                   (col("CONSTATACION") == "Falsa"), \
                     "falsa"). \
            otherwise(None))

In [0]:
# Verificamos frecuencias
DF_DENUNCIAS_limpiovacios.groupby("VAR_OBJETIVO").count().sort("count", ascending=False).show(truncate=False)

In [0]:
# Verificamos que los valores nulos corresponden por Remisión a otras entidades
DF_DENUNCIAS_limpiovacios.filter(col("VAR_OBJETIVO").isNull()). \
groupby("VAR_OBJETIVO", "TipoPeticion", "CONSTATACION", "MEDIDA_TOMADA_ULTIMA").count().sort("count", ascending=False).show(truncate=False)

In [0]:
# Se eliminan todas aquellos registros que tienen VAR_OBJETIVO vacío. Estas corresponden en su totalidad a RAVD que fueron remitidas a otra entidad o de las que no se conoce su resultado de constatación
DF_DENUNCIAS_limpiovacios.cache()
antes = DF_DENUNCIAS_limpiovacios.count()
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.filter(col("VAR_OBJETIVO").isNotNull())
despues = DF_DENUNCIAS_limpiovacios.count()
print("# registros eliminados: ", antes - despues, "# de registros que permanecen: ", despues)

# 5. Creación de variables de valoración PARD
A continuación se crearán tres variables que son de interés para los defensores de familia al momento de consultar el tablero: Si el NNA ha sido atendido o no por un equipo de defensoría de familia, la fecha de esta primera atención y si la petición ha tenido o no un trámite de fondo.

In [0]:
# Las siguientes columnas son las que se van a verificar para saber si el afectado en la petición ha sido atendido por un equipo de defensoría
cols_atendido = ['fecha_ingreso', 'FechaAEG_013', 'FechaPRD_500', 'FechaPRD_510', 'FechaPRD_525', 'FechaPRD_045', 'FechaPRD_825', 'PrimeraFechaPRD_160', 'FechaPRD_165', 'FechaPRD_845', 'PRD_845_AD']

# Se crea columna con primera fecha de atención. Aquellas que no tengan ninguna atención tendrán este campo vacío (null)
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn("Fecha_Primera_Atencion", F.least(*cols_atendido))

# Se crea columna llamada "Atendido", en la que si la fecha de primera atención es vacío toma el valor de "NO", pero si tiene algún registro tiene el valor de "SI"
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn("Atendido",
                                                                 when(col("Fecha_Primera_Atencion").isNull(), "NO").
                                                                 otherwise("SI"))

# Ahora se crea la columna "TramiteDeFondo" para saber si la petición ha tenido un trámite de fondo (en cuyo caso la columna tomará los valores de "PARD", "NoAmeritaPARD" o "RemitidoOtraEntidad") o si no ha tenido trámite de fondo (en este caso tomará el valor de "SinTramite")
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn("TramiteDeFondo",
                                                                 when(col("PrimeraFechaPRD_160").isNotNull(), "PARD").
                                                                 when(col("FechaPRD_165").isNotNull(), "NoAmeritaPARD").
                                                                 when(col("FechaPRD_845").isNotNull(), "RemitidoOtraEntidad").
                                                                 otherwise("SinTramite"))

# 6. Creación de variable de PaisAfectado
Se revisará la variable de PaisResidenciaAfectado para verificar su contenido. Además, para el propósito del modelamiento y la visualización se crearán dos variables: Una con los cinco países más frecuentes y el restante agrupado en "Otros" y otra con los diez países más frecuentes y el restante agrupado en "Otros"

In [0]:
# Reemplazar los países que tienen valor vacío (null) por ND
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.na.fill("ND", subset="PaisResidenciaAfectado")

# Crear tabla con los 10 países más frecuentes de residencia del afectado, ordenada por frecuencia de mayor a menor
freq_paises = DF_DENUNCIAS_limpiovacios.groupby("PaisResidenciaAfectado").count().sort("count", ascending=False).limit(10)
print(freq_paises.show())

# Convertir el anterior dataframe a una lista que contenga a los 10 primeros países
list_paises_10 = [row['PaisResidenciaAfectado'] for row in freq_paises.collect()]

# Convertir el anterior dataframe a una lista que contenga a los 5 primeros países
list_paises_5 = list_paises_10[:5]

# Crear primera columna en la que se agrupan los países del puesto 11 hacia abajo en "Otros"
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn("PaisAfectado_10",
                                                                 when(col("PaisResidenciaAfectado").isin(list_paises_10), col("PaisResidenciaAfectado")).
                                                                 otherwise("OTRO"))

# Crear primera columna en la que se agrupan los países del puesto 6 hacia abajo en "Otros"
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn("PaisAfectado_5",
                                                                 when(col("PaisResidenciaAfectado").isin(list_paises_5), col("PaisResidenciaAfectado")).
                                                                 otherwise("OTRO"))

# 7. Se agregan variables numéricas adicionales (coordenadas de centro zonal direccionado, variables departamentales)

### 7.1. Obtener coordenadas de centros zonales

In [0]:
#Información sobre conexión a la base de datos.
jdbcHostname = dbutils.secrets.get('blobsecret','SQLHostname')
jdbcDatabase = dbutils.secrets.get('blobsecret','SQLDatabase')
jdbcPort = dbutils.secrets.get('blobsecret','SQLport')
jdbcuser = dbutils.secrets.get('blobsecret','SQLusername')
jdbcpass = dbutils.secrets.get('blobsecret','SQLpassword')
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)

# Se lee archivo con coordenadas de centros zonales
coord_cz = spark.read.format("com.microsoft.sqlserver.jdbc.spark").option("url", jdbcUrl).option("dbtable", "[dbo].[centro_zonal_2020]").option("user", jdbcuser).option("password", jdbcpass).load()

# Seleccionamos columnas necesarias
coord_cz = coord_cz.select("CodigoMuni", "COD_CZ", "REGIONAL", "NOMBRE_CEN", "X", "Y")

# Se crea una columna con el codigo DIVIPOLA del departamento y eliminamos la de municipio que no será necesaria
coord_cz = coord_cz.withColumn("CodDptoRegionalDireccionada", F.substring('CodigoMuni', 1,2)).drop("CodigoMuni")

# Se debe arreglar un registro que no tiene 
coord_cz = coord_cz.withColumn("CodDptoRegionalDireccionada", when(col("REGIONAL")=="LA GUAJIRA", "44").otherwise(col("CodDptoRegionalDireccionada")))

# Castear columnas diferentes a texto
for i in ["X", "Y"]:
  coord_cz = coord_cz.withColumn(i, F.regexp_replace(i, ',', '.').cast(DoubleType()))

In [0]:
# Para recuperar las coordenadas de los centros zonales, utilizaremos dos llaves: La regional y el centro zonal. Por lo tanto empezaremos al comparar los campos de regional entre la base de peticiones y el listado de centros zonales que contiene las coordenadas
  
# Valores únicos de regionales a las que se direccionan las peticiones
listaRegional_cz_direccionado = DF_DENUNCIAS_limpiovacios.select("RegionalDireccionada").distinct().sort("RegionalDireccionada", ascending=True).rdd.flatMap(lambda x: x).collect()

# Valores únicos de regionales en el archivo de centros zonales
listaRegional_cz = coord_cz.select("REGIONAL").distinct().sort("REGIONAL", ascending=True).rdd.flatMap(lambda x: x).collect()

# Mostrar cuáles regionales están en base de peticiones pero no en el archivo de centros zonales
print("Regionales que están en la base de peticiones pero no en archivo de centros zonales \n", 
      [x for x in listaRegional_cz_direccionado if x not in listaRegional_cz])

In [0]:
# De la celda anterior se ve que es necesario reemplazar en el archivo de centros zonales 'NARIÃ‘O' por 'NARIÑO'. Para ello, se realiza un diccionario y se hace el reemplazo en la variable "REGIONAL"
coord_cz = coord_cz.na.replace({"NARIÃ‘O": "NARIÑO"}, subset="REGIONAL")

# Luego de este cambio, verificar de nuevo cuáles regionales están en base de peticiones pero no en el archivo de centros zonales
listaRegional_cz = coord_cz.select("REGIONAL").distinct().sort("REGIONAL", ascending=True).rdd.flatMap(lambda x: x).collect()
print([x for x in listaRegional_cz_direccionado if x not in listaRegional_cz])

In [0]:
# Para los centros zonales que tienen como regional INSTITUCIONES AUTORIZADAS ADOPCION' o 'SEDE NACIONAL' será necesario agregar las coordenadas manualmente. Verifiquemos si hay otros registros que tampoco cruzarán y por lo tanto será necesario encontrar las coordenadas de igual forma

# Creamos una función que nos imprimirá el listado de regionales y cz de la base de peticiones que no cruza con las coordenadas

# Primero, creamos una función para limpiar las columnas y facilitar el cruce 
@udf
def limpiar_col(text):
  text = text.lower().replace('.', '').replace('"', '').replace('ã‘', 'ñ').replace(' ', '')
  text = normalize('NFKD', text).encode("ascii","ignore").decode("ascii")
  return text
  
def check_cruce_cz():
  # Seleccionamos columnas necesarias
  check_crucecz_peticiones = DF_DENUNCIAS_limpiovacios.select("RegionalDireccionada", "CZDireccionado").distinct()
  check_crucecz_listado = coord_cz.select("REGIONAL", "NOMBRE_CEN").distinct()

  # Se aplica función a las columnas de regional y centro zonal
  check_crucecz_peticiones = check_crucecz_peticiones.withColumn("RegionalDireccionada2", limpiar_col("RegionalDireccionada"))
  check_crucecz_peticiones = check_crucecz_peticiones.withColumn("CZDireccionado2", limpiar_col("CZDireccionado"))
  check_crucecz_listado = check_crucecz_listado.withColumn("REGIONAL2", limpiar_col("REGIONAL"))
  check_crucecz_listado = check_crucecz_listado.withColumn("NOMBRE_CEN2", limpiar_col("NOMBRE_CEN"))

  # Se realiza el cruce preliminar para conocer cuáles cz no tienen coordenadas. Se hace un filtro de forma que solo queden los que no cruzaron
  cond = [check_crucecz_peticiones["RegionalDireccionada2"] == check_crucecz_listado["REGIONAL2"], 
          check_crucecz_peticiones["CZDireccionado2"] == check_crucecz_listado["NOMBRE_CEN2"]]
  check_crucecz_peticiones = check_crucecz_peticiones.join(check_crucecz_listado, cond, how='left').filter(col("REGIONAL").isNull())

  # Mostrar los registros que no cruzaron
  nocruzaron = check_crucecz_peticiones.select("RegionalDireccionada", "CZDireccionado").sort(["RegionalDireccionada", "CZDireccionado"])
  print("Número de centros zonales en base de peticiones que no cruzaron :", nocruzaron.count())
  print(nocruzaron.show(100, truncate=False))

# Aplicamos la función
check_cruce_cz()

In [0]:
# Se encuentra que la mayoría de regionales y centros zonales que no cruzan son aquellas que en la base de peticiones se direccionan a las direcciones regionales y a las instituciones autorizadas para la adopción. Los restantes son centros zonales que tienen un nombre diferente entre la base de peticiones y la de centros zonales. Estos ajustes será necesario realizarlos manualmente

# Ajustemos primero los centros zonales que tienen un nombre diferente entre las dos bases. Para ello creamos un diccionario que hará los cambios en la base de peticiones
dicc_czcambios = {"CZ EL CARMEN DE BOLIVAR": "CZ CARMEN DE BOLIVAR", "CZ AGUSTIN CODAZZI": "CZ CODAZZI", "CZ 1 MONTERIA": "CZ MONTERIA 1", "CZ INTEGRAL NOROCCIDENTAL": "CZ NOROCCIDENTAL", "CZ INTEGRAL NORORIENTAL": "CZ NORORIENTAL", "CAIF COMUNA 13": "CZ ROSALES", "CZ SAN CRISTOBAL SUR": "CZ SAN CRISTOBAL", "CZ SAN JOSE DE GUAVIARE": "CZ SAN JOSE DEL GUAVIARE", "CZ SANTA MARTA 2": "CZ SANTA MARTA NORTE", "CZ SANTA MARTA 1": "CZ SANTA MARTA SUR", "CAIF COMUNA 8": "CZ SUR ORIENTE", "CZ YARIQUIES": "CZ YARIGUIES"}

DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.na.replace(dicc_czcambios, subset="CZDireccionado")

# También eliminamos de la columna de centros zonales unos registros que traen comillas "
@udf
def limpiar_col2(text):
  text = text.replace('"', '')
  return text

DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn("CZDireccionado", limpiar_col2("CZDireccionado"))

In [0]:
# Se agregan 44 registros con las coordenadas de direcciones regionales e instituciones autorizadas para la adopción
data_cz = [("0000","AMAZONAS","REGIONAL AMAZONAS",-4.2172145,-69.9351763,"91"), ("0000","BOGOTA","REGIONAL BOGOTA",4.6414275,-74.094736,"11"), ("0000","CUNDINAMARCA","REGIONAL CUNDINAMARCA",4.6800036,-74.0625967,"25"), ("0000","CHOCO","REGIONAL CHOCO",5.690288,-76.6606877,"27"), ("0000","ANTIOQUIA","REGIONAL ANTIOQUIA",6.2513882,-75.600688,"05"), ("0000","ARAUCA","REGIONAL ARAUCA",7.0761773,-70.7126457,"81"), ("0000","ATLANTICO","REGIONAL ATLANTICO",10.9903413,-74.7987913,"08"), ("0000","BOLIVAR","REGIONAL BOLIVAR",10.4230524,-75.5471385,"13"), ("0000","BOYACA","REGIONAL BOYACA",5.5762939,-73.3408689,"15"), ("0000","CALDAS","REGIONAL CALDAS",5.0658144,-75.5083546,"17"), ("0000","CAQUETA","REGIONAL CAQUETA",1.6174462,-75.6071531,"18"), ("0000","CASANARE","REGIONAL CASANARE",5.3370571,-72.4087433,"85"), ("0000","CAUCA","REGIONAL CAUCA",2.4468445,-76.6234119,"19"), ("0000","CESAR","REGIONAL CESAR",10.4727743,-73.2500034,"20"), ("0000","CORDOBA","REGIONAL CORDOBA",8.7385536,-75.9056023,"23"), ("0000","GUAINIA","REGIONAL GUAINIA",3.8645172,-67.9270858,"94"), ("0000","GUAVIARE","REGIONAL GUAVIARE",2.5691093,-72.6430071,"95"), ("0000","HUILA","REGIONAL HUILA",2.9328805,-75.3002634,"41"), ("0000","LA GUAJIRA","REGIONAL LA GUAJIRA",11.5363201,-72.9195399,"44"), ("0000","MAGDALENA","REGIONAL MAGDALENA",11.2369312,-74.2063184,"47"), ("0000","META","REGIONAL META",4.1150854,-73.6106049,"50"), ("0000","NARIÑO","REGIONAL NARIÑO",1.2036969,-77.2654141,"52"), ("0000","NORTE DE SANTANDER","REGIONAL NORTE DE SANTANDER",7.9047431,-72.4877553,"54"), ("0000","PUTUMAYO","REGIONAL PUTUMAYO",1.1526395,-76.6512828,"86"), ("0000","QUINDIO","REGIONAL QUINDIO",4.5439624,-75.6703903,"63"), ("0000","RISARALDA","REGIONAL RISARALDA",4.8148492,-75.7101355,"66"), ("0000","SAN ANDRES","REGIONAL SAN ANDRES",12.5617118,-81.7241771,"88"), ("0000","SANTANDER","REGIONAL SANTANDER",7.1437337,-73.1357805,"68"), ("0000","SUCRE","REGIONAL SUCRE",9.2931345,-75.4055886,"70"), ("0000","TOLIMA","REGIONAL TOLIMA",4.4349783,-75.2137268,"73"), ("0000","VALLE DEL CAUCA","REGIONAL VALLE",3.4685525,-76.5190228,"76"), ("0000","VAUPES","REGIONAL VAUPES",1.2595745,-70.2360806,"97"), ("0000","VICHADA","REGIONAL VICHADA",6.1826067,-67.4880416,"99"), ("0000","INSTITUCIONES AUTORIZADAS ADOPCION","ASOCIACION AMIGOS DEL NIÑO AYUDAME",4.6979682,-74.0772445,"11"), ("0000","INSTITUCIONES AUTORIZADAS ADOPCION","FUNDACION CRAN",4.7491542,-74.0751404,"11"), ("0000","INSTITUCIONES AUTORIZADAS ADOPCION","CASA DE LA MADRE Y EL NIÑO",4.6382348,-74.0803545,"11"), ("0000","INSTITUCIONES AUTORIZADAS ADOPCION","CENTRO DE ADOPCION CHIQUITINES",3.3357044,-76.529711,"76"), ("0000","INSTITUCIONES AUTORIZADAS ADOPCION","CORPORACION CASA DE MARIA Y EL NIÑO",6.1905439,-75.5659312,"05"), ("0000","INSTITUCIONES AUTORIZADAS ADOPCION","FANA",4.7535145,-74.0854299,"11"), ("0000","INSTITUCIONES AUTORIZADAS ADOPCION","FUNDACION CASITA DE NICOLAS",6.2619263,-75.563012,"05"), ("0000","INSTITUCIONES AUTORIZADAS ADOPCION","FUNDAQCION LOS PISINGOS",4.7336526,-74.0267606,"11"), ("0000","SEDE NACIONAL","CENTRO DE CONTACTO",4.6718971,-74.095443,"11"), ("0000","SEDE NACIONAL","SEDE NACIONAL",4.6718971,-74.095443,"11"), ("0000","SUCRE","UNIDAD MÓVIL 01 REGIONAL SUCRE",9.2931345,-75.4055886,"70")]

schema_cz = StructType([ \
    StructField("COD_CZ",StringType(),True), \
    StructField("REGIONAL",StringType(),True), \
    StructField("NOMBRE_CEN",StringType(),True), \
    StructField("X",DoubleType(),True), \
    StructField("Y",DoubleType(),True), \
    StructField("CodDptoRegionalDireccionada",StringType(),True), \
  ])
nuevos_cz = spark.createDataFrame(data=data_cz,schema=schema_cz)

# Unir estos datos al dataframe original de centros zonales
coord_cz = coord_cz.union(nuevos_cz)

In [0]:
# Volvemos a verificar que ahora si todos los centros zonales hayan cruzado
check_cruce_cz()

# Ahora que no hay ningun registro que no cruce, hacemos el cruce final y definitivo para recuperar las coordenadas y el código de centro zonal
# Creamos columnas con nombres limpios con los cuales se hará el cruce
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn("RegionalDireccionada2", limpiar_col("RegionalDireccionada"))
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn("CZDireccionado2", limpiar_col("CZDireccionado"))
coord_cz = coord_cz.withColumn("REGIONAL2", limpiar_col("REGIONAL"))
coord_cz = coord_cz.withColumn("NOMBRE_CEN2", limpiar_col("NOMBRE_CEN"))

# Se realiza el cruce preliminar para conocer cuáles cz no tienen coordenadas. Se hace un filtro de forma que solo queden los que no cruzaron
cond = [DF_DENUNCIAS_limpiovacios["RegionalDireccionada2"] == coord_cz["REGIONAL2"],
        DF_DENUNCIAS_limpiovacios["CZDireccionado2"] == coord_cz["NOMBRE_CEN2"]]

DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.join(coord_cz, cond, how='left')

# Eliminamos columnas que ya no son necesarias
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.drop("REGIONAL", "REGIONAL2", "NOMBRE_CEN", "NOMBRE_CEN2", "RegionalDireccionada2", "CZDireccionado2")

In [0]:
# Finalmente guardamos la tabla que nos permite recuperar las coordenadas a partir de las variables "RegionalDireccionada" y "CZDireccionado", de 
listado_coordcz = DF_DENUNCIAS_limpiovacios.select("RegionalDireccionada", "CZDireccionado", "COD_CZ", "X", "Y", "CodDptoRegionalDireccionada").distinct()
listado_coordcz.write.mode("overwrite").parquet("/mnt/data/Data/recuperacion_coordenadasCZ")

### 7.2. Recuperación de variables descriptivas del nivel departamental
Aquí se empleará un archivo externo que trae variables socioeconómicas a nivel departamental, de forma que se puedan incluir en el modelamiento algunas características propias del departamento adonde es direccionada la petición. El archivo que se importará trae variables como Tasa de Violencia Interpersonal, Tasa de Violencia Intrafamiliar, Tasa de Fecundidad en mujeres entre 15 y 19 años, Porcentaje de población rural, Porcentaje de Población Étnica, Porcentaje de Población Indígena y Víctimas de delitos sexuales. Para esto importaremos el archivo que contiene esta información desde el Storage Account

In [0]:
# Leer archivo como pandas Dataframe
import os
os.chdir(r'/dbfs//mnt/data/Data/')
file = "Variables_contexto_dptal.xlsx"
vardpto_contexto = pd.read_excel(file, sheet_name='importar', engine="openpyxl", converters={'Divipola_dpto1':str,'Divipola_dpto2':str})

# Por la anormalidad de la situación y las cifras del año 2020, para dicho año se tomarán los mismos valores observados en 2019
vars_contexto = ['tasa_vip_total', 'tasa_vif_total', "TasaFecunidad_15a19", "porc_rural", "vict_delsexual_total"]
for i in vars_contexto:
  nom_var_2019, nom_var_2020 = [i + "_2019", i + "_2020"]
  vardpto_contexto[nom_var_2020] = vardpto_contexto[nom_var_2019]

# Convertir el dataframe de formato wide a long
vardpto_contexto = pd.wide_to_long(vardpto_contexto, vars_contexto, i=['Divipola_dpto1', 'Divipola_dpto2', "porc_etnica_2018", "porc_indigena_2018"], j='agno_contexto', sep='_').reset_index()

#Convertir el archivo a Spark Dataframe
vardpto_contexto_sk = spark.createDataFrame(vardpto_contexto)

In [0]:
# Se hace el join o merge con la base de peticiones usando como llaves el código de departamento y que el año de la información de contexto sea del año anterior al de la peticion

#Creamos la columna de rezago del año de contexto
vardpto_contexto_sk = vardpto_contexto_sk.withColumn("agno_contexto_mas1", col("agno_contexto")+1)

# Se realiza el join usando la siguiente condición
cond = [DF_DENUNCIAS_limpiovacios["CodDptoRegionalDireccionada"] == vardpto_contexto_sk["Divipola_dpto2"],
        DF_DENUNCIAS_limpiovacios["AnoRegistroPeticion"] == vardpto_contexto_sk["agno_contexto_mas1"]]

DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.join(vardpto_contexto_sk, cond, how='left')

# Se eliminan columnas innecesarias
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.drop("Divipola_dpto1", "Divipola_dpto2", "agno_contexto", "agno_contexto_mas1")

## 8. Limpieza final de todas las variables, a partir de los resultados anteriores
### 8.1. Empecemos con la variables de fecha "fechaRegistroPeticion", "FechaNacAfectado"

In [0]:
# Ver las fechas más frecuentes para saber cuáles podrían ser valores atípicos o nulos
DF_DENUNCIAS_limpiovacios.groupby("fechaRegistroPeticion").count().sort(desc("count")).show(20)

In [0]:
DF_DENUNCIAS_limpiovacios.groupby("FechaNacAfectado").count().sort(desc("count")).show(20)

Al parecer las variables de "fechaRegistroPeticion" y "FechaNacAfectado" no tienen  valores atípicos

### 8.2. MotivoPeticion

In [0]:
# Se define diccionario con categorías originales como llaves (keys) y homologaciones/limpieza como valores (values)
dicc_MotivoPeticion = {
    "Amenazado por grupos armados organizados al margen de la ley": "Desvinculados", 
    "Hechos Victimizantes en el Marco del Conflicto Armado": "Desvinculados",
    "Explotación Sexual Comercial \x13†ESCNNA": "ESCNNA", 
    "Pornografía Infantil - Violencia sexual - Explotación Sexual": "ESCNNA", 
    "Prostitución Infantil - Violencia sexual - Explotación Sexual": "ESCNNA",
    "Explotación Sexual Comercial \x96 ESCNNA" : "ESCNNA",
    "Maltrato a Niñ@ en Gestación": "Maltrato", "Maltrato Físico": "Maltrato", 
    "Maltrato físico": "Maltrato", "Maltrato por negligencia": "Maltrato", "Maltrato por Negligencia": "Maltrato", 
    "Maltrato psicológico": "Maltrato", "Maltrato Psicológico": "Maltrato", 
    "Violencia física, psicológica y/o negligencia": "Maltrato",
    "Abandono": "Abandono_Amenaza", "Amenazado": "Abandono_Amenaza",
    "Víctima de uso, porte, manipulación o lesión por pólvora": "Abandono_Amenaza", "Desnutrición": "Abandono_Amenaza",
    "Apoyo a madre gestante o lactante (menor o mayor de 18 años)": "PI_Vulnerable", 
    "Atención a madre gestante Sentencia C-355 de 2006": "PI_Vulnerable", 
    "Atención a niños y niñas hasta los tres (3) años de edad en establecimientos de reclusión de mujeres": "PI_Vulnerable",
    "Carece de representante legal": "NNA_solo", "Carece de Representante Legal": "NNA_solo", "Expósito": "NNA_solo", 
    "Extraviado": "NNA_solo", "Niños, niñas y adolescentes migrantes no acompañados": "NNA_solo","Falta de Responsables": "NNA_solo",
    "Conductas Sexualizadas entre menores de 14 años": "Conductas_Adolescentes", 
    "Conductas sexualizadas entre menores de 14 años": "Conductas_Adolescentes",
    "Consumo de sustancias psicoactivas": "Conductas_Adolescentes", "Consumo de Sustancias Psicoactivas": "Conductas_Adolescentes", 
    "Problemas de comportamiento": "Conductas_Adolescentes", "Problemas de Comportamiento": "Conductas_Adolescentes", 
    "Convivencia Educativa": "Conductas_Adolescentes",
    "Incumplimiento al régimen de visitas y custodia": "Otro", "Motivo SIUCE": "Otro", 
    "NNA Víctimas de desastres naturales u otras situaciones de emergencia": "Otro", 
    "Otorga consentimiento para la adopción": "Otro", "Por establecer": "Otro", "Solicitud vinculación programa de Discapacidad": "Otro",
    "Tenencia Irregular de Niño, Niña o Adolescente": "Otro",
    "Situación de Alta Permanencia en Calle": "Permanencia_Calle",  "Situación de Vida en calle": "Permanencia_Calle", 
    "Situación de vida en calle": "Permanencia_Calle", "Solicitud de refugio de niños, niñas y Adolescentes": "Permanencia_Calle", 
    "None": "ND", 
    "Situación de trabajo infantil": "TI", "Situación Trabajo Infantil (Sin frecuencia y tiempo determinado)": "TI", 
    "Trata con fines diferentes a la explotación sexual": "Trata", "Trata con fines sexuales": "Trata", 
    "Trata de Niños, Niñas y Adolescentes": "Trata", 
    "Abuso Sexual/Violencia Sexual": "V_sexual", "Violencia Sexual": "V_sexual"
}

In [0]:
# Se verifica que las llaves tomadas en el diccionario contengan todas las categorías de la variable
# Para esto creamos una función que nos va a servir para todas las homologaciones/limpieza
def check_replace_llaves(df, var, diccionario):
    keys = list(diccionario.keys()) # Se genera una lista con las llaves del diccionario creado
    df_cuenta_valores = df.select(var).distinct().collect() # Se obtienn los valores únicos de la variable
    lista_valores = [str(row[var]) for row in df_cuenta_valores] # Se realiza una una lista con los valores únicos
    check = set(lista_valores).issubset(keys) # Check es True si las categorías de la variable están contenidas en el diccionario que creamos
    if check==True:
        df = df.na.replace(diccionario, subset=var) # Se usa el diccionario para hacer los reemplazos
        print(check, "= Todas las categorías de la variable {} están incluidas en el diccionario. \n \
        Se hizo el reemplazo de los valores de acuerdo al diccionario.".format(var))
    else:
        print(check, "No se hizo el reemplazo de los valores. \n \
        Aún falta incluir algunas categorías de la variable {} en el diccionario".format(var))
        print("Estas variables son: ", [x for x in lista_valores if x not in keys])
    return df

In [0]:
# Ahora utilizamos la función creada para verificar que el diccionario contenga todas las categorías de la variable
DF_DENUNCIAS_limpiovacios = check_replace_llaves(
    df = DF_DENUNCIAS_limpiovacios, var = "MotivoPeticion", diccionario=dicc_MotivoPeticion
)
# Reemplazamos los valores nulos por categoría "ND"
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.na.fill(value="ND",subset=["MotivoPeticion"])
# Verificamos que el reemplazo se haya hecho correctamente
DF_DENUNCIAS_limpiovacios.groupby("MotivoPeticion").count().sort(desc("count")).show()

## 8.3. Variables de Tipo de documento "TipoDocumentoPeticionario" y "TipoDocumentoAfectado"

In [0]:
dicc_TipoDocumento = {
    "CEDULA DE CIUDADANIA": "CC", "CEDULA DE EXTRANJERIA": "CE, PASAPORTE, PEP O VISA", 
    "IDENTIDAD EXTRANJERA": "CE, PASAPORTE, PEP O VISA", "PASAPORTE": "CE, PASAPORTE, PEP O VISA", 
    "PERMISO DE PERMANENCIA": "CE, PASAPORTE, PEP O VISA", "TARJETA DE MOVILIDAD FRONTERIZA": "CE, PASAPORTE, PEP O VISA", 
    "TARJETA EXTRANJERIA": "CE, PASAPORTE, PEP O VISA", "VISA": "CE, PASAPORTE, PEP O VISA", 
    "CODIGO DANE DE IDENTIFICACIÓN ESTABLECIM": "DANE, NIT O RUT", 
    "NUMERO DE IDENTIFICACION TRIBUTARIA": "DANE, NIT O RUT", "REGISTRO UNICO TRIBUTARIO": "DANE, NIT O RUT", 
    "None": "ND", "NUMERO DE IDENTIFICACION NULO": "ND", "SIN DOCUMENTO": "ND", 
    "LIBRETA MILITAR": "ND", "NUMERO UNICO DE IDENTIFICACION PERSONAL": "ND", "OTRO": "ND", 
    "PARTIDA DE BAUTISMO": "RC o PB", "REGISTRO CIVIL": "RC o PB", "TARJETA DE IDENTIDAD": "TI"
}

In [0]:
# Aplicamos la función para la variable "TipoDocumentoPeticionario"
DF_DENUNCIAS_limpiovacios = check_replace_llaves(
    df = DF_DENUNCIAS_limpiovacios, var = "TipoDocumentoPeticionario", diccionario=dicc_TipoDocumento
)
# Reemplazamos los valores nulos por categoría "ND"
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.na.fill(value="ND",subset=["TipoDocumentoPeticionario"])
# Verificamos que el reemplazo se haya hecho correctamente
DF_DENUNCIAS_limpiovacios.groupby("TipoDocumentoPeticionario").count().sort(desc("count")).show(truncate=False)

In [0]:
# Aplicamos la función para la variable "TipoDocumentoAfectado"
DF_DENUNCIAS_limpiovacios = check_replace_llaves(
    df = DF_DENUNCIAS_limpiovacios, var = "TipoDocumentoAfectado", diccionario=dicc_TipoDocumento
)
# Reemplazamos los valores nulos por categoría "ND"
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.na.fill(value="ND",subset=["TipoDocumentoAfectado"])
# Verificamos que el reemplazo se haya hecho correctamente
DF_DENUNCIAS_limpiovacios.groupby("TipoDocumentoAfectado").count().sort(desc("count")).show(truncate=False)

## 8.4. Variables de "EdadPeticionario" y "EdadAfectado" - "TipoEdadAfectado"

In [0]:
# Al parecer la variable de EdadPeticionario tiene muchos 0 que deberían ser valores nulos porque la info no existe
# Revisemos primero los conteos de frecuencia de esta variable para valores muy bajos, muy altos o nulos
DF_DENUNCIAS_limpiovacios.filter(
    (DF_DENUNCIAS_limpiovacios["EdadPeticionario"] < 10) | \
    (DF_DENUNCIAS_limpiovacios["EdadPeticionario"] > 90) | \
    (DF_DENUNCIAS_limpiovacios["EdadPeticionario"].isNull())
).groupby("EdadPeticionario").count().sort("count", ascending=False).show()

In [0]:
# Para la variable anterior se pondrán como nulos todos los valores que sean menores de 2 (niños que no pueden hablar o escribir no registrarían una petición) o mayores de 112 (valores poco frecuentes de la edad pero atípicos)
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn("EdadPeticionario", \
            when((DF_DENUNCIAS_limpiovacios["EdadPeticionario"] < 2) | \
                (DF_DENUNCIAS_limpiovacios["EdadPeticionario"] > 112), None) \
            .otherwise(DF_DENUNCIAS_limpiovacios["EdadPeticionario"]))
#Graficamos de nuevo edades más frecuentes y frecuencias relativas
total = DF_DENUNCIAS_limpiovacios.count()
DF_DENUNCIAS_limpiovacios.groupby("EdadPeticionario").count().withColumn('percentage', F.round(((col("count")/float(total))*100),3)).sort("count", ascending=False).show()

In [0]:
# La mayoría son registros nulos, por lo cual para tener esta variable completa se creará una variable categórica con las categorías "ND" y los deciles de los datos que si existen
deciles = DF_DENUNCIAS_limpiovacios.approxQuantile("EdadPeticionario", [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9], 0)
deciles

In [0]:
# Creamos las categorías de acuerdo con los deciles
# Dependiendo si estamos creando el modelo o haciendo una predicción con nuevos datos, vamos a comentar una de los siguientes grupos de lineas. Esto debido a que cuando se está haciendo predicción no se pueden utilizar los deciles de la celda anterior para crear las categorías
# Para creación de modelo
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn("EdadPeticionario", \
            when(col("EdadPeticionario").isNull(), "ND"). \
            when((col("EdadPeticionario") <= deciles[0]), "17omenos"). \
            when((col("EdadPeticionario").between(deciles[0], deciles[1])), "18_24"). \
            when((col("EdadPeticionario").between(deciles[1], deciles[2])), "25_27"). \
            when((col("EdadPeticionario").between(deciles[2], deciles[3])), "28_30"). \
            when((col("EdadPeticionario").between(deciles[3], deciles[4])), "31_32"). \
            when((col("EdadPeticionario").between(deciles[4], deciles[5])), "33_35"). \
            when((col("EdadPeticionario").between(deciles[5], deciles[6])), "36_38"). \
            when((col("EdadPeticionario").between(deciles[6], deciles[7])), "39_42"). \
            when((col("EdadPeticionario").between(deciles[7], deciles[8])), "43_48"). \
            when((col("EdadPeticionario") > deciles[8]), "49omayor"). \
            otherwise("Error"))
# Para predicción con nuevos datos
# deciles = [17.0, 24.0, 27.0, 30.0, 32.0, 35.0, 38.0, 42.0, 48.0]
# DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn("EdadPeticionario", \
#             when(col("EdadPeticionario").isNull(), "ND"). \
#             when((col("EdadPeticionario") <= deciles[0]), "17omenos"). \
#             when((col("EdadPeticionario").between(deciles[0], deciles[1])), "18_24"). \
#             when((col("EdadPeticionario").between(deciles[1], deciles[2])), "25_27"). \
#             when((col("EdadPeticionario").between(deciles[2], deciles[3])), "28_30"). \
#             when((col("EdadPeticionario").between(deciles[3], deciles[4])), "31_32"). \
#             when((col("EdadPeticionario").between(deciles[4], deciles[5])), "33_35"). \
#             when((col("EdadPeticionario").between(deciles[5], deciles[6])), "36_38"). \
#             when((col("EdadPeticionario").between(deciles[6], deciles[7])), "39_42"). \
#             when((col("EdadPeticionario").between(deciles[7], deciles[8])), "43_48"). \
#             when((col("EdadPeticionario") > deciles[8]), "49omayor"). \
#             otherwise("Error"))
# Revisamos de nuevo frecuencias
DF_DENUNCIAS_limpiovacios.groupby("EdadPeticionario").count().withColumn('percentage', F.round(((col("count")/float(total))*100),3)).sort("count", ascending=False).show()

In [0]:
# Es necesario homologar las variables "EdadAfectado" - "TipoEdadAfectado" ya que de acuerdo a la segunda pueden 
# estar en años, meses, días o valores no válidos (12, Femenino, Masculino, No Aplica, None)
# Para ello se va a calcular la columna "EdadAfectado_Anios"
TipoEdadErroneo = ["12", "Femenino", "Masculino", "No Aplica"]
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn("EdadAfectado_Anios", \
            when(DF_DENUNCIAS_limpiovacios["TipoEdadAfectado"].isin(TipoEdadErroneo), None). \
            when(DF_DENUNCIAS_limpiovacios["TipoEdadAfectado"] == "Años", col("EdadAfectado")). \
            when(DF_DENUNCIAS_limpiovacios["TipoEdadAfectado"] == "Meses", F.round((col("EdadAfectado")/12),3)). \
            when(DF_DENUNCIAS_limpiovacios["TipoEdadAfectado"] == "Días", F.round((col("EdadAfectado")/365),3)). \
            otherwise(None)) # Este otherwise sirve para identificar errores cuando "TipoEdadAfectado" no entra en ninguna de las cat enteriores

In [0]:
# Se observan frecuencias de valores de los 50 más frecuentes para observar posibles atípicos
DF_DENUNCIAS_limpiovacios.groupby("EdadAfectado_Anios").count().sort("count", ascending=False).show(50)

In [0]:
# Con el resultado anterior, es claro que todos los valores iguales a cero deben ser reemplazados por nulos, ya que ni siquiera registran 1 día de edad
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn("EdadAfectado_Anios", \
            when((col("EdadAfectado_Anios") == 0), None) \
            .otherwise(col("EdadAfectado_Anios")))

In [0]:
# Calcular años entre la fecha de registro de la petición y fecha de nacimiento del afectado
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn("EdadAfectado_Calculada", F.round(F.months_between(col("fechaRegistroPeticion"),col("FechaNacAfectado"))/F.lit(12),3))


# Si la edad calculada es mayor o igual a 5, redondear al entero más cercano. Si es negativa, dejar un valor nulo. Si es igual a 0, dejar un valor de 0.003 (1 día de nacimiento)
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn("EdadAfectado_Calculada",
                 when(col("EdadAfectado_Calculada") >= 5, F.round(col("EdadAfectado_Calculada"),0)). \
                 when(col("EdadAfectado_Calculada") < 0, None). \
                 when(col("EdadAfectado_Calculada") == 0, 0.003). \
                 otherwise(col("EdadAfectado_Calculada")))

# Reemplazar los valores nulos de "EdadAfectado_Anios" con los valores calculados en "EdadAfectado_Calculada"
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn('EdadAfectado_Anios', F.coalesce('EdadAfectado_Anios', 'EdadAfectado_Calculada'))
# Hacemos de nuevo una frecuencia para observar el cambio. Encontramos que los valores nulos bajaron de 262.218 a 37.049, una disminución significativa.
DF_DENUNCIAS_limpiovacios.groupby("EdadAfectado_Anios").count().sort("count", ascending=False).show(50)

In [0]:
# También se usará esta técnica para reemplazar los valores de edad afectado > 18 años, ya que potencialmente también podrían ser errores de digitación.
# Veamos una tabla de frecuencia entre las columnas "EdadAfectado_Anios" y "EdadAfectado_Calculada" para los mayores de 18
mayores18 = DF_DENUNCIAS_limpiovacios.filter(col("EdadAfectado_Anios") > 18)
print("# registros con edad mayor a 18 años: ", mayores18.count())
mayores18.groupby("EdadAfectado_Anios", "EdadAfectado_Calculada").count().sort("EdadAfectado_Anios", ascending=False).show(400)

In [0]:
# Se encuentra que para edades muy altas hay multiples casos en que los que puede haber errores de digitación y es una mejor información la edad calculada. Haremos el reemplazo solo bajo ciertas condiciones
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn('EdadAfectado_Anios', \
                            when(((col('EdadAfectado_Anios') > 18) & \
                                (col('EdadAfectado_Anios') > col('EdadAfectado_Calculada')) & \
                                (col('EdadAfectado_Calculada').isNotNull()) & \
                                (col('EdadAfectado_Calculada') > 1) & \
                                (col('EdadAfectado_Calculada') < 21)), 
                                    col('EdadAfectado_Calculada')). \
                            otherwise(col("EdadAfectado_Anios")))

# Luego volvemos a contar los registros mayores a 18 años. Vemos que hubo cerca de 400 registros arreglados
mayores18 = DF_DENUNCIAS_limpiovacios.filter(col("EdadAfectado_Anios") > 18)
print("# registros con edad mayor a 18 años: ", mayores18.count())

In [0]:
# Volvemos a hacer una tabla observando la frecuencia de los 50 valores más altos en EdadAfectado
print(DF_DENUNCIAS_limpiovacios.groupby("EdadAfectado_Anios").count().sort("EdadAfectado_Anios", ascending=False).show(20))

# Se encuentra que hay multiples registros con edades muy altas y no han sido corregidos con la técnica anterior. A todas las edades mayores a 98 se pondrán valores nulos
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn("EdadAfectado_Anios", \
            when((col("EdadAfectado_Anios") > 98), None) \
            .otherwise(col("EdadAfectado_Anios")))

# Finalmente, contamos el porcentaje de registros nulos para dimensionar una estrategia de imputación en esta columna tan importante
total = DF_DENUNCIAS_limpiovacios.count()
nulos = DF_DENUNCIAS_limpiovacios.filter(col("EdadAfectado_Anios").isNull()).count()
print("# de registros nulos: {} ({:.3%} del total)".format(nulos, nulos/total))

In [0]:
# Se eliminan las variables de edad que ahora no son necesarias
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.drop("EdadAfectado_Calculada", "TipoEdadAfectado", "EdadAfectado")

In [0]:
# Se crea una variable de EdadAfectado_Agrupado, en la que se redondea la edad al entero menor (floor) y se crea el grupo "18_o_mas"
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn("EdadAfectado_Agrupado",
                                                                 when(col("EdadAfectado_Anios")<18, F.floor(col("EdadAfectado_Anios")).cast("string")).
                                                                when(col("EdadAfectado_Anios")>=18, "18_o_mas").
                                                                otherwise(col("EdadAfectado_Anios")))

In [0]:
# Se creará una variable de EdadAfectado_Anios en la que se imputan los valores vacíos. Se imputará el promedio calculado a partir de dos variables TipoDocumentoAfectado y MotivoPeticion
valores_imputacion = DF_DENUNCIAS_limpiovacios.groupBy("TipoDocumentoAfectado", "MotivoPeticion"). \
                      agg(F.floor(F.mean('EdadAfectado_Anios'))). \
                      select(col("TipoDocumentoAfectado").alias("TipoDocumentoAfectado2"), 
                             col("MotivoPeticion").alias("MotivoPeticion2"), 
                             col("FLOOR(avg(EdadAfectado_Anios))").alias("Edad_Imputar"))

# Se guarda como archivo para poder ser reutilizado cuando se tengan nuevos datos
valores_imputacion.write.mode("overwrite").parquet("/mnt/data/Data/imputacion_EdadAfectado")

In [0]:
# Se realiza el join con el df principal
cond = [DF_DENUNCIAS_limpiovacios["TipoDocumentoAfectado"] == valores_imputacion["TipoDocumentoAfectado2"],
        DF_DENUNCIAS_limpiovacios["MotivoPeticion"] == valores_imputacion["MotivoPeticion2"]]

DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.join(valores_imputacion, cond, how='left')

# Se crea columna con valores imputados
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn("EdadAfectado_Anios_Imputada", col("EdadAfectado_Anios"))
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn('EdadAfectado_Anios_Imputada', F.coalesce('EdadAfectado_Anios', 'Edad_Imputar'))

# Eliminar columnas innecesarias
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.drop("TipoDocumentoAfectado2", "MotivoPeticion2", "Edad_Imputar")

## 8.5. Variables de sexo "SexoPeticionario" y "SexoAfectado"

In [0]:
dicc_Sexo = {
    "Femenino": "F","Masculino": "M","Años": "ND",
    "No Aplica": "ND","No Reporta": "ND",
    "NO SE AUTORRECONOCE EN NINGUNO DE LOS ANTERIORES": "ND","None": "ND"
}

In [0]:
# Aplicamos la función para la variable "SexoPeticionario"
DF_DENUNCIAS_limpiovacios = check_replace_llaves(
    df = DF_DENUNCIAS_limpiovacios, var = "SexoPeticionario", diccionario=dicc_Sexo
)
# Para los valores nulos se requiere hacer un reemplazo adicional
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.na.fill("ND", subset="SexoPeticionario")
# Verificamos que el reemplazo se haya hecho correctamente
DF_DENUNCIAS_limpiovacios.groupby("SexoPeticionario").count().sort(desc("count")).show(truncate=False)

In [0]:
# Aplicamos la función para la variable "SexoAfectado"
DF_DENUNCIAS_limpiovacios = check_replace_llaves(
    df = DF_DENUNCIAS_limpiovacios, var = "SexoAfectado", diccionario=dicc_Sexo
)
# Para los valores nulos se requiere hacer un reemplazo adicional
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.na.fill("ND", subset="SexoAfectado")
# Verificamos que el reemplazo se haya hecho correctamente
DF_DENUNCIAS_limpiovacios.groupby("SexoAfectado").count().sort(desc("count")).show(truncate=False)

## 8.6. Variables de Grupo Étnico "GrupoEtnicoPeticionario" y "GrupoEtnicoAfectado"

In [0]:
dicc_GrupoEtnico = {
    "AFROCOLOMBIANO (A)": "AFROCOLOMBIANO (A)","COMUNIDAD NEGRA": "AFROCOLOMBIANO (A)",
    "NEGRO": "AFROCOLOMBIANO (A)","INDÍGENA": "INDÍGENA","Apuntador": "ND",
    "Masculino": "ND","N": "ND","N-D": "ND",
    "NELIDA IRIS SERRANO MENDOZA": "ND","NINGUNO": "ND",
    "NO SE AUTORRECONOCE EN NINGUNO DE LOS ANTERIORES": "ND",
    "None": "ND","Sin pertenencia étnica": "ND",
    "PALENQUERO": "PALENQUERO (A)","PALENQUERO (A)": "PALENQUERO (A)","RAIZAL": "RAIZAL",
    "san andres": "RAIZAL", "san andres ": "RAIZAL","ROM/GITANO": "ROM/GITANO"
}

In [0]:
# Aplicamos la función para la variable "GrupoEtnicoPeticionario"
DF_DENUNCIAS_limpiovacios = check_replace_llaves(
    df = DF_DENUNCIAS_limpiovacios, var = "GrupoEtnicoPeticionario", diccionario=dicc_GrupoEtnico
)
# Para los valores nulos se requiere hacer un reemplazo adicional
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.na.fill("ND", subset="GrupoEtnicoPeticionario")
# Verificamos que el reemplazo se haya hecho correctamente
DF_DENUNCIAS_limpiovacios.groupby("GrupoEtnicoPeticionario").count().sort(desc("count")).show(truncate=False)

In [0]:
# Aplicamos la función para la variable "GrupoEtnicoAfectado"
DF_DENUNCIAS_limpiovacios = check_replace_llaves(
    df = DF_DENUNCIAS_limpiovacios, var = "GrupoEtnicoAfectado", diccionario=dicc_GrupoEtnico
)
# Para los valores nulos se requiere hacer un reemplazo adicional
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.na.fill("ND", subset="GrupoEtnicoAfectado")
# Verificamos que el reemplazo se haya hecho correctamente
DF_DENUNCIAS_limpiovacios.groupby("GrupoEtnicoAfectado").count().sort(desc("count")).show(truncate=False)

## 8.7. Se hace un conjunto de reemplazos de variables que tienen valores inválidos o representan nulos
#### "PuebloIndigenaPeticionario", "PresentaDiscapacidadPeticionario", "CondicionDesplazamientoPeticionario", "HaSidoVictimaPeticionario", "DetalleZonAfectado", "PuebloIndigenaAfectado", "PresentaDiscapacidadAfectado", "CualDiscapacidadAfectado", "CondicionDesplazamientoAfectado"

In [0]:
#Se crea listado de variables que se puede homologar en conjunto valores inválidos o nulos
lista_var_homologar = ["DetalleZonaPeticionario", "PuebloIndigenaPeticionario", "PresentaDiscapacidadPeticionario", 
                       "CondicionDesplazamientoPeticionario", "HaSidoVictimaPeticionario", 
                       "DetalleZonAfectado", "PuebloIndigenaAfectado", "PresentaDiscapacidadAfectado", 
                       "CualDiscapacidadAfectado", "CondicionDesplazamientoAfectado"]

# Diccionario para limpiar valores nulos en estas variables
dicc_Nulos = {
    "N": None, "A": None, "No": None, "01/01/1900 0:00": None, "01/01/2004 0:00": None, "02/04/2007 0:00": None, 
    "03/03/1997 0:00": None, "07/11/2011 0:00": None, "19/09/2003 0:00": None, "2017-10-04 00:00:00.000": None, 
    "31/03/2001 0:00": None, "NO SE AUTORRECONOCE EN NINGUNO DE LOS ANTERIORES": None, 
    "Hijos de Padres que se Encuentran Privados de la Libertad por Orden Judicial": None, 
    "Omisión o negligencia": None, "Por Condiciones Especiales de Cuidadores": None
}

In [0]:
# Reemplazamos los valores de estas variables
for i in lista_var_homologar:
    DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.na.replace(dicc_Nulos, subset=i)
    DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.na.fill("ND", subset=i)
# Verificamos que todas las variables en la lista ahora tengan valores corregidos
for i in lista_var_homologar:
    DF_DENUNCIAS_limpiovacios.groupby(i).count().sort(desc("count")).show(truncate=False)

# Hay unos pocos registros en los campos "DetalleZonaPeticionario" y "DetalleZonAfectado" que tienen valor "ND" o "Dispersa". El primero se homologa como "Urbana" al ser la categoría más frecuente y el segundo como "Rural"
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.na.replace({"ND": "Urbana", "Dispersa":"Rural"}, subset=["DetalleZonaPeticionario", "DetalleZonAfectado"])
DF_DENUNCIAS_limpiovacios.groupby("DetalleZonaPeticionario").count().sort(desc("count")).show(truncate=False)
DF_DENUNCIAS_limpiovacios.groupby("DetalleZonAfectado").count().sort(desc("count")).show(truncate=False)

## 8.8. Variable "UNFICADO_MOTIVO_INGRESO"

In [0]:
dicc_UNFICADO_MOTIVO_INGRESO = {
    "Victima de Violencia Sexual-Explotación Sexual Comercial": "ESCNNA", "Maltrato": "Maltrato", 
    "Omisión o negligencia": "Maltrato", "Violencia física": "Maltrato", "Violencia Psicológica": "Maltrato", 
    "Abandono": "Otro", "Acoso Escolar, Matoneo o Bullying": "Otro", "Alta permanencia en calle": "Otro", 
    "Amenazados Contra de su Vida por Acción de los Grupos Armados Organizados al Margen de la Ley": "Otro", 
    "Amenazados de Reclutamiento Inminente por parte de los Grupos Armados Organizados al Margen de la Ley": "Otro", 
    "Apoyo a NNA afectados por emergencias o desastres naturales": "Otro", 
    "Competencia declaratoria adoptabilidad.": "Otro", "Conductas Sexuales entre Menores de 14 años": "Otro", 
    "Consentimiento para adopción del hijo por cónyuge o compañero": "Otro", 
    "Consentimiento para adopción por consanguíneo": "Otro", "Consentimiento para adopción.": "Otro", 
    "Convivencia Educativa": "Otro", "Desnutrición": "Otro", "Desplazamiento Forzado": "Otro", "Expósito": "Otro", 
    "Extraviado": "Otro", "Falta absoluta o temporal de responsables": "Otro", 
    "Hijo (a) de gestante / lactante mayor de 18 años en protección": "Otro", "Hijos de adolescentes en PARD": "Otro", 
    "Hijos de Padres que se Encuentran Privados de la Libertad por Orden Judicial": "Otro", 
    "Hijos e hijas de víctimas directas de trata": "Otro", 
    "Huérfanos a Causa de la Violencia Armada, Hijos de Padres Desaparecidos o Secuestrados por Acción de los Grupos Armados Organizados al Margen de la Ley": "Otro", 
    "Inmigrante": "Otro", "Menor de 14 Años en Comisión de un Delito": "Otro", "Menor de 14 años Gestante": "Otro", 
    "Menor de 14 años Lactante": "Otro", "Menor de 18 Años en Situación de Discapacidad en Comisión de un Delito": "Otro", 
    "Mujer en Gestación o Lactancia en Riesgo": "Otro", "Mutilación genital femenina": "Otro", 
    "Niños, niñas y adolescentes migrantes no acompañados": "Otro", 
    "Niños, niñas, adolescentes nacidos como consecuencia del abuso sexual en el marco de conflicto armado.": "Otro", 
    "No Reclamado en Tiempo Razonable": "Otro", "Por Condiciones Especiales de Cuidadores": "Otro", 
    "Problemas del Consumo de Sustancias Psicoactivas": "Otro", "Retención Arbitraria": "Otro", 
    "Reunificación Familiar": "Otro", "Situación de Amenaza a la Integridad": "Otro", "Situación de Emergencia": "Otro", 
    "Situación de vida en calle": "Otro", 
    "Uso y/o Utilización de niño, niña o adolescente para la comisión de un delito.": "Otro", 
    "Victima de Minas Antipersonal, Municiones Sin Explotar o Artefacto Explosivo Improvisado": "Otro", 
    "Victima de Ola Invernal": "Otro", "Victima de uso, porte, manipulación o lesión por pólvora": "Otro", 
    "Victima Otros Delitos": "Otro", 
    "Víctimas de Acto Terrorista - Atentados - Combates - Enfrentamientos - Hostigamientos": "Otro", 
    "Violencia Intrafamiliar": "Otro", "Vulneración a la Intimidad": "Otro",  "Vulneración  a la Intimidad": "Otro",
    "None": "Sinvulner", 
    "Explotación Laboral": "TI", "Seguimiento al Trabajo Adolescente": "TI", "Trabajo Infantil": "TI", 
    "Trata de Personas - Explotación Sexual": "Trata de personas", 
    "Trata de personas – Fines de servidumbre": "Trata de personas", 
    "Trata de personas – Fines de Trabajo": "Trata de personas", 
    "Trata de personas – Matrimonio Servil": "Trata de personas", 
    "Trata de personas – Mendicidad Ajena": "Trata de personas", "Acceso Carnal": "V_sexual", 
    "Acoso Sexual": "V_sexual", "Actos Sexuales": "V_sexual", "Otras formas de violencia sexual": "V_sexual", 
    "Unión o matrimonio temprano": "V_sexual", "Victima de Violencia Sexual-Abuso Sexual": "V_sexual", 
    "Víctimas de violencia sexual en el marco de conflicto armado": "V_sexual", "Violación / Asalto Sexual": "V_sexual"
}

In [0]:
# Aplicamos la función para la variable "GrupoEtnicoAfectado"
DF_DENUNCIAS_limpiovacios = check_replace_llaves(
    df = DF_DENUNCIAS_limpiovacios, var = "UNFICADO_MOTIVO_INGRESO", diccionario=dicc_UNFICADO_MOTIVO_INGRESO
)

# Verificamos que el reemplazo se haya hecho correctamente
DF_DENUNCIAS_limpiovacios.groupby("UNFICADO_MOTIVO_INGRESO").count().sort(desc("count")).show(truncate=False)

## 8.9. Creación de variable TipoPeticionario

In [0]:
pd.set_option("max_rows", 200)
count_null = DF_DENUNCIAS_limpiovacios.toHandy().isnull(ratio=False)
count_null

In [0]:
@udf
def limpiar_col3(text):
  if text is not None:
    text = text.lower().strip().replace('.', '').replace('"', '').replace('ã‘', 'ñ').replace('  ', ' ')
    text = normalize('NFKD', text).encode("ascii","ignore").decode("ascii")
  return text

In [0]:
# Limpieza de textos en campos de nombres de peticionario y afectado
columnas_limpiar = ['NombresApellidosPeticionario', 'PrimerNombrePeticionario', 'SegundoNombrePeticionario', 'PrimerApellidoPeticionario', 'SegundoApellidoPeticionario', 'NombresApellidosAfectado', 'PrimerNombreAfectado', 'SegundoNombreAfectado', 'PrimerApellidoAfectado', 'SegundoApellidoAfectado', "BarrioVeredaAfectado", "DireccionAfectado", "ObsParticUbicaAfectado"]
for i in columnas_limpiar:
  DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn(i, limpiar_col3(i))

In [0]:
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn("CategorizacionPeticionario",
                                                                 when((col("NombresApellidosPeticionario").contains("policia")) |
                                                                      (col("NombresApellidosPeticionario").contains("fiscal")) |
                                                                      (col("NombresApellidosPeticionario").contains("infancia y adolescencia")) |
                                                                      (col("NombresApellidosPeticionario").contains("polica")) |
                                                                      (col("NombresApellidosPeticionario").contains("polcia")) |
                                                                      (col("NombresApellidosPeticionario").contains("icalia")) |
                                                                      (col("NombresApellidosPeticionario").contains("caivas")) |
                                                                      (col("NombresApellidosPeticionario").contains("cavif")) |
                                                                      (col("NombresApellidosPeticionario").contains("cti")) |
                                                                      (col("NombresApellidosPeticionario").contains("cuerpo tecnico de investigacion")) |
                                                                      (col("NombresApellidosPeticionario").contains("investigacion criminal")) |
                                                                      (col("NombresApellidosPeticionario").contains("ponal")) |
                                                                      (col("NombresApellidosPeticionario").contains("inspeccion")) |
                                                                      (col("NombresApellidosPeticionario").contains("inspector")) |
                                                                      (col("NombresApellidosPeticionario").contains("tendente")) |
                                                                      (col("NombresApellidosPeticionario").contains("patruller")) |
                                                                      (col("NombresApellidosPeticionario").contains("ejercito")) |
                                                                      (col("NombresApellidosPeticionario").contains("militar")) |
                                                                      (col("NombresApellidosPeticionario").contains("batallon")) |
                                                                      (col("NombresApellidosPeticionario").contains("inpec")) |
                                                                      (col("NombresApellidosPeticionario").contains("sijin")),
                                                                      "Policia_Fiscalia").
                                                                 when((col("NombresApellidosPeticionario").contains("hospital")) |
                                                                      (col("NombresApellidosPeticionario").contains("hospial")) |
                                                                      (col("NombresApellidosPeticionario").contains("clinica")) |
                                                                      (col("NombresApellidosPeticionario").contains("empresa social del estado")) |
                                                                      (col("NombresApellidosPeticionario").contains("ips")) |
                                                                      (col("NombresApellidosPeticionario").contains("uss")) |
                                                                      (col("NombresApellidosPeticionario").contains("subred")) |
                                                                      (col("NombresApellidosPeticionario").contains("sub red")) |
                                                                      (col("NombresApellidosPeticionario").contains("red salud")) |
                                                                      (col("NombresApellidosPeticionario").contains("eps")) |
                                                                      (col("NombresApellidosPeticionario").contains("uci ")) |
                                                                      (col("NombresApellidosPeticionario").contains("de salud")) |
                                                                      (col("NombresApellidosPeticionario").contains("de la salud")) |
                                                                      (col("NombresApellidosPeticionario").contains("materno infantil")) |
                                                                      (col("NombresApellidosPeticionario").contains("cardio")) |
                                                                      (col("NombresApellidosPeticionario").contains("salud total")) |
                                                                      (col("NombresApellidosPeticionario").contains("clinico")) |
                                                                      (col("NombresApellidosPeticionario").contains("valle de lili")) |
                                                                      (col("NombresApellidosPeticionario").contains("valle del lili")) |
                                                                      (col("NombresApellidosPeticionario").contains("famisanar")) |
                                                                      (col("NombresApellidosPeticionario").contains("profamilia")) |
                                                                      (col("NombresApellidosPeticionario").contains("cuidado")) |
                                                                      (col("NombresApellidosPeticionario").contains("medic")) |
                                                                      (col("NombresApellidosPeticionario").contains("pediatr")) |
                                                                      (col("NombresApellidosPeticionario").contains("comfa")) |
                                                                      (col("NombresApellidosPeticionario").contains("meissen")) |
                                                                      (col("NombresApellidosPeticionario").contains("ese ")),
                                                                      "Hospital_Clinica").
                                                                 when((col("NombresApellidosPeticionario").contains("alcaldia")) |
                                                                      (col("NombresApellidosPeticionario").contains("municipio")) |
                                                                      (col("NombresApellidosPeticionario").contains("gobernacion")) |
                                                                      (col("NombresApellidosPeticionario").contains("idipron")) |
                                                                      (col("NombresApellidosPeticionario").contains("red unidos")) |
                                                                      (col("NombresApellidosPeticionario").contains("secretaria")) |
                                                                      (col("NombresApellidosPeticionario").contains("ministerio")) |
                                                                      (col("NombresApellidosPeticionario").contains("prosperidad social")) |
                                                                      (col("NombresApellidosPeticionario").contains("unidad nacional de proteccion")) |
                                                                      (col("NombresApellidosPeticionario").contains("migracion")) |
                                                                       (col("NombresApellidosPeticionario").contains("cancilleria")), 
                                                                       "Ejecutivo").
                                                                  when((col("NombresApellidosPeticionario").contains("centro zonal")) |
                                                                       (col("NombresApellidosPeticionario").contains("empi")) |
                                                                       (col("NombresApellidosPeticionario").contains("cespa")) |
                                                                       (col("NombresApellidosPeticionario").contains("unidad movil")) |
                                                                       (col("NombresApellidosPeticionario").contains("externado")) |
                                                                       (col("NombresApellidosPeticionario").contains("unidades moviles")) |
                                                                       (col("NombresApellidosPeticionario").contains("bienestar familiar")) |
                                                                       (col("NombresApellidosPeticionario").contains("defensor")) |
                                                                       (col("NombresApellidosPeticionario").contains("unidad de responsabilidad penal")) |
                                                                       (col("NombresApellidosPeticionario").contains("nutricional")) |
                                                                       (col("NombresApellidosPeticionario").contains("sede nacional")) |
                                                                       (col("NombresApellidosPeticionario").contains("regional")) |
                                                                       (col("NombresApellidosPeticionario").contains("restablecimiento de derechos")) |
                                                                       (col("NombresApellidosPeticionario").contains("icbf")) |
                                                                       (col("NombresApellidosPeticionario").contains("cdi")) |
                                                                       (col("NombresApellidosPeticionario").contains("cdi")) |
                                                                       (col("NombresApellidosPeticionario").startswith('cz ')), 
                                                                       "ICBF").
                                                                 when((col("NombresApellidosPeticionario").contains("comisari")) |
                                                                      (col("NombresApellidosPeticionario").contains("judicial")) |
                                                                      (col("NombresApellidosPeticionario").contains("justicia")) |
                                                                      (col("NombresApellidosPeticionario").contains("juzgado")),
                                                                     "Judicial").
                                                                 when((col("NombresApellidosPeticionario").contains("colegio")) |
                                                                      (col("NombresApellidosPeticionario").contains("escuela")) |
                                                                      (col("NombresApellidosPeticionario").contains("escolar")) |
                                                                      (col("NombresApellidosPeticionario").contains("liceo")) |
                                                                      (col("NombresApellidosPeticionario").contains("ied")) |
                                                                      (col("NombresApellidosPeticionario").contains(" ie ")) |
                                                                      (col("NombresApellidosPeticionario").startswith('ie ')) |
                                                                      (col("NombresApellidosPeticionario").startswith('iem ')) |
                                                                      (col("NombresApellidosPeticionario").contains("inem")) |
                                                                      (col("NombresApellidosPeticionario").contains("educaiva")) |
                                                                      (col("NombresApellidosPeticionario").contains("instituto tec")) |
                                                                      (col("NombresApellidosPeticionario").contains("tecnico")) |
                                                                      (col("NombresApellidosPeticionario").contains("rector")) |
                                                                      (col("NombresApellidosPeticionario").contains("educativa")),
                                                                     "Colegio").
                                                                 when((col("NombresApellidosPeticionario").contains("defensoria del pueblo")) |
                                                                      (col("NombresApellidosPeticionario").contains("procuraduria")) |
                                                                      (col("NombresApellidosPeticionario").contains("personeria")) |
                                                                      (col("NombresApellidosPeticionario").contains("de paso")) |
                                                                      (col("NombresApellidosPeticionario").contains("hogar paso")) |
                                                                      (col("NombresApellidosPeticionario").contains("notaria")),
                                                                     "MinisterioPublico_HogarDePaso").
                                                                 when((col("NombresApellidosPeticionario").contains("fundacion")) |
                                                                      (col("NombresApellidosPeticionario").contains("ong")) |
                                                                      (col("NombresApellidosPeticionario").contains("organizacion")) |
                                                                      (col("NombresApellidosPeticionario").contains("instituto")) |
                                                                      (col("NombresApellidosPeticionario").contains("institucion")) |
                                                                      (col("NombresApellidosPeticionario").contains("corpo")) |
                                                                      (col("NombresApellidosPeticionario").contains("corpro")) |
                                                                      (col("NombresApellidosPeticionario").contains("centro")) |
                                                                      (col("NombresApellidosPeticionario").contains("crecer en familia")) |
                                                                      (col("NombresApellidosPeticionario").contains("asociacion")),
                                                                     "Fundacion_ONG").
                                                                 when((col("NombresApellidosPeticionario").contains("anonimo")) |
                                                                      (col("NombresApellidosPeticionario").contains("anomino")) |
                                                                      (col("NombresApellidosPeticionario").contains("aninimo")) |
                                                                      (col("NombresApellidosPeticionario").contains("anomimo")) |
                                                                      (col("NombresApellidosPeticionario").contains("n o n i m o")) |
                                                                      (col("NombresApellidosPeticionario").contains("no report")) |
                                                                      (col("NombresApellidosPeticionario").contains("no refiere")) |
                                                                      (col("NombresApellidosPeticionario").contains("no registra")) |
                                                                      (col("NombresApellidosPeticionario").contains("nn nn")) |
                                                                      (col("NombresApellidosPeticionario").contains("nulo nulo")) |
                                                                      (col("NombresApellidosPeticionario").contains("sd ")) |
                                                                      (col("NombresApellidosPeticionario").contains("s-d")) |
                                                                      (col("NombresApellidosPeticionario").contains("xx")) |
                                                                      (col("NombresApellidosPeticionario").contains("x x")) |
                                                                      (col("NombresApellidosPeticionario").contains("nr nr")) |
                                                                      (col("NombresApellidosPeticionario").contains("na na")) |
                                                                      (col("NombresApellidosPeticionario").contains("n/a")) |
                                                                      (col("NombresApellidosPeticionario").contains("n n n n")) |
                                                                      (col("NombresApellidosPeticionario").contains("n n n")) |
                                                                      (col("NombresApellidosPeticionario").contains("nn nn")) |
                                                                      (col("NombresApellidosPeticionario").contains("nna nna")) |
                                                                      (col("NombresApellidosPeticionario").contains("anonima")) |
                                                                      (col("NombresApellidosPeticionario").contains("no aporta")) |
                                                                      (col("NombresApellidosPeticionario").contains("no aplica")) |
                                                                      (col("NombresApellidosPeticionario").contains("sin informacion")) |
                                                                      (col("NombresApellidosPeticionario").contains("123")) |
                                                                      (col("NombresApellidosPeticionario").contains("106")) |
                                                                      (col("NombresApellidosPeticionario").contains("linea de emergencia")) |
                                                                      (col("NombresApellidosPeticionario").contains("dato")), 
                                                                      "Anonimo").
                                                                 when((col("NombresApellidosPeticionario").isNull()) |
                                                                      (col("NombresApellidosPeticionario")=="n n") |
                                                                      (col("NombresApellidosPeticionario")=="n r") |
                                                                      (col("NombresApellidosPeticionario")=="nr") |
                                                                      (col("NombresApellidosPeticionario")=="nr nr") |
                                                                      (col("NombresApellidosPeticionario")=="na") |
                                                                      (col("NombresApellidosPeticionario")=="nn") | (col("NombresApellidosPeticionario")=="sd") | (col("NombresApellidosPeticionario")=="") |
                                                                      (col("NombresApellidosPeticionario")==" "), 
                                                                      "Anonimo").
                                                                 when((col("NombresApellidosPeticionario")==col("NombresApellidosAfectado")),
                                                                     "MismoAfectado").
                                                                 when((col("PrimerApellidoAfectado")==col("PrimerApellidoPeticionario")) |
                                                                      (col("PrimerApellidoAfectado")==col("SegundoApellidoPeticionario")) |
                                                                      (col("SegundoApellidoAfectado")==col("PrimerApellidoPeticionario")) |
                                                                      (col("SegundoApellidoAfectado")==col("SegundoApellidoPeticionario")),
                                                                      "Familiar").
                                                                 otherwise("Otro"))
DF_DENUNCIAS_limpiovacios.groupby("CategorizacionPeticionario").count().sort("count").show()

In [0]:
sin_info = "an[oi][nm]i[mn][oa]|a n o n i m o|rep[or][ro]t[oae]|repota|no refiere|no registra|no sabe|no informa|no aporta|no aplica|nulo|no sabe|no aporta|no aplica|nn?nn|^sd ?|s-d| ?sd|xx|x x|^x$|^nr ?|^n r ?|^na ?|^nna ?|n ?n ?a ?|n/a|n/r|n n n|^n ?n$| nn$|^nr ?$| nr$|^n$|^no ?$|^nin[oa\@]s?|ningu?n[oa]|desconoc|sin iden|sin info|sin dato|sin mas dato|sin nombre|varios|menor|nas?citur|^cl 0 ?|^cl$|^calle$|^kr 0 ?|^av 0 ?|^centro$|sin n[uo]men|sin direccion|no la conoce|no tiene|no conoce|^0 ?|^ ?$|^-$"

DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.withColumn("SinSuficienteInfo_Afectado",
                                                                 when((col("NombresApellidosAfectado").isNull()) |
                                                                      (col("NombresApellidosAfectado").rlike(sin_info)),
                                                                      "SinInfo").
                                                                 when(((col("BarrioVeredaAfectado").isNull()) & (col("DireccionAfectado").isNull()) & (col("ObsParticUbicaAfectado").isNull())) |
                                                                      ((col("BarrioVeredaAfectado").isNull()) & (col("DireccionAfectado").isNull()) & (col("ObsParticUbicaAfectado").rlike(sin_info))) |
                                                                      ((col("BarrioVeredaAfectado").isNull()) & (col("DireccionAfectado").isNull())) |
                                                                      ((col("BarrioVeredaAfectado").isNull()) & (col("DireccionAfectado").rlike(sin_info))) |
                                                                      ((col("BarrioVeredaAfectado").rlike(sin_info)) & (col("DireccionAfectado").isNull())) |
                                                                      ((col("BarrioVeredaAfectado").rlike(sin_info)) & (col("DireccionAfectado").rlike(sin_info))),
                                                                      "SinInfo").
                                                                 otherwise("ConInfo"))

#### 8.10 Homologación de categorías en CanalRecepcion

In [0]:
# Se miran frecuencias relativas para tomar decisión de homologación, de forma que no se mezclen categorías que claramente permiten distinguir la variable objetivo
table_doub = DF_DENUNCIAS_limpiovacios.crosstab("CanalRecepcion", "VAR_OBJETIVO")
table_sing = DF_DENUNCIAS_limpiovacios.groupby("CanalRecepcion").count()
table = table_doub.join(table_sing, table_doub["CanalRecepcion_VAR_OBJETIVO"]==table_sing["CanalRecepcion"],how="outer").drop("CanalRecepcion")

for i in ["falsa", "sindefinir_fallida", "verdadera_nopard", "verdadera_pard_inst", "verdadera_pard_noinst"]:
  table = table.withColumn(i, F.round((col(i)/col("count"))*100, 2))
table.sort("count", ascending=False).show()

# Se miran frecuencias relativas para tomar decisión de homologación, de forma que no se mezclen categorías que claramente permiten distinguir la variable objetivo
table_doub = DF_DENUNCIAS_limpiovacios.crosstab("CanalRecepcion", "AnoRegistroPeticion")
table_sing = DF_DENUNCIAS_limpiovacios.groupby("CanalRecepcion").count()
table = table_doub.join(table_sing, table_doub["CanalRecepcion_AnoRegistroPeticion"]==table_sing["CanalRecepcion"],how="outer").drop("CanalRecepcion")

# for i in ["2015", "2016", "2017", "2018", "2019", "2020", "2021"]:
#   table = table.withColumn(i, F.round((col(i)/col("count"))*100, 2))
table.sort("count", ascending=False).show()

In [0]:
dicc_canalatencion = {
  "Virtual": "Portal", "Instagram": "Redes Sociales", "FaceBook": "Redes Sociales", "Twitter": "Redes Sociales", "Buzón": "Escrito", "Teléfono Verde": "Telefónico Verbal"
}
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.na.fill("Telefónico Verbal", subset="CanalRecepcion")
DF_DENUNCIAS_limpiovacios = DF_DENUNCIAS_limpiovacios.na.replace(dicc_canalatencion, subset="CanalRecepcion")

DF_DENUNCIAS_limpiovacios.groupby("CanalRecepcion").count().sort("count", ascending=False).show()

# 10. Se guarda el archivo final en la instacia SQL y en formato parquet

In [0]:
# Luego de los ajustes escribimos los datos a la instancia SQL
jdbcHostname = dbutils.secrets.get('blobsecret','SQLHostname')
jdbcDatabase = dbutils.secrets.get('blobsecret','SQLDatabase')
jdbcPort = dbutils.secrets.get('blobsecret','SQLport')
jdbcuser = dbutils.secrets.get('blobsecret','SQLusername')
jdbcpass = dbutils.secrets.get('blobsecret','SQLpassword')
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
  "user" : jdbcuser,
  "password" : jdbcpass,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

DF_DENUNCIAS_limpiovacios.write.format("com.microsoft.sqlserver.jdbc.spark").mode("overwrite").option("url", jdbcUrl).option("dbtable", 'DENUNCIAS2015_2021_31032021_BASELIMPIA').option("user", jdbcuser).option("password", jdbcpass).save()

In [0]:
# Configuración necesaria para poder guardar archivos en el storage account
spark.conf.set ( "fs.azure.account.key.blobName.dfs.core.windows.net",
  "blobKey"
)

file_location = "abfss://basecruzada@blobName.dfs.core.windows.net/baselimpia.parquet"
DF_DENUNCIAS_limpiovacios.coalesce(1).write.option("header", "true").mode("overwrite").parquet(file_location)

# 8. Se construye diccionario de datos

In [0]:
def describe_spark_df(df, debug=False):
    start_time = datetime.now() # Para contabilizar cuánto se demora el proceso
    diccionario = [] # Inicializar el diccionario en que se guardarán los datos
    len_df = df.count() # Número de filas que tiene el df
    n_cols_df = len(df.columns) # Número de columnas que tiene el df
    series_null = df.toHandy().isnull(ratio=False) # Esta funcion nos permite contar rápidamente los nulos
    
    for contador, i in enumerate(df.columns):
        if debug: # En caso de errores en el código, debug=True nos dice en cuál variable se ocasiona el problema
            print(contador, i) 
            
        valores = cuenta_valores = moda_val = media_val = ds_val = minimo_val \
        = p25_val = mediana_val = p75_val = maximo_val = "" # Inicializar outputs
        
        df_column = df.select(i)  # Seleccionamos solo la columna que esta corriendo en el loop
        tipo_dato = df_column.dtypes[0][1] # Qué tipo de dato es la columna
        largo_col = df_column.withColumn('largo_col', \
                    F.length(i)).agg({"largo_col": "max"}).collect()[0]["max(largo_col)"] # Máximo largo carácteres columna
        
        null_val = series_null.loc[i] # Recuperamos el número de nulos que tiene la columna
        porc_null = null_val / len_df # Porcentaje de nulos
        
        # Cuenta de valores únicos
        cuenta_unicos = df_column.agg(F.countDistinct(df_column[i]).alias('Unicos')).collect()[0]['Unicos']
        
        if cuenta_unicos < 100:
            df_cuenta_valores = df_column.groupBy(i).count().sort(desc("count")).collect()
            moda_val = str(df_cuenta_valores[0][i]) + " - " + str(df_cuenta_valores[0]["count"])
            
            lista_valores = [str(row[i]) for row in df_cuenta_valores]
            lista_valores2 = lista_valores.copy()
            lista_valores2.sort()
            separador = ", "
            valores = separador.join(lista_valores2)

            lista_cuenta = [str(row["count"]) for row in df_cuenta_valores]
            lista_cuentavalores = [lista_valores[j] + " - " + lista_cuenta[j] for j in range(len(lista_valores))]
            cuenta_valores = separador.join(lista_cuentavalores)
            
        if ((tipo_dato == "int") | (tipo_dato == "double")) & (largo_col!=None):
            media_val = "{:.4f}".format(df_column.agg({i: 'mean'}).collect()[0][0])
            ds_val = "{:.4f}".format(df_column.agg({i: 'stddev'}).collect()[0][0])
            minimo_val = "{:.4f}".format(df_column.agg({i: 'min'}).collect()[0][0])
            p25_val = "{:.4f}".format(df_column.agg(F.expr('percentile({}, array(0.25))'.format(i))).collect()[0][0][0])
            mediana_val = "{:.4f}".format(df_column.agg(F.expr('percentile({}, array(0.50))'.format(i))).collect()[0][0][0])
            p75_val = "{:.4f}".format(df_column.agg(F.expr('percentile({}, array(0.75))'.format(i))).collect()[0][0][0])
            maximo_val = "{:.4f}".format(df_column.agg({i: 'max'}).collect()[0][0])

        data_campo = {
            "Campo": i,
            "Tipo Dato": str(tipo_dato),
            "Longitud Caracteres": largo_col,
            "Valores nulos": "{:d}".format(null_val),
            "Porcentaje nulos": "{:.2%}".format(porc_null),
            "Número de valores únicos": "{:d}".format(cuenta_unicos),
            "Valores únicos": valores,
            "Cuenta valores únicos": cuenta_valores,
            "Moda - Freq": moda_val,
            "Media": str(media_val),
            "Desviación estándar": str(ds_val),
            "Mínimo": str(minimo_val),
            "Percentil 25%": str(p25_val),
            "Mediana": str(mediana_val),
            "Percentil 75%": str(p75_val),
            "Máximo": str(maximo_val)             
        }
        
        diccionario.append(data_campo)

        if ((contador+1) % 10 == 0) | ((contador+1) == n_cols_df):
            print("Progreso: ", ((contador+1)/n_cols_df)*100, "%. Duración: ", datetime.now() - start_time)
    df_resultado = pd.DataFrame(diccionario)
    return df_resultado

In [0]:
# Se ejecuta la función definida en la anterior celda que hace por completo la descripción de todas columnas.
# Este se guarda en un pd.DataFrame que llamamos descriptivas_limpieza
descriptivas_limpieza = describe_spark_df(DF_DENUNCIAS_limpiovacios)

In [0]:
# Convertimos el resultado de pd.DataFrame a spark.Dataframe
descriptivas_limpieza_spark = spark.createDataFrame(descriptivas_limpieza)
# Exportamos el spark.Dataframe a nuestro contenedor de Azure y lo previsualizamos acá en Databricks
file_location = "abfss://export@blobName.dfs.core.windows.net/basecruzada/descriptivas_limpieza_final.csv"
descriptivas_limpieza_spark.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").mode("overwrite").save(file_location)
display(descriptivas_limpieza_spark)

In [0]:
from shutil import copyfile
final = '/dbfs/mnt/data/Data/descriptivas_limpieza_final.xlsx'
temp_file = '/tmp/descriptivas_limpieza_final.xlsx'
descriptivas_limpieza.to_excel(temp_file, engine='openpyxl')

copyfile(temp_file, final)