## Dataclening

In [0]:
from pyspark.sql import functions as F
tran_inmobiliarias_original = _sqldf

In [0]:
tran_inmobiliarias = tran_inmobiliarias_original

## Matricula
##### Regla 1 - calidad: Matriculas con nulos
0: Sin inconsistencia - 1: Con inconsistencia

Una propiedad sin matrícula = No es un inmueble completamente formalizado.
Estos no se puede vender legalmente
No se puede hipotecar ni usar como garantía
Registros sin matrícula deben ser investigados o descartados para análisis legales.


In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_matricula_nula",
    F.when((F.col("MATRICULA").isNotNull()) & (F.trim(F.col("MATRICULA")) != ""), 0).otherwise(1)
)

tran_inmobiliarias.groupBy("alerta_calidad_matricula_nula").count().show()


##### Regla 2 - MATRÍCULA debe iniciar con código ORIP (primeros 3 dígitos)

In [0]:
from pyspark.sql.functions import substring, col, lpad

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "ORIP_NORMALIZADO",
    F.lpad(F.col("ORIP").cast("string"), 3, "0")
)

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "ORIP_MATRICULA",
    F.substring(F.col("MATRICULA"), 1, 3)
)

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_orip_inconsistente",
    F.when(
        F.col("ORIP_NORMALIZADO") != F.col("ORIP_MATRICULA"),
        F.lit(1)
    ).otherwise(F.lit(0))
)

tran_inmobiliarias = tran_inmobiliarias.drop("ORIP_NORMALIZADO", "ORIP_MATRICULA")

conteo = tran_inmobiliarias.groupBy("alerta_calidad_orip_inconsistente").count()
conteo.show()

## FECHA_RADICA_TEXTO
##### Regla 1 - formato: Estandarización de fechas

Que fechas cumplen con la regla ISO 8601

0: Sin inconsistencia - 1: Con inconsistencia

In [0]:
# Expresión regular básica para fechas ISO 8601 tipo YYYY-MM-DD
iso_regex = r"^\d{4}-\d{2}-\d{2}$"

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_formato_iso_fecha_radica",
    F.when(
        F.col("FECHA_RADICA_TEXTO").rlike(iso_regex), 0
    ).otherwise(1)
)

tran_inmobiliarias.groupBy("alerta_formato_iso_fecha_radica").count().show()


com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:134)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:129)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:129)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:466)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:757)
	at com.data

##### Regla 2 - calidad: Fechas futuras

0: Sin inconsistencia - 1: Con inconsistencia

In [0]:
# Estandariza fecha correctamente para empezar a validar las reglas

# 1. Columna auxiliar para el formato ISO YYYY-MM-DD
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    'FECHA_RADICA_TEMP_ISO',
    F.when(
        F.col('FECHA_RADICA_TEXTO').rlike('^\\d{4}-\\d{2}-\\d{2}'),
        F.to_date(F.substring(F.col('FECHA_RADICA_TEXTO'), 1, 10), 'yyyy-MM-dd')
    ).otherwise(F.lit(None).cast('date')) # Devuelve NULL si no coincide
)

# 2. Columna auxiliar para el formato DD/MM/YYYY (4 dígitos de año)
# Usamos F.regexp_extract para extraer la fecha e ignorar cualquier texto o número inicial.
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    'FECHA_RADICA_TEMP_DMY4',
    F.when(
        # Verifica que el patrón DD/MM/YYYY exista en algún lugar de la cadena
        F.col('FECHA_RADICA_TEXTO').rlike('\\d{2}/\\d{2}/\\d{4}'),
        # Extrae solo el grupo de la fecha y lo convierte
        F.to_date(
            F.regexp_extract(F.col('FECHA_RADICA_TEXTO'), '(\\d{2}/\\d{2}/\\d{4})', 1),
            'dd/MM/yyyy'
        )
    ).otherwise(F.lit(None).cast('date')) # Devuelve NULL si no coincide
)

# 3. Columna auxiliar para el formato DD/MM/YY (2 dígitos de año)
# Usamos F.regexp_extract para extraer la fecha e ignorar cualquier texto o número inicial,
# solucionando casos como '25841 25/08/22'.
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    'FECHA_RADICA_TEMP_DMY2',
    F.when(
        # Verifica que el patrón DD/MM/YY exista en algún lugar de la cadena
        F.col('FECHA_RADICA_TEXTO').rlike('\\d{2}/\\d{2}/\\d{2}'),
        # Extrae solo el grupo de la fecha y lo convierte
        F.to_date(
            F.regexp_extract(F.col('FECHA_RADICA_TEXTO'), '(\\d{2}/\\d{2}/\\d{2})', 1),
            'dd/MM/yy'
        )
    ).otherwise(F.lit(None).cast('date')) # Devuelve NULL si no coincide
)

# 4. Consolidar las fechas: Usamos F.coalesce para tomar el primer valor de fecha no nulo
# y SOBRESCRIBIMOS la columna original 'FECHA_RADICA_TEXTO'.
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    'FECHA_RADICA_STD', # Reemplaza la columna original (ahora de tipo Date)
    F.coalesce(
        F.col('FECHA_RADICA_TEMP_ISO'),
        F.col('FECHA_RADICA_TEMP_DMY4'),
        F.col('FECHA_RADICA_TEMP_DMY2')
    )
)

# 5. Eliminar las columnas auxiliares temporales.
tran_inmobiliarias = tran_inmobiliarias.drop(
    'FECHA_RADICA_TEMP_ISO',
    'FECHA_RADICA_TEMP_DMY4',
    'FECHA_RADICA_TEMP_DMY2'
)

# Mostrar el resultado final con la columna de fecha ya convertida
tran_inmobiliarias.select('PK', 'FECHA_RADICA_STD').display()

In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_fecha_radica_futura",
    F.when(F.col("FECHA_RADICA_STD") > F.current_date(), 1).otherwise(0)
)

tran_inmobiliarias.groupBy("alerta_calidad_fecha_radica_futura").count().show()

#### Regla 3 - calidad: Fechas nulas

0: Sin inconsistencia - 1: Con inconsistencia

In [0]:

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_fecha_radica_nula",
    F.when(F.col("FECHA_RADICA_STD").isNull(), 1).otherwise(0)
)

tran_inmobiliarias.groupBy("alerta_calidad_fecha_radica_nula").count().show()

In [0]:
matriculas_inconsistentes_detalle = tran_inmobiliarias.filter(
    F.col("alerta_calidad_fecha_radica_nula") == 1
)

display(matriculas_inconsistentes_detalle)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-7666992871293451>, line 5[0m
[1;32m      1[0m matriculas_inconsistentes_detalle [38;5;241m=[39m tran_inmobiliarias[38;5;241m.[39mfilter(
[1;32m      2[0m     F[38;5;241m.[39mcol([38;5;124m"[39m[38;5;124malerta_calidad_fecha_radica_nula[39m[38;5;124m"[39m) [38;5;241m==[39m [38;5;241m1[39m
[1;32m      3[0m )
[0;32m----> 5[0m display(matriculas_inconsistentes_detalle)

File [0;32m/databricks/python_shell/lib/dbruntime/display.py:142[0m, in [0;36mDisplay.display[0;34m(self, input, *args, **kwargs)[0m
[1;32m    140[0m [38;5;66;03m# This version is for Serverless + Spark Connect dogfooding.[39;00m
[1;32m    141[0m [38;5;28;01melif[39;00m [38;5;28mself[39m[38;5;241m.[39mspark_connect_enabled [38;5;129;01mand[39;00m [38;5;28misinstance[39m([38;5;28minput[

#### Regla 4 - Calidad: multiples transacciones por día.

In [0]:
df_repeticiones_matricula_dia = tran_inmobiliarias.groupBy("MATRICULA", "FECHA_RADICA_STD").count()
display(df_repeticiones_matricula_dia.orderBy(desc("count")))

In [0]:
from pyspark.sql.functions import col 
display(tran_inmobiliarias.filter(col("MATRICULA") == '041-83100'))

In [0]:
from pyspark.sql.functions import col, sum

matricula_anomala = tran_inmobiliarias.filter(col("MATRICULA") == '041-83100')
display(matricula_anomala.select(sum(col("VALOR")).alias("SUMA_TOTAL_VALOR_ANOMALA")))

[0;31m---------------------------------------------------------------------------[0m
[0;31mNumberFormatException[0m                     Traceback (most recent call last)
File [0;32m<command-5711308610042780>, line 4[0m
[1;32m      1[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01msql[39;00m[38;5;21;01m.[39;00m[38;5;21;01mfunctions[39;00m [38;5;28;01mimport[39;00m col, [38;5;28msum[39m
[1;32m      3[0m matricula_anomala [38;5;241m=[39m tran_inmobiliarias[38;5;241m.[39mfilter(col([38;5;124m"[39m[38;5;124mMATRICULA[39m[38;5;124m"[39m) [38;5;241m==[39m [38;5;124m'[39m[38;5;124m041-83100[39m[38;5;124m'[39m)
[0;32m----> 4[0m display(matricula_anomala[38;5;241m.[39mselect([38;5;28msum[39m(col([38;5;124m"[39m[38;5;124mVALOR[39m[38;5;124m"[39m))[38;5;241m.[39malias([38;5;124m"[39m[38;5;124mSUMA_TOTAL_VALOR_ANOMALA[39m[38;5;124m"[39m)))

File [0;32m/databricks/python_shell/lib/dbruntime/display.

In [0]:
from pyspark.sql.functions import col, avg, count
display(df_repeticiones_matricula_dia.select(avg(col("count"))))

In [0]:
from pyspark.sql.window import Window

window_spec = Window.partitionBy("MATRICULA", "FECHA_RADICA_STD")

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "count_repeticiones", 
    F.count("*").over(window_spec)
).withColumn(
    "alerta_calidad_multiples_radicaciones",
    F.when(F.col("count_repeticiones") > 3, F.lit(1)).otherwise(F.lit(0))
)

tran_inmobiliarias = tran_inmobiliarias.drop("count_repeticiones")

tran_inmobiliarias.groupBy("alerta_calidad_multiples_radicaciones").count().show()

In [0]:
transacciones_fuera_prom = tran_inmobiliarias.filter(col("alerta_calidad_multiples_radicaciones") == 1)
display(transacciones_fuera_prom)


## FECHA_APERTURA_TEXTO

##### Regla 1 - formato: Estandarización de fechas

Que fechas cumplen con la regla ISO 8601

0: Sin inconsistencia - 1: Con inconsistencia

In [0]:
# Expresión regular básica para fechas ISO 8601 tipo YYYY-MM-DD
iso_regex = r"^\d{4}-\d{2}-\d{2}$"

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_formato_iso_fecha_apertura",
    F.when(
        F.col("FECHA_APERTURA_TEXTO").rlike(iso_regex), 0
    ).otherwise(1)
)

tran_inmobiliarias.groupBy("alerta_formato_iso_fecha_apertura").count().show()


com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:134)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:129)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:129)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:466)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:757)
	at com.data

##### Regla 2 - calidad: Debe existir un único valor de apertura por matrícula.

0: Sin inconsistencia - 1: Con inconsistencia

In [0]:
# Estandariza fecha correctamente para empezar a validar las reglas
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    'FECHA_APER_TEMP_ISO',
    F.when(
        # Busca que empiece por 4 dígitos - 2 dígitos - 2 dígitos
        F.col('FECHA_APERTURA_TEXTO').rlike('^\\d{4}-\\d{2}-\\d{2}'),
        # Cortamos los primeros 10 caracteres para ignorar la hora y convertimos
        F.to_date(F.substring(F.col('FECHA_APERTURA_TEXTO'), 1, 10), 'yyyy-MM-dd')
    ).otherwise(F.lit(None).cast('date'))
)

# 2. Columna auxiliar para el formato DD-MM-YYYY (Guiones)
# Este captura los 600 registros con formato '99-99-9999'
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    'FECHA_APER_TEMP_DMY_GUION',
    F.when(
        # Verifica patrón ##-##-#### (Guiones en vez de slash)
        F.col('FECHA_APERTURA_TEXTO').rlike('\\d{2}-\\d{2}-\\d{4}'),
        F.to_date(
            # Extraemos la fecha limpiamente por si hay espacios
            F.regexp_extract(F.col('FECHA_APERTURA_TEXTO'), '(\\d{2}-\\d{2}-\\d{4})', 1),
            'dd-MM-yyyy' # OJO: Formato con guiones
        )
    ).otherwise(F.lit(None).cast('date'))
)

# 3. Consolidar: Creamos FECHA_APERTURA_STD
# Usamos coalesce para priorizar ISO, luego Guiones.
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    'FECHA_APERTURA_STD',
    F.coalesce(
        F.col('FECHA_APER_TEMP_ISO'),
        F.col('FECHA_APER_TEMP_DMY_GUION')
    )
)

# 4. Limpieza de columnas temporales
tran_inmobiliarias = tran_inmobiliarias.drop(
    'FECHA_APER_TEMP_ISO',
    'FECHA_APER_TEMP_DMY_GUION'
)

In [0]:
conteo_inconsistencia = tran_inmobiliarias.groupBy("MATRICULA").agg(
    F.countDistinct(F.col("FECHA_APERTURA_STD")).alias("conteo_fechas_distintas")
)

tran_inmobiliarias = tran_inmobiliarias.join(
    conteo_inconsistencia,
    on="MATRICULA",
    how="left"
)

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_fecha_apertura_distinta",
    F.when(F.col("conteo_fechas_distintas") > 1, 1)
     .otherwise(0)
)

tran_inmobiliarias.groupBy("alerta_calidad_fecha_apertura_distinta").count().show()

In [0]:
matriculas_inconsistentes_detalle = tran_inmobiliarias.filter(
    F.col("alerta_calidad_fecha_apertura_distinta") == 1
)

matriculas_inconsistentes_reporte = matriculas_inconsistentes_detalle.select(
    "MATRICULA", 
    "conteo_fechas_distintas"
).distinct().orderBy(F.desc("conteo_fechas_distintas"))

print("Matrículas con más de una Fecha de Apertura (Top 20 Inconsistencias):")
matriculas_inconsistentes_reporte.show(20, truncate=False)

In [0]:
matricula_inconsistente_detalle = tran_inmobiliarias.filter(F.col("MATRICULA") == "072-11366")
display(matricula_inconsistente_detalle)

#### Regla 3 - calidad: Fechas futuras

In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_fecha_apertura_futura",
    F.when(F.col("FECHA_APERTURA_STD") > F.current_date(), 1).otherwise(0)
)

tran_inmobiliarias.groupBy("alerta_calidad_fecha_apertura_futura").count().show()

In [0]:
matriculas_inconsistentes_detalle = tran_inmobiliarias.filter(
    F.col("alerta_calidad_fecha_apertura_futura") == 1
)

display(matriculas_inconsistentes_detalle)

In [0]:
matricula_inconsistente_detalle = tran_inmobiliarias.filter(F.col("MATRICULA") == "072-40752")
display(matricula_inconsistente_detalle)

#### Regla 4 - calidad: Matriculas sin ninguna fecha apertura asociada

In [0]:
from pyspark.sql.types import IntegerType

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    'fecha_presente',
    F.when(
        (F.col("FECHA_APERTURA_TEXTO").isNotNull()) &
        (F.trim(F.col("FECHA_APERTURA_TEXTO")) != ""),
        F.lit(1)
    ).otherwise(F.lit(0))
    .cast(IntegerType())
)


# Agrupar por MATRICULA y determinar la alerta final
# Se usa F.max: Si F.max('fecha_presente') es 0, es porque NINGUNA transacción tenía fecha.
tran_inmobiliarias = tran_inmobiliarias.groupBy("MATRICULA").agg(
    F.max("fecha_presente").alias("max_fecha_presente")
).withColumn(
    'alerta_calidad_no_fecha_apertura',
    F.when(
        F.col("max_fecha_presente") == 0,
        F.lit(1)
    ).otherwise(F.lit(0))
    .cast(IntegerType())
)

tran_inmobiliarias = tran_inmobiliarias.drop('fecha_presente', 'max_fecha_presente')

display(
    tran_inmobiliarias.groupBy("alerta_calidad_no_fecha_apertura").count()
)

In [0]:
matriculas_inconsistentes_detalle = tran_inmobiliarias.filter(
    F.col("alerta_calidad_no_fecha_apertura") == 1
)

display(matriculas_inconsistentes_detalle)

In [0]:
matricula_inconsistente_detalle = tran_inmobiliarias.filter(F.col("MATRICULA") == "240-37285")
display(matricula_inconsistente_detalle)

#### Regla 5 - formato: Fechas nulas

Viendo el formato general de los datos, se repite la fecha apertura en cada registro asociado a una matricula, aunque se hallaron nulos que no se justifican en la documentación que deben ser revisados.

In [0]:
fecha_radica_null = tran_inmobiliarias.withColumn(
    'alerta_formato_fecha_apertura_nulo',
    F.when(F.col("FECHA_APERTURA_TEXTO").isNull(), 1).otherwise(0)
)

display(
    fecha_radica_null.groupBy("alerta_formato_fecha_apertura_nulo").count()
)

#### YEAR RADICA
Regla 1 - calidad: YEAR_RADICA debe coincidir con el año de FECHA_RADICACION


In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_year_radica_coincide",
    F.when(
        F.col("YEAR_RADICA") > F.year(F.col("FECHA_RADICA_STD")),
        F.lit(1)
    ).otherwise(F.lit(0))
)
display(
  tran_inmobiliarias.groupBy("alerta_calidad_year_radica_coincide").count()
)

In [0]:
matriculas_inconsistentes_detalle = tran_inmobiliarias.filter(
    F.col("alerta_calidad_year_radica_coincide") == 0
)

display(matriculas_inconsistentes_detalle)

In [0]:
matriculas_inconsistentes_detalle = tran_inmobiliarias.filter(
    F.col("MATRICULA") == '051-260632'
)

display(matriculas_inconsistentes_detalle)

#### Regla 2 - calidad: Year radica nulo.

In [0]:
fecha_radica_null = tran_inmobiliarias.withColumn(
    'alerta_calidad_year_radica_nulo',
    F.when(F.col("YEAR_RADICA").isNull(), 1).otherwise(0)
)

display(
    fecha_radica_null.groupBy("alerta_calidad_year_radica_nulo").count()
)

#### Regla 3 - calidad: YEAR_RADICA no puede ser una fecha futura.

In [0]:
CURRENT_YEAR_COL = F.year(F.current_date())

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_year_futuro",
    F.when(
        F.col("YEAR_RADICA") > CURRENT_YEAR_COL,
        F.lit(1)
    ).otherwise(F.lit(0))
)

display(
    tran_inmobiliarias.groupBy("alerta_calidad_year_futuro").count()
)

## ORIP


In [0]:
%sql
SELECT * FROM `workspace`.`default`.`directorio_orip`;

In [0]:
codigos_orip = _sqldf
display(codigos_orip)

##### Regla 1 - formato: ORIP con caracteres invalidos.

0: Sin inconsistencia - 1: Con inconsistencia

In [0]:

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "ORIP_LIMPIO_TEMP",
    F.regexp_replace(F.trim(F.col("ORIP")), r"[^0-9a-zA-Z\s]", "")
)

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_formato_caracteres_orip",
    F.when(
        (F.length(F.trim(F.col("ORIP"))) != F.length(F.col("ORIP_LIMPIO_TEMP"))),
        1 
    ).otherwise(0)
)

tran_inmobiliarias = tran_inmobiliarias.drop("ORIP_LIMPIO_TEMP")

tran_inmobiliarias.groupBy("alerta_formato_caracteres_orip").count().show()

Regla 2 - calidad - ORIP debe ser válido (existe en el catálogo oficial)

0: Sin inconsistencia - 1: Con inconsistencia

In [0]:
# Estandariza ORIP para aplicar la validación de la regla.
# 1. Patrón para detectar ALFANUMÉRICOS de 3 caracteres con formato 'XXL'
regex_2_num_1_letter = r'^\d{2}[a-zA-Z]$'

# 2. El DataFrame se sobrescribe con la nueva columna ORIP_STD
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "ORIP_STD",
    F.when(
        # Condición Específica (Alfabeto-Numérica): Si el código limpio coincide con el patrón 'XXL'
        F.trim(F.col("ORIP")).rlike(regex_2_num_1_letter),
        
        # Resultado SI coincide con 'XXL': 
        # a) Rellenar a 4 caracteres (Ej: '50C' -> '050C')
        # b) Insertar un espacio antes de la última letra (Ej: '050C' -> '050 C')
        F.regexp_replace(
            F.lpad(F.trim(F.col("ORIP")), 4, "0"), # Paso a: Rellena a '050C'
            r'(\d)([a-zA-Z])$',                     # Patrón: Busca un dígito seguido de una letra al final
            r'$1 $2'                                # Reemplazo: Dígito, Espacio, Letra (Ej: '050', ' ', 'C')
        )
    ).otherwise(
        # Otherwise (Caso General/Numérico): Aplicar la estandarización por defecto a 3 caracteres.
        F.lpad(F.trim(F.col("ORIP")), 3, "0")
    )
)

In [0]:
lista_orips_validos = [row['CÓDIGO ORIP'] for row in codigos_orip.select("CÓDIGO ORIP").distinct().collect()]

orips_faltantes = ['001', '375']
lista_orips_validos = list(set(lista_orips_validos) | set(orips_faltantes))

In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_orip_invalido",
    F.when(
        F.col("ORIP_STD").isin(lista_orips_validos),
        0
    ).otherwise(1)
)
tran_inmobiliarias.groupBy("alerta_calidad_orip_invalido").count().show()

In [0]:

matriculas_orip_inválido = tran_inmobiliarias.filter(
    F.col("cumple_orip_invalido") == 1
)

orips_invalidos_unicos = matriculas_orip_inválido.select("ORIP").distinct()
print("Códigos ORIP Inválidos que no están en el catálogo oficial:")
orips_invalidos_unicos.show(truncate=False)


#### Regla 2 - calidad: Todo acto registral debe ocurrir en una oficina específica = no existe trámite sin ORIP = No nulos.


In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_orip_completo",
    F.when(
        (F.col("ORIP").isNotNull()) & (F.trim(F.col("ORIP")) != ""), 
        0
    ).otherwise(1)
)

print("Resumen de Completitud: Columna ORIP")
tran_inmobiliarias.groupBy("alerta_calidad_orip_completo").count().show()

#### Regla 3 - Formato: Formato de 3 digitos

En la MATRICULA se menciona que el codigo ORIP se compone de 3 digitos, pero en la columna ORIP se maneja con varias longitudes (tres dígitos vs. un dígito). Esto rompe la normalización de la base de datos.


In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_formato_orip_longitud",
    F.when(
        F.length(F.trim(F.col("ORIP"))) != 3, 
        F.lit(1)
    ).otherwise(F.lit(0))
)

display(
    tran_inmobiliarias.groupBy("alerta_formato_orip_longitud").count()
)


## DIVIPOLA

In [0]:
%sql
SELECT * FROM `workspace`.`default`.`divipola_codigos_municipios`;

In [0]:
divipola_codigos = _sqldf 

In [0]:
display(divipola_codigos)

In [0]:
tran_inmobiliarias = tran_inmobiliarias_original

##### Regla 1 - calidad: Validar que el código exista.

0: Sin inconsistencia - 1: Con inconsistencia

In [0]:
divipola_codigos = (
    divipola_codigos
    .withColumn(
        "DIVIPOLA_oficial",
        F.col("Código Municipio")
    )
)

divipola_referencia = divipola_codigos.select(
    "DIVIPOLA_oficial",
    F.col("Nombre Departamento").alias("divipola_nombre_departamento"),
    F.col("Nombre Municipio").alias("divipola_nombre_municipio")
)

tran_inmobiliarias = tran_inmobiliarias.join(
    divipola_referencia,
    tran_inmobiliarias["DIVIPOLA"] == divipola_referencia["DIVIPOLA_oficial"],
    "left"
)

#  Valida DAVIPOLA Es Válido si el JOIN Tuvo Éxito
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_divipola_invalido",
    F.when(
        F.col("divipola_nombre_municipio").isNull(),
        F.lit(1)
    ).otherwise(F.lit(0))
)

display(
    tran_inmobiliarias
    .groupBy("alerta_calidad_divipola_invalido")
    .count()
)



Regla 2 formato - Valida que el departamento este bien formateado.

In [0]:
# Estandarizo para que la comparación sea del nombre, tildes y todo en mayusculas, ya que hay valores con , pero no es error de formato, si no estilo de dataset, porque no hay estandar para eso.
divipola_dpto_limpio = F.regexp_replace(F.col("divipola_nombre_departamento"), ",", "")


tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_formato_nombre_dpto",
    F.when(
        (F.col("alerta_divipola_invalido") == 0) &
        (F.upper(F.trim(F.col("DEPARTAMENTO"))) != F.upper(F.trim(divipola_dpto_limpio))),
        F.lit(1)
    ).otherwise(F.lit(0))
)


display(
    tran_inmobiliarias
    .groupBy("alerta_formato_nombre_dpto")
    .count()
)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-5300779340459001>, line 17[0m
[1;32m      2[0m divipola_dpto_limpio [38;5;241m=[39m F[38;5;241m.[39mregexp_replace(F[38;5;241m.[39mcol([38;5;124m"[39m[38;5;124mdivipola_nombre_departamento[39m[38;5;124m"[39m), [38;5;124m"[39m[38;5;124m,[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124m"[39m)
[1;32m      5[0m tran_inmobiliarias [38;5;241m=[39m tran_inmobiliarias[38;5;241m.[39mwithColumn(
[1;32m      6[0m     [38;5;124m"[39m[38;5;124malerta_formato_nombre_dpto[39m[38;5;124m"[39m,
[1;32m      7[0m     F[38;5;241m.[39mwhen(
[0;32m   (...)[0m
[1;32m     11[0m     )[38;5;241m.[39motherwise(F[38;5;241m.[39mlit([38;5;241m0[39m))
[1;32m     12[0m )
[1;32m     15[0m display(
[1;32m     16[0m     tran_inmobiliarias
[0;32m---> 17[0m     [38;5;241m.

Regla 3 - formato: Valida que el Municipio este bien formateado.

In [0]:
# Estandarizo para que la comparación sea del nombre, tildes y todo en mayusculas.
divipola_mpio_limpio = F.regexp_replace(F.col("divipola_nombre_municipio"), ",", "")

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_formato_nombre_mpio",
    F.when(
        (F.col("alerta_divipola_invalido") == 0) &
        (F.trim(F.col("MUNICIPIO")) != F.trim(divipola_mpio_limpio)),
        F.lit(1)
    ).otherwise(F.lit(0))
)

display(
    tran_inmobiliarias
    .groupBy("alerta_formato_nombre_mpio")
    .count()
)


In [0]:
municipios_con_alerta = tran_inmobiliarias.filter(
    (F.col("alerta_formato_nombre_mpio") == 1)
)

municipios_unicos_con_alerta = municipios_con_alerta.select(
    F.col("MUNICIPIO"),
    F.col("divipola_nombre_municipio"),
    F.col("alerta_nombre_mpio_incoherente")
)

municipios_unicos_con_alerta = municipios_unicos_con_alerta.distinct()

# El formato indica todo en mayuscula, con tildes e incluir guíones de ser necesario.
display(municipios_unicos_con_alerta)

## DEPARTAMENTO  y MUNICIPIO
Regla 1 - No nulo
Regla 2 - Debe tener coherencia: MATRICULA  ↔  DIVIPOLA  ↔  DEPARTAMENTO ↔ MUNICIPIO
Regla 3 - Debe estar en formato correcto (Col con un mismo formato y tiles.)

Comprobada en la columna 'alerta_formato_nombre_dpto' y 'alerta_formato_nombre_mpi'

## TIPO_PREDIO_ZONA
Regla 1 - calidad: No nulo

In [0]:
from pyspark.sql import functions as F


tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_tipo_predio_nulo",
    F.when((F.col("TIPO_PREDIO_ZONA").isNotNull()) & (F.trim(F.col("TIPO_PREDIO_ZONA")) != ""), 0).otherwise(1)
)

tran_inmobiliarias.groupBy("alerta_calidad_tipo_predio_nulo").count().show()



#### Regla 2 - formato: Estandarizar textos 

En la documentación se menciona que la columna debe tener los siguientes valores:

URBANO, RURAL, SIN INFRORMACIÓN.


In [0]:
tipos_predio_zona = ['URBANO', 'RURAL', 'SIN INFORMACIÓN']


tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_formato_tipo_predio_zona",
    F.when(
        F.col("TIPO_PREDIO_ZONA").isin(tipos_predio_zona),
        F.lit(0)
    ).otherwise(F.lit(1))
)

tran_inmobiliarias.groupBy("alerta_formato_tipo_predio_zona").count().show()


In [0]:
valores_unicos = tran_inmobiliarias.groupBy("TIPO_PREDIO_ZONA").count().orderBy(F.desc("count"))

valores_unicos.show()

# CATEGORIA_RURALIDAD
Regla 1 - calidad: Debe existir para todo municipio

In [0]:
conteo_ruralidad = tran_inmobiliarias.groupBy("MUNICIPIO").agg(
    F.countDistinct(F.col("CATEGORIA_RURALIDAD")).alias("CANTIDAD_RURALIDAD")
)

tran_inmobiliarias = tran_inmobiliarias.join(
    conteo_ruralidad,
    on="MUNICIPIO",
    how="left"
)

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_ruralidad_multiple",
    F.when(
        F.col("CANTIDAD_RURALIDAD") > 1,
        F.lit(1)
    ).otherwise(F.lit(0))
)

tran_inmobiliarias = tran_inmobiliarias.drop("CANTIDAD_RURALIDAD")

tran_inmobiliarias.groupBy("alerta_calidad_ruralidad_multiple").count().show()


In [0]:
matriculas_raras = tran_inmobiliarias.filter(
    F.col("alerta_calidad_ruralidad_multiple") == 1
).select("MUNICIPIO",).distinct()

matriculas_raras.show(truncate=False)



In [0]:
matriculas_raras = tran_inmobiliarias.filter(
    F.col("MUNICIPIO") == "RIONEGRO"
).select("CATEGORIA_RURALIDAD").distinct().show()



## NUM_ANOTACION

Regla 1 calidad: Unicidad por matrícula
(MATRÍCULA, NUM_ANOTACION) debe ser una pareja única.


In [0]:
from pyspark.sql.window import Window

w = Window.partitionBy("MATRICULA", "NUM_ANOTACION")

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_anotacion_duplicada",
    F.when(F.count("*").over(w) > 2, 1).otherwise(0)
)
tran_inmobiliarias.groupBy("alerta_calidad_anotacion_duplicada").count().show()


In [0]:
matriculas_raras = tran_inmobiliarias.filter(
    F.col("alerta_calidad_anotacion_duplicada") == 1
).select("MATRICULA", "NUM_ANOTACION").distinct()

matriculas_raras.show(truncate=False)

In [0]:
matriculas_raras = tran_inmobiliarias.filter(
    F.col("MATRICULA") == '070-127065'
).select("MATRICULA", "NUM_ANOTACION")
display(matriculas_raras)

#### Regla 2 - calidad: Debe ser un número entero positivo.


In [0]:
anotacion_limpia = F.regexp_replace(F.col("NUM_ANOTACION").cast("string"), ",", "")

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "NUM_ANOTACION_NUMERICO",
    anotacion_limpia.cast("int")
)

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_rango_anotacion",
    F.when(
        F.col("NUM_ANOTACION_NUMERICO").isNull() |
        (F.col("NUM_ANOTACION_NUMERICO") <= 0),
        
        F.lit(1)
    ).otherwise(F.lit(0))
)

tran_inmobiliarias = tran_inmobiliarias.drop("NUM_ANOTACION_NUMERICO")
tran_inmobiliarias.groupBy("alerta_calidad_rango_anotacion").count().show()


#### Regla 3 - calidad: Anotaciones nulas:

In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_anotacion_nula",
    F.when((F.col("NUM_ANOTACION").isNotNull()) & (F.trim(F.col("NUM_ANOTACION")) != ""), 0).otherwise(1)
)

tran_inmobiliarias.groupBy("alerta_calidad_anotacion_nula").count().show()



#### Regla 4 - calidad: La secuencia debe ser creciente

No puede haber un salto hacia atrás, debe ser consecutiva, no debe haber duplicados

In [0]:
# Estandariza fecha correctamente para empezar a validar las reglas

# 1. Columna auxiliar para el formato ISO YYYY-MM-DD
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    'FECHA_RADICA_TEMP_ISO',
    F.when(
        F.col('FECHA_RADICA_TEXTO').rlike('^\\d{4}-\\d{2}-\\d{2}'),
        F.to_date(F.substring(F.col('FECHA_RADICA_TEXTO'), 1, 10), 'yyyy-MM-dd')
    ).otherwise(F.lit(None).cast('date')) # Devuelve NULL si no coincide
)

# 2. Columna auxiliar para el formato DD/MM/YYYY (4 dígitos de año)
# Usamos F.regexp_extract para extraer la fecha e ignorar cualquier texto o número inicial.
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    'FECHA_RADICA_TEMP_DMY4',
    F.when(
        # Verifica que el patrón DD/MM/YYYY exista en algún lugar de la cadena
        F.col('FECHA_RADICA_TEXTO').rlike('\\d{2}/\\d{2}/\\d{4}'),
        # Extrae solo el grupo de la fecha y lo convierte
        F.to_date(
            F.regexp_extract(F.col('FECHA_RADICA_TEXTO'), '(\\d{2}/\\d{2}/\\d{4})', 1),
            'dd/MM/yyyy'
        )
    ).otherwise(F.lit(None).cast('date')) # Devuelve NULL si no coincide
)

# 3. Columna auxiliar para el formato DD/MM/YY (2 dígitos de año)
# Usamos F.regexp_extract para extraer la fecha e ignorar cualquier texto o número inicial,
# solucionando casos como '25841 25/08/22'.
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    'FECHA_RADICA_TEMP_DMY2',
    F.when(
        # Verifica que el patrón DD/MM/YY exista en algún lugar de la cadena
        F.col('FECHA_RADICA_TEXTO').rlike('\\d{2}/\\d{2}/\\d{2}'),
        # Extrae solo el grupo de la fecha y lo convierte
        F.to_date(
            F.regexp_extract(F.col('FECHA_RADICA_TEXTO'), '(\\d{2}/\\d{2}/\\d{2})', 1),
            'dd/MM/yy'
        )
    ).otherwise(F.lit(None).cast('date')) # Devuelve NULL si no coincide
)

# 4. Consolidar las fechas: Usamos F.coalesce para tomar el primer valor de fecha no nulo
# y SOBRESCRIBIMOS la columna original 'FECHA_RADICA_TEXTO'.
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    'FECHA_RADICA_STD', # Reemplaza la columna original (ahora de tipo Date)
    F.coalesce(
        F.col('FECHA_RADICA_TEMP_ISO'),
        F.col('FECHA_RADICA_TEMP_DMY4'),
        F.col('FECHA_RADICA_TEMP_DMY2')
    )
)

# 5. Eliminar las columnas auxiliares temporales.
tran_inmobiliarias = tran_inmobiliarias.drop(
    'FECHA_RADICA_TEMP_ISO',
    'FECHA_RADICA_TEMP_DMY4',
    'FECHA_RADICA_TEMP_DMY2'
)


In [0]:
from pyspark.sql.window import Window

FORMATO_FECHA = 'yyyy-MM-dd HH:mm:ss'

w = Window.partitionBy("MATRICULA").orderBy(
    F.to_timestamp(F.col('FECHA_RADICA_STD'), FORMATO_FECHA).asc(), 
    F.regexp_replace(F.col('NUM_ANOTACION'), ",", "").cast("int").asc()
)

NUM_ANOTACION_NUM = F.regexp_replace(F.col("NUM_ANOTACION"), ",", "").cast("int")

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "NUM_ANOTACION_PREV",
    F.lag(NUM_ANOTACION_NUM, 1).over(w)
)

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_gap_secuencia",
    F.when(
        F.col("NUM_ANOTACION_PREV").isNotNull() & 
        (NUM_ANOTACION_NUM != (F.col("NUM_ANOTACION_PREV") + 1)),
        F.lit(1)
    ).otherwise(F.lit(0))
)

tran_inmobiliarias = tran_inmobiliarias.drop("NUM_ANOTACION_PREV")

tran_inmobiliarias.groupBy("alerta_calidad_gap_secuencia").count().show()

In [0]:
matriculas_raras = tran_inmobiliarias.filter(
    F.col("alerta_calidad_gap_secuencia") == 1
).select("MATRICULA","NUM_ANOTACION").distinct()

matriculas_raras.show(truncate=False)



In [0]:
display(tran_inmobiliarias.filter(
    F.col("MATRICULA") == '001-1001011'
))

In [0]:
display(tran_inmobiliarias.filter(
    F.col("MATRICULA") == '001-1004862'
))

## ESTADO_FOLIO

Regla 1 - calidad: Valores nulos

En la documentación mencionan que los estados del folio son activo: significa que el folio puede seguir recibiendo anotaciones, cerrado: implica que el folio ya no puede continuar recibiendo anotaciones o en custodia: se asigna a otra jurisdicción por razones de seguridad.

No se menciona nada de nulos.

In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_estado_folio_nulo",
    F.when((F.col("ESTADO_FOLIO").isNotNull()) & (F.trim(F.col("ESTADO_FOLIO")) != ""), 0).otherwise(1)
)

tran_inmobiliarias.groupBy("alerta_calidad_estado_folio_nulo").count().show()

In [0]:
tran_inmobiliarias.groupBy("ESTADO_FOLIO")\
    .count()\
    .orderBy(F.desc("count"))\
    .show(truncate=False)
    

In [0]:
display(tran_inmobiliarias.filter(
    F.trim(F.col("ESTADO_FOLIO")) == 'CERRADO'
))

In [0]:
display(tran_inmobiliarias.filter(
    F.trim(F.col("MATRICULA")) == '002-12850'
))

## Dinámica_Inmobiliaria
Regla 1 - calidad: No debe haber nulos


In [0]:
from pyspark.sql import functions as F


tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_cclidad_dinamica_nulos",
    F.when((F.col("Dinámica_Inmobiliaria").isNotNull()) & (F.trim(F.col("Dinámica_Inmobiliaria")) != ""), 0).otherwise(1)
)

tran_inmobiliarias.groupBy("alerta_cclidad_dinamica_nulos").count().show()



#### Regla 2 - calidad: Valores que no sean binarios

In [0]:
cond = (
    F.col("Dinámica_Inmobiliaria").isNotNull() &
    (F.trim(F.col("Dinámica_Inmobiliaria")) != "") &
    F.col("Dinámica_Inmobiliaria").isin("1", "0")
)

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_dinamica_diferente_1_0",
    F.when(cond, 0)
    .otherwise(1)

)

display(tran_inmobiliarias.groupBy("alerta_calidad_dinamica_diferente_1_0").count())

## COD_NATUJUR
Regla 1 calidad - Nulos

In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_cod_natujur_nulos",
    F.when((F.col("COD_NATUJUR").isNotNull()) & (F.trim(F.col("COD_NATUJUR")) != ""), 0).otherwise(1)
)

tran_inmobiliarias.groupBy("alerta_calidad_cod_natujur_nulos").count().show()



Regla 2 calidad - Códigos incorrectos o inexistentes según la resolución de la SNR.


In [0]:
%sql
SELECT * FROM `workspace`.`default`.`codigos_srn`;

In [0]:
from pyspark.sql import functions as F
codigos_srn = _sqldf

In [0]:
lista_srn_validos = [row['codigo'] for row in codigos_srn.select("codigo").distinct().collect()]


In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_srn_invalido",
    F.when(
        F.col("COD_NATUJUR").isin(lista_srn_validos),
        0
    ).otherwise(1)
)

tran_inmobiliarias.groupBy("alerta_calidad_srn_invalido").count().show()

In [0]:
matriculas_raras = tran_inmobiliarias.filter(
    F.col("cumple_srn_invalido") == 1
).select("MATRICULA","COD_NATUJUR").distinct()

matriculas_raras.show(truncate=False)



## COUNT_A
Regla 1 - calidad: Valores nulos o vacíos.

In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_count_a_nulo",
    F.when((F.col("COUNT_A").isNotNull()) & (F.trim(F.col("COUNT_A")) != ""), 0).otherwise(1)
)

tran_inmobiliarias.groupBy("alerta_calidad_count_a_nulo").count().show()

Regla 2 - calidad: Valores en 0

In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_count_a_en_cero",
    F.when(F.trim(F.col("COUNT_A")) == "0", 1).otherwise(0)
)

tran_inmobiliarias.groupBy("alerta_calidad_count_a_en_cero").count().show()

Regla 3 - formato: Valores mal formateados
Valores negativos o no enteros, Valores no numéricos ('dos', '?', etc.).


In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_formto_count_a",
    F.when(
        # No numéricos
        ~F.col("COUNT_A").rlike("^[0-9]+$"),
        1
    ).otherwise(0)
)

tran_inmobiliarias.groupBy("alerta_formto_count_a").count().show()


## COUNT_DE
Regla 1 - calidad: Valores nulos o vacíos.

In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_count_de_nulo",
    F.when((F.col("COUNT_DE").isNotNull()) & (F.trim(F.col("COUNT_DE")) != ""), 0).otherwise(1)
)

tran_inmobiliarias.groupBy("alerta_calidad_count_de_nulo").count().show()



Regla 2 - calidad: Valores en 0

In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_count_de_en_cero",
    F.when(F.trim(F.col("COUNT_DE")) == "0", 1).otherwise(0)
)

tran_inmobiliarias.groupBy("alerta_calidad_count_de_en_cero").count().show()


Regla 3 - formato: Valores mal formateados
Valores negativos o no enteros, Valores no numéricos ('dos', '?', etc.).

In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_formato_count_de",
    F.when(
        # No numéricos
        ~F.col("COUNT_DE").rlike("^[0-9]+$"),
        1
    ).otherwise(0)
)

tran_inmobiliarias.groupBy("alerta_formato_count_de").count().show()


In [0]:
display(tran_inmobiliarias.filter(col("alerta_formato_count_de") == '1'))

In [0]:
display(tran_inmobiliarias.filter(col("MATRICULA") == '240-86750'))

## PREDIOS_NUEVOS
Regla 1 - calidad: Valores nulos o vacíos.

In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_predios_nuevos_nulo",
    F.when((F.col("PREDIOS_NUEVOS").isNotNull()) & (F.trim(F.col("PREDIOS_NUEVOS")) != ""), 0).otherwise(1)
)

tran_inmobiliarias.groupBy("alerta_calidad_predios_nuevos_nulo").count().show()



Regla 2 - calidad: Valores distintos de 0 o 1 (errores de captura, 'sí', 'no', 2).\

In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "PREDIOS_NUEVOS_NUM",
    F.col("PREDIOS_NUEVOS").cast("int")
)

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_predios_nuevos_binario",
    F.when(
        F.col("PREDIOS_NUEVOS_NUM").isNull() |
        (F.col("PREDIOS_NUEVOS_NUM") != 0) & (F.col("PREDIOS_NUEVOS_NUM") != 1),
        F.lit(1)
    ).otherwise(F.lit(0))
)

tran_inmobiliarias = tran_inmobiliarias.drop("PREDIOS_NUEVOS_NUM")
tran_inmobiliarias.groupBy("alerta_calidad_predios_nuevos_binario").count().show()


Regla 3 - calidad: Confirmar que el primer registro de un folio sea el más antiguo.


In [0]:
from pyspark.sql.window import Window

NUM_ANOTACION_NUM = F.regexp_replace(F.col("NUM_ANOTACION"), ",", "").cast("int")
FECHA_RADICA_TS = F.to_timestamp(F.col('FECHA_RADICA_STD'), 'yyyy-MM-dd HH:mm:ss')

w = Window.partitionBy("MATRICULA")

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "MIN_FECHA_RADICA",
    F.min(FECHA_RADICA_TS).over(w)
).withColumn(
    "MIN_NUM_ANOTACION",
    F.min(NUM_ANOTACION_NUM).over(w)
)

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "FECHA_PRIMER_ANOTACION",
    F.max(
        F.when(
            NUM_ANOTACION_NUM == F.col("MIN_NUM_ANOTACION"),
            FECHA_RADICA_TS
        )
    ).over(w)
)

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_cronologia_folio",
    F.when(
        F.col("MIN_FECHA_RADICA") != F.col("FECHA_PRIMER_ANOTACION"),
        F.lit(1)
    ).otherwise(F.lit(0))
)

tran_inmobiliarias = tran_inmobiliarias.drop(
    "MIN_FECHA_RADICA", "MIN_NUM_ANOTACION", "FECHA_PRIMER_ANOTACION"
)

tran_inmobiliarias.groupBy("alerta_calidad_cronologia_folio").count().show()

## TIENE_VALOR
Regla 1 - calidad: Comprobar valores 1 y 0

In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "TIENE_VALOR_NUM",
    F.col("TIENE_VALOR").cast("int")
)

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_valor_binario",
    F.when(
        F.col("TIENE_VALOR_NUM").isNull() |
        ((F.col("TIENE_VALOR_NUM") != 0) & (F.col("TIENE_VALOR_NUM") != 1)),
        
        F.lit(1)
    ).otherwise(F.lit(0))
)

tran_inmobiliarias = tran_inmobiliarias.drop("TIENE_VALOR_NUM")
tran_inmobiliarias.groupBy("alerta_calidad_valor_binario").count().show()


#### Regla 2 - calidad: Inconsistencias con columnas de valor real (VALOR)
Marcar registros donde TIENE_VALOR = 1 pero VALOR está vacío o nulo

In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_tiene_valor_faltante",
    F.when(
        (F.col("TIENE_VALOR") == 1) &
        (F.col("VALOR").isNull() | (F.trim(F.col("VALOR")) == "")),
        F.lit(1)
    ).otherwise(F.lit(0))
)

tran_inmobiliarias.groupBy("alerta_calidad_tiene_valor_faltante").count().show()


## TIENE_MAS_DE_UN_VALOR
Regla 1 - calidad: Comprobar valores 1 y 0


In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "TIENE_MAS_DE_UN_VALOR_NUM",
    F.col("TIENE_MAS_DE_UN_VALOR").cast("int")
)

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_tiene_mas_de_valor_binario",
    F.when(
        F.col("TIENE_MAS_DE_UN_VALOR_NUM").isNull() |
        ((F.col("TIENE_MAS_DE_UN_VALOR_NUM") != 0) & (F.col("TIENE_MAS_DE_UN_VALOR_NUM") != 1)),
        F.lit(1)
    ).otherwise(F.lit(0))
)

tran_inmobiliarias = tran_inmobiliarias.drop("TIENE_MAS_DE_UN_VALOR_NUM")

tran_inmobiliarias.groupBy("alerta_calidad_tiene_mas_de_valor_binario").count().show()


#### Regla 2 - calidad: Inconsistencias con columnas de valor real (VALOR)
Marcar registros donde TIENE_VALOR = 1 pero VALOR está vacío o nulo

In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_mas_de_un_valor_faltante",
    F.when(
        (F.col("TIENE_MAS_DE_UN_VALOR") == 1) &
        (F.col("VALOR").isNull() | (F.trim(F.col("VALOR")) == "")),
        F.lit(1)
    ).otherwise(F.lit(0))
)

tran_inmobiliarias.groupBy("alerta_calidad_mas_de_un_valor_faltante").count().show()


## VALOR
#### Regla 1 - formato: Formatos inconsistentes (ej: con puntos, comas, símbolos de moneda, nulos, valores no númericos).

In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "VALOR_LIMPIO_TEMP",
    F.regexp_replace(F.trim(F.col("VALOR")), r"[$,.]", "") 
)

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "VALOR_NUMERICO",
    F.col("VALOR_LIMPIO_TEMP").cast("decimal(20, 0)")
)

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_formato_valor",
    F.when(
        (F.col("VALOR").isNotNull()) & (F.col("VALOR_NUMERICO").isNull()),
        F.lit(1)
    ).otherwise(F.lit(0))
)

tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "VALOR",
    F.col("VALOR_NUMERICO")
)

tran_inmobiliarias.groupBy("alerta_formato_valor").count().show()


#### Regla 2 - calidad: Valores en 0

In [0]:
tran_inmobiliarias = tran_inmobiliarias.withColumn(
    "alerta_calidad_valor_cero",
    F.when(
        (F.col("VALOR_NUMERICO") == 0),
        F.lit(1)
    ).otherwise(F.lit(0))
)

tran_inmobiliarias.groupBy("alerta_calidad_valor_cero").count().show()


#### Regla 1 - General: Registros duplicados

In [0]:
pk_columna = "PK"
columnas_a_verificar = [col for col in tran_inmobiliarias.columns if col != pk_columna]

duplicados_conteo = tran_inmobiliarias.groupBy(columnas_a_verificar).count()

registros_duplicados = duplicados_conteo.filter(F.col("count") > 1)

total_registros_duplicados = registros_duplicados.agg(F.sum("count")).collect()[0][0]

print(f"Total de conjuntos de registros duplicados (filas duplicadas): {registros_duplicados.count()}")
print(f"Total de registros individuales involucrados en duplicados (count > 1): {total_registros_duplicados}")
# Mostrar algunos de los registros que están duplicados
display(registros_duplicados.orderBy(F.desc("count")))