# Configuración e importe de paquetes

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.types import StructType
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.types import FloatType, StringType, IntegerType, DateType
from pyspark.sql.functions import udf, col, length, isnan, when, count
import pyspark.sql.functions as f
import os 
from datetime import datetime
from pyspark.sql import types as t
#from pandas_profiling import ProfileReport
import matplotlib.pyplot as plt
import numpy as np

Configuración del controlador e inicio de sesion Spark

In [2]:
path_jar_driver = '/home/jovyan/code/java_sqlServer/mssql-jdbc-12.4.2.jre8.jar'

#Configuración de la sesión
conf=SparkConf() \
    .set('spark.driver.extraClassPath', path_jar_driver) \
    .set('spark.driver.memory', '8g')  # Ajusta el tamaño de la memoria del driver según sea necesario


spark_context = SparkContext(conf=conf)
sql_context = SQLContext(spark_context)
spark = sql_context.sparkSession




# Conexión a fuente de datos y acceso a los datos

Detalles de la conexión a la base de datos SQL de Azure

In [28]:
# Detalles de conexión a la base de datos SQL de Azure
url = "jdbc:sqlserver://sqlserver-calidadgobierno.database.windows.net:1433;database=sqldb-calidadgobierno"
usuario = "sqladmin@sqlserver-calidadgobierno"
contraseña = "y.HCuP4gJBb?)MS/6~W!+;"
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

Función para convertir un Dataframe en una tabla para mi base de datos 

In [29]:
from pyspark.sql import DataFrame
def cargar_dataframe_a_tabla(dataframe: DataFrame, nombre_tabla: str):
    """
    Método para cargar un DataFrame en una tabla de una base de datos.

    Parámetros:
    - dataframe: DataFrame que se cargará en la tabla.
    - nombre_tabla: Nombre de la tabla en la base de datos.
    """
    # Escribe el DataFrame en la base de datos como una tabla
    dataframe.write.jdbc(url=url, table=nombre_tabla, mode="append", properties={"user": usuario, "password": contraseña, "driver": driver})

Funcion para leer una tabla de la base de datos y convertirla en dataframe

In [30]:
from pyspark.sql import SparkSession

def leer_tabla_a_dataframe(url, tabla, usuario, contraseña, driver):

    # Configurar la conexión a la base de datos
    properties = {
        "user": usuario,
        "password": contraseña,
        "driver": driver
    }

    # Leer la tabla y convertirla en un DataFrame
    df = spark.read.jdbc(url=url, table=tabla, properties=properties)

    return df

# Lecturas tablas parametrizadas

# Lectura Activo de información

In [31]:
# Nombre de la tabla que se desea leer
nombre_tabla = "dbo.activo_informaciones"

# Llamar a la función para leer la tabla y obtener el DataFrame
df_activo = leer_tabla_a_dataframe(url, nombre_tabla, usuario, contraseña, driver)

In [32]:
from pyspark.sql import SparkSession

# Asumiendo que spark es tu SparkSession y df_activo ya está definido
def mostrar_activos(df):
    # Mostrar solo las columnas id y nombre para que el usuario elija
    activos = df.select("id", "nombre")
    activos.show(truncate=False)

# Ejecutar la función para mostrar los activos
#mostrar_activos(df_activo)


In [33]:
def seleccionar_activo(df):
    mostrar_activos(df)
    activo_id = input("Por favor, introduce el ID del activo que deseas seleccionar: ")
    try:
        activo_id = int(activo_id)  # Convertir el ID a entero
    except ValueError:
        print("El ID del activo debe ser un número entero.")
        return None, None  # Retorna None si la conversión falla

    activo_seleccionado = df.filter(df.id == activo_id)
    if activo_seleccionado.count() == 0:
        print("No se encontró un activo con ese ID.")
        return None, None
    
    activo_seleccionado.show(truncate=False)
    nombre_activo = activo_seleccionado.select("nombre").collect()[0][0]
    return activo_id, nombre_activo

# Ejecutar el proceso completo
activo_id, nombre_activo = seleccionar_activo(df_activo)

+---+----------------+
|id |nombre          |
+---+----------------+
|12 |clienteJuridico2|
+---+----------------+

+---+----------------+------------------+----------------+----------------+----------+-----------+------------+-------+--------------+-------------------+----------+--------------------------+--------------------------+
|id |nombre          |tipo_clasificacion|confidencialidad|custodio        |integridad|dependencia|formato     |idioma |frecuencia_act|fecha_creacion     |empresa_id|created_at                |updated_at                |
+---+----------------+------------------+----------------+----------------+----------+-----------+------------+-------+--------------+-------------------+----------+--------------------------+--------------------------+
|12 |clienteJuridico2|Alta              |Alta            |Carlos Gutierrez|Alta      |Analitica  |SQL DATABASE|Español|Mensual       |2024-05-16 05:00:00|6         |2024-05-16 06:30:32.049177|2024-05-16 09:39:34.618614|
+---

In [34]:
# Formatear el nombre de la tabla usando el nombre del activo
nombre_tabla = f"Calidad.{nombre_activo}"

# Llamar a la función para leer la tabla y obtener el DataFrame
df_spark = leer_tabla_a_dataframe(url, nombre_tabla, usuario, contraseña, driver)

# Mostrar el DataFrame resultante
df_spark.show()

# Obtener el número de filas
num_filas = df_spark.count()

# Obtener el número de columnas
num_columnas = len(df_spark.columns)

print(f"El DataFrame tiene {num_filas} filas y {num_columnas} columnas.")

# Mostrar el esquema del DataFrame obtenido
df_spark.printSchema()

+------+------+---------+--------------------+------+--------------------+----------+--------+--------------------+-----------+-----------+----------+-----------------+------------------------+------------------+-------------------+----------+---------------------+----------------------------+----------------------------+--------------------+---------------------+----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------------------------------------+------------------+-----------------+----+--------+---------------+-------+--------------------+--------------------+----------------+----------+--------------------+-------------------------------+-------------+---------------------+--------------------+----------------+--------------+----------+--------+---------+-------+---------+----------------------+--------------------+-------------+----------+--------------------+--------------------+-------------------+-------------+----------------

## Lectura Campo Activo

In [35]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_date, lit, to_date

# Nombre de la tabla que se desea leer
nombre_tabla = "dbo.campo_activos"

# Llamar a la función para leer la tabla y obtener el DataFrame
df_campoActivo = leer_tabla_a_dataframe(url, nombre_tabla, usuario, contraseña, driver)

fecha_especifica = to_date(lit("2024-05-14"), "yyyy-MM-dd")

# Filtrar el DataFrame para obtener registros de hoy o fechas posteriores
df_fil_campoActivo = df_campoActivo.filter(col("created_at") >= fecha_especifica)

df_fil_campoActivo = df_fil_campoActivo.drop("created_at", "updated_at", "tipo", "longitud")

df_fil_campoActivo = df_fil_campoActivo.withColumnRenamed("id", "id_campo")
df_fil_campoActivo = df_fil_campoActivo.withColumnRenamed("nombre", "nombre_campo")
df_fil_campoActivo = df_fil_campoActivo.withColumnRenamed("descripcion", "descripcion_campo")

# Mostrar los resultados filtrados
df_fil_campoActivo.show()

# Obtener el número de filas
num_filas = df_fil_campoActivo.count()

# Obtener el número de columnas
num_columnas = len(df_fil_campoActivo.columns)

print(f"El DataFrame tiene {num_filas} filas y {num_columnas} columnas.")

# Mostrar el esquema del DataFrame obtenido
df_fil_campoActivo.printSchema()

+--------+------------+-----------------+---------------------+
|id_campo|nombre_campo|descripcion_campo|activo_informacion_id|
+--------+------------+-----------------+---------------------+
|     206|      SCORER|             NULL|                   12|
|     207|     TEL_REP|             NULL|                   12|
|     208|    REGIONAL|             NULL|                   12|
+--------+------------+-----------------+---------------------+

El DataFrame tiene 3 filas y 4 columnas.
root
 |-- id_campo: long (nullable = true)
 |-- nombre_campo: string (nullable = true)
 |-- descripcion_campo: string (nullable = true)
 |-- activo_informacion_id: long (nullable = true)



## Lectura Regla 

In [36]:
# Nombre de la tabla que se desea leer
nombre_tabla = "dbo.regla_filtros"

# Llamar a la función para leer la tabla y obtener el DataFrame
df_reglaFiltros = leer_tabla_a_dataframe(url, nombre_tabla, usuario, contraseña, driver)


df_fil_reglaFiltros= df_reglaFiltros.filter(col("created_at") >= fecha_especifica)

df_fil_reglaFiltros = df_fil_reglaFiltros.drop("created_at", "updated_at", "estado")

df_fil_reglaFiltros = df_fil_reglaFiltros.withColumnRenamed("id", "id_regla")
df_fil_reglaFiltros = df_fil_reglaFiltros.withColumnRenamed("nombre", "nombre_regla")
df_fil_reglaFiltros = df_fil_reglaFiltros.withColumnRenamed("descripcion", "descripcion_regla")


# Mostrar los resultados filtrados
df_fil_reglaFiltros.show()

# Obtener el número de filas
num_filas = df_fil_reglaFiltros.count()

# Obtener el número de columnas
num_columnas = len(df_fil_reglaFiltros.columns)

print(f"El DataFrame tiene {num_filas} filas y {num_columnas} columnas.")

# Mostrar el esquema del DataFrame obtenido

df_fil_reglaFiltros.printSchema()

+--------+--------------------+--------------------+-----------+--------------------+----+-------------+
|id_regla|        nombre_regla|   descripcion_regla|  dimension| descripcion_tecnica|tipo|tipo_regla_id|
+--------+--------------------+--------------------+-----------+--------------------+----+-------------+
|      45|   No puede ser cero|Esta regla valida...|    Validez|col(campo).isNull...|   0|         NULL|
|      46|    No puede ser uno|Esta regla valida...|    Validez|col(campo).isNull...|   0|         NULL|
|      47|No puede estar vacio|El campo no puede...|Completitud|col(campo).isNull...|   0|         NULL|
+--------+--------------------+--------------------+-----------+--------------------+----+-------------+

El DataFrame tiene 3 filas y 7 columnas.
root
 |-- id_regla: long (nullable = true)
 |-- nombre_regla: string (nullable = true)
 |-- descripcion_regla: string (nullable = true)
 |-- dimension: string (nullable = true)
 |-- descripcion_tecnica: string (nullable = t

## Lectura Regla Campo Activo

In [37]:
nombre_tabla = "dbo.regla_campo_activos"

# Llamar a la función para leer la tabla y obtener el DataFrame
df_regla_campo = leer_tabla_a_dataframe(url, nombre_tabla, usuario, contraseña, driver)


df_regla_campo= df_regla_campo.filter(col("created_at") >= fecha_especifica)

df_regla_campo = df_regla_campo.drop("created_at", "updated_at", "regla_references", "activo_informacion_id", "nombre")

df_regla_campo = df_regla_campo.withColumnRenamed("id", "id_regla_campo")
df_regla_campo = df_regla_campo.withColumnRenamed("campo_activo_id", "id_campo")
df_regla_campo = df_regla_campo.withColumnRenamed("regla_filtro_id", "id_regla")

df_regla_campo.show()

+--------------+--------+--------+
|id_regla_campo|id_campo|id_regla|
+--------------+--------+--------+
|            54|     206|      45|
|            55|     206|      46|
|            56|     207|      46|
|            57|     208|      47|
+--------------+--------+--------+



# Generación de la dimensión REGLA_CAMPO_ACTIVO

In [38]:
# Realizar el primer join entre df_regla_campo y df_fil_reglaFiltros
df_joined = df_regla_campo.join(df_fil_reglaFiltros, "id_regla", "inner")

# Realizar el segundo join con df_fil_campoActivo
df_final = df_joined.join(df_fil_campoActivo, "id_campo", "inner")

# Seleccionar las columnas deseadas para evitar duplicados
df_final = df_final.select(
    "id_regla_campo",
    "id_campo",
    "id_regla",
    "nombre_regla",
    "descripcion_regla",
    "dimension",
    "descripcion_tecnica",
    "tipo",
    "tipo_regla_id",
    "nombre_campo",
    "descripcion_campo",
    "activo_informacion_id"
)

# Mostrar el DataFrame final para verificar
df_final.show(truncate=False)
df_final.printSchema()


+--------------+--------+--------+--------------------+--------------------------------------------------+-----------+----------------------------------------------------------------------+----+-------------+------------+-----------------+---------------------+
|id_regla_campo|id_campo|id_regla|nombre_regla        |descripcion_regla                                 |dimension  |descripcion_tecnica                                                   |tipo|tipo_regla_id|nombre_campo|descripcion_campo|activo_informacion_id|
+--------------+--------+--------+--------------------+--------------------------------------------------+-----------+----------------------------------------------------------------------+----+-------------+------------+-----------------+---------------------+
|57            |208     |47      |No puede estar vacio|El campo no puede presentar nulos o vacios        |Completitud|col(campo).isNull() |   (col(campo) == '')                            |0   |NULL         |REGION

# Definición de reglas para la calidad de los datos

In [39]:
import pyspark.sql.functions as F
from pyspark.sql.functions import expr, col, collect_list
from pprint import pprint

df_grouped = df_final.groupBy("activo_informacion_id", "nombre_campo", "dimension").agg(
    collect_list("nombre_regla").alias("reglas")
)

# Construir el diccionario de reglas_campos
reglas_campos = {}
for row in df_grouped.collect():
    act_id = row["activo_informacion_id"]
    campo = row["nombre_campo"]
    dimension = row["dimension"]
    reglas = row["reglas"]
    
    if campo not in reglas_campos:
        reglas_campos[campo] = {}
    if dimension not in reglas_campos[campo]:
        reglas_campos[campo][dimension] = []
    
    reglas_campos[campo][dimension].extend(reglas)

# Imprimir el diccionario con pprint
pprint(reglas_campos)

{'REGIONAL': {'Completitud': ['No puede estar vacio']},
 'SCORER': {'Validez': ['No puede ser uno', 'No puede ser cero']},
 'TEL_REP': {'Validez': ['No puede ser uno']}}


# Jerarquia de analisis QA

In [40]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import expr, col, collect_list
from pprint import pprint

# Asegúrate de que df_final tiene todas las columnas necesarias
selected_df = df_final.select("nombre_campo", "nombre_regla", "descripcion_tecnica", "activo_informacion_id", "id_regla", "id_campo", "id_regla_campo")

def generate_expression(row):
    regla_formatted = row.nombre_regla.replace(" ", "_")
    variable_name = f"{row.nombre_campo}_Regla_{regla_formatted}"
    # Construir la expresión directamente como código Python evaluado
    dynamic_expression = f"col('{row.nombre_campo}').isNull() | (col('{row.nombre_campo}') == '') | (col('{row.nombre_campo}') == 1)"
    
    # Función que evalúa la expresión dentro del contexto local adecuado
    def expression_function(df):
        # Local context para eval
        local_dict = {'col': col, 'df': df, 'lit': F.lit}
        return df.filter(eval(dynamic_expression, {}, local_dict))
    
    return (row.activo_informacion_id, variable_name, expression_function, row.id_regla, row.id_campo, row.id_regla_campo)

# Aplica la función y recoge las expresiones
expressions = selected_df.rdd.map(generate_expression).collect()
expressions_dict = {}
for act_id, name, func, id_regla, id_campo, id_regla_campo in expressions:
    if act_id not in expressions_dict:
        expressions_dict[act_id] = {}
    expressions_dict[act_id][name] = (func, id_regla, id_campo, id_regla_campo)

# Asegúrate de que df_grouped tiene la columna correcta
df_grouped = df_final.groupBy("activo_informacion_id", "nombre_campo", "dimension").agg(
    collect_list("nombre_regla").alias("reglas")
)

reglas_campos = {}
for row in df_grouped.collect():
    act_id = row['activo_informacion_id']
    campo = row['nombre_campo']
    dimension = row['dimension']
    reglas = row['reglas']
    
    if act_id not in reglas_campos:
        reglas_campos[act_id] = {}
    if campo not in reglas_campos[act_id]:
        reglas_campos[act_id][campo] = {}
    if dimension not in reglas_campos[act_id][campo]:
        reglas_campos[act_id][campo][dimension] = []
    
    try:
        reglas_aplicadas = [
            (expressions_dict[act_id].get(f"{campo}_Regla_{regla.replace(' ', '_')}"), f"{campo}_Regla_{regla.replace(' ', '_')}") 
            for regla in reglas 
            if f"{campo}_Regla_{regla.replace(' ', '_')}" in expressions_dict[act_id]
        ]
        print(f"Act ID: {act_id}, Campo: {campo}, Dimension: {dimension}, Reglas Aplicadas: {reglas_aplicadas}")
        reglas_campos[act_id][campo][dimension].extend(reglas_aplicadas)
    except Exception as e:
        print(f"Error al procesar el campo '{campo}': {str(e)}")

pprint(reglas_campos)

# Función para generar DataFrame desde un campo y sus reglas
def generar_dataframe_desde_campo(campo, dataframe, reglas_por_campo):
    df_campo = dataframe.select(campo, 'NIT', 'FechaCorte')
    dataframes_concatenados = []

    for dimension, filtros in reglas_por_campo.items():
        for filtro_info, nombre_filtro in filtros:
            filtro, id_regla, id_campo, id_regla_campo = filtro_info
            df_filtrado = filtro(df_campo)
            df_filtrado = df_filtrado.withColumn('Regla', F.lit(nombre_filtro))
            df_filtrado = df_filtrado.withColumnRenamed(campo, 'Registro')
            df_filtrado = df_filtrado.withColumn('Dimension', F.lit(dimension))
            df_filtrado = df_filtrado.withColumn('Campo', F.lit(campo))
            df_filtrado = df_filtrado.withColumn('id_regla', F.lit(id_regla))
            df_filtrado = df_filtrado.withColumn('id_campo', F.lit(id_campo))
            df_filtrado = df_filtrado.withColumn('id_regla_campo', F.lit(id_regla_campo))
            df_filtrado = df_filtrado.withColumn('activo_informacion_id', F.lit(act_id))
            dataframes_concatenados.append(df_filtrado)

    if dataframes_concatenados:
        df_concatenado = dataframes_concatenados[0]
        for df in dataframes_concatenados[1:]:
            df_concatenado = df_concatenado.union(df)
        return df_concatenado
    else:
        return dataframe

Act ID: 12, Campo: TEL_REP, Dimension: Validez, Reglas Aplicadas: [((<function generate_expression.<locals>.expression_function at 0x7f9ad5312ca0>, 46, 207, 56), 'TEL_REP_Regla_No_puede_ser_uno')]
Act ID: 12, Campo: SCORER, Dimension: Validez, Reglas Aplicadas: [((<function generate_expression.<locals>.expression_function at 0x7f9ad5312de0>, 46, 206, 55), 'SCORER_Regla_No_puede_ser_uno'), ((<function generate_expression.<locals>.expression_function at 0x7f9ad5312d40>, 45, 206, 54), 'SCORER_Regla_No_puede_ser_cero')]
Act ID: 12, Campo: REGIONAL, Dimension: Completitud, Reglas Aplicadas: [((<function generate_expression.<locals>.expression_function at 0x7f9ad5311e40>, 47, 208, 57), 'REGIONAL_Regla_No_puede_estar_vacio')]
{12: {'REGIONAL': {'Completitud': [((<function generate_expression.<locals>.expression_function at 0x7f9ad5311e40>,
                                     47,
                                     208,
                                     57),
                              

# Ejecución automatizada de calidad

## Generación de inconformidades Fase 1

In [41]:

if activo_id is not None and activo_id in reglas_campos:
    reglas_por_activo = reglas_campos[activo_id]
    resultado_concatenado = None

    for campo in reglas_por_activo:
        try:
            df_actual = generar_dataframe_desde_campo(campo, df_spark, reglas_por_activo[campo])
            resultado_concatenado = df_actual if resultado_concatenado is None else resultado_concatenado.union(df_actual)
        except Exception as e:
            print(f"Error al procesar el campo '{campo}': {str(e)}")
            continue

    if resultado_concatenado:
        resultado_concatenado.show()
else:
    print(f"No se encontraron reglas para el activo con ID {activo_id}")



+--------+---------+----------+--------------------+---------+-------+--------+--------+--------------+---------------------+
|Registro|      NIT|FechaCorte|               Regla|Dimension|  Campo|id_regla|id_campo|id_regla_campo|activo_informacion_id|
+--------+---------+----------+--------------------+---------+-------+--------+--------+--------------+---------------------+
|    NULL|  1619328|31/12/2023|TEL_REP_Regla_No_...|  Validez|TEL_REP|      46|     207|            56|                   12|
|       1|900680874|29/02/2024|TEL_REP_Regla_No_...|  Validez|TEL_REP|      46|     207|            56|                   12|
|    NULL|193198654|31/12/2023|TEL_REP_Regla_No_...|  Validez|TEL_REP|      46|     207|            56|                   12|
|    NULL|900229681|31/01/2024|TEL_REP_Regla_No_...|  Validez|TEL_REP|      46|     207|            56|                   12|
|    NULL|860044007|29/02/2024|TEL_REP_Regla_No_...|  Validez|TEL_REP|      46|     207|            56|               

In [42]:
""""
from pyspark.sql.functions import current_timestamp, lit
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Agregar la columna "ID_analisisQA" con valor constante "1" al DataFrame "resultado_concatenado"
RegistroNoConforme = resultado_concatenado.withColumn("ID_analisisQA", lit(1))

# Crear una ventana de partición para ordenar los registros y generar un ID único
windowSpec = Window.orderBy(lit(1))
RegistroNoConforme = RegistroNoConforme.withColumn("ID_Registro", row_number().over(windowSpec))

# Agregar la columna "fecha_analisis" con la fecha y hora actual
RegistroNoConforme = RegistroNoConforme.withColumn("fecha_analisis", current_timestamp())
RegistroNoConforme = RegistroNoConforme.withColumn("fecha_remediacion", lit(None).cast("timestamp"))
RegistroNoConforme = RegistroNoConforme.withColumn("estado", lit(2))
RegistroNoConforme = RegistroNoConforme.withColumn("plan_de_mejora_id", lit(None).cast("long"))
RegistroNoConforme = RegistroNoConforme.withColumn("created_at", current_timestamp())
RegistroNoConforme = RegistroNoConforme.withColumn("updated_at", current_timestamp())


# Reordenar las columnas según lo solicitado
df_reordenado = RegistroNoConforme.select('ID_Registro', 'id_regla', 'NIT', 'Registro', 'Dimension', 'ID_analisisQA', 'fecha_analisis', 'id_campo', 'activo_informacion_id', 'fecha_remediacion', 'estado', 'plan_de_mejora_id', 'created_at', 'updated_at')

df_reordenado.show()
""""

SyntaxError: unterminated string literal (detected at line 26) (2286185397.py, line 26)

In [None]:
""""
df_reg_inconforme= df_reordenado.withColumnRenamed("ID_Registro", "id")
df_reg_inconforme= df_reg_inconforme.withColumnRenamed("id_regla", "regla_filtro_id")
df_reg_inconforme= df_reg_inconforme.withColumnRenamed("NIT", "llave")
df_reg_inconforme = df_reg_inconforme.withColumnRenamed("Registro", "nombre")
df_reg_inconforme = df_reg_inconforme.withColumnRenamed("Dimension", "dimension")
df_reg_inconforme = df_reg_inconforme.withColumnRenamed("ID_analisisQA", "analisis_qa_id")
df_reg_inconforme = df_reg_inconforme.withColumnRenamed("fecha_analisis", "fecha_analisis")
df_reg_inconforme = df_reg_inconforme.withColumnRenamed("id_campo", "campo_activo_id")
df_reg_inconforme = df_reg_inconforme.withColumnRenamed("activo_informacion_id", "activo_informacion_id")


df_reg_inconforme.show()
""""


+---+---------------+---------+------+---------+--------------+--------------------+---------------+---------------------+-----------------+------+-----------------+--------------------+--------------------+
| id|regla_filtro_id|    llave|nombre|dimension|analisis_qa_id|      fecha_analisis|campo_activo_id|activo_informacion_id|fecha_remediacion|estado|plan_de_mejora_id|          created_at|          updated_at|
+---+---------------+---------+------+---------+--------------+--------------------+---------------+---------------------+-----------------+------+-----------------+--------------------+--------------------+
|  1|             46|  1619328|  NULL|  Validez|             1|2024-05-16 21:53:...|            207|                   12|             NULL|     2|             NULL|2024-05-16 21:53:...|2024-05-16 21:53:...|
|  2|             46|900680874|     1|  Validez|             1|2024-05-16 21:53:...|            207|                   12|             NULL|     2|             NULL|202

In [None]:
""""
from pyspark.sql.functions import col

# Convertir las columnas a los tipos de datos correspondientes en el segundo esquema
df_reg_inconforme = df_reg_inconforme \
    .withColumn("id", col("id").cast("long")) \
    .withColumn("regla_filtro_id", col("regla_filtro_id").cast("long")) \
    .withColumn("llave", col("llave").cast("string")) \
    .withColumn("analisis_qa_id", col("analisis_qa_id").cast("long")) \
    .withColumn("campo_activo_id", col("campo_activo_id").cast("long")) \
    .withColumn("activo_informacion_id", col("activo_informacion_id").cast("long")) \
    .withColumn("estado", col("estado").cast("integer")) \
    .withColumn("created_at", col("created_at").cast("timestamp")) \
    .withColumn("updated_at", col("updated_at").cast("timestamp"))

# Asegurarse de que las columnas tienen el mismo orden
df_reg_inconforme = df_reg_inconforme.select(
    "id", 
    "regla_filtro_id", 
    "llave", 
    "nombre", 
    "dimension", 
    "analisis_qa_id", 
    "fecha_analisis", 
    "campo_activo_id", 
    "activo_informacion_id", 
    "fecha_remediacion", 
    "estado", 
    "plan_de_mejora_id", 
    "created_at", 
    "updated_at"
)


# Verificar el esquema
df_reg_inconforme.printSchema()
""""

root
 |-- id: long (nullable = false)
 |-- regla_filtro_id: long (nullable = false)
 |-- llave: string (nullable = true)
 |-- nombre: string (nullable = true)
 |-- dimension: string (nullable = false)
 |-- analisis_qa_id: long (nullable = false)
 |-- fecha_analisis: timestamp (nullable = false)
 |-- campo_activo_id: long (nullable = false)
 |-- activo_informacion_id: long (nullable = false)
 |-- fecha_remediacion: timestamp (nullable = true)
 |-- estado: integer (nullable = false)
 |-- plan_de_mejora_id: long (nullable = true)
 |-- created_at: timestamp (nullable = false)
 |-- updated_at: timestamp (nullable = false)



Carga de registros no conformes

In [None]:
#cargar_dataframe_a_tabla(df_reg_inconforme,"dbo.[registro_no_conformes]")


Py4JJavaError: An error occurred while calling o377.jdbc.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 66.0 failed 1 times, most recent failure: Lost task 0.0 in stage 66.0 (TID 52) (31f2cad8f1cd executor driver): java.sql.BatchUpdateException: Violation of PRIMARY KEY constraint 'PK_registro_no_conformes_id'. Cannot insert duplicate key in object 'dbo.registro_no_conformes'. The duplicate key value is (1).
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeBatch(SQLServerPreparedStatement.java:2231)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:746)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:902)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1036)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1036)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1036)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1034)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:901)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:70)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:756)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.sql.BatchUpdateException: Violation of PRIMARY KEY constraint 'PK_registro_no_conformes_id'. Cannot insert duplicate key in object 'dbo.registro_no_conformes'. The duplicate key value is (1).
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeBatch(SQLServerPreparedStatement.java:2231)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:746)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:902)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1036)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1036)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more


## Generación de inconformidades Fase 2

In [None]:
resultado_concatenado.show()

+--------+---------+----------+--------------------+---------+-------+--------+--------+--------------+---------------------+
|Registro|      NIT|FechaCorte|               Regla|Dimension|  Campo|id_regla|id_campo|id_regla_campo|activo_informacion_id|
+--------+---------+----------+--------------------+---------+-------+--------+--------+--------------+---------------------+
|    NULL|  1619328|31/12/2023|TEL_REP_Regla_No_...|  Validez|TEL_REP|      46|     207|            56|                   12|
|       1|900680874|29/02/2024|TEL_REP_Regla_No_...|  Validez|TEL_REP|      46|     207|            56|                   12|
|    NULL|193198654|31/12/2023|TEL_REP_Regla_No_...|  Validez|TEL_REP|      46|     207|            56|                   12|
|    NULL|900229681|31/01/2024|TEL_REP_Regla_No_...|  Validez|TEL_REP|      46|     207|            56|                   12|
|    NULL|860044007|29/02/2024|TEL_REP_Regla_No_...|  Validez|TEL_REP|      46|     207|            56|               

In [None]:
# Agregar la columna "ID_analisisQA" con valor constante "1" al DataFrame "resultado_concatenado"
reg_inconforme_tablero = resultado_concatenado.withColumn("ID_analisisQA", lit(1))

# Crear una ventana de partición para ordenar los registros y generar un ID único
windowSpec = Window.orderBy(lit(1))
reg_inconforme_tablero = reg_inconforme_tablero.withColumn("ID_Registro", row_number().over(windowSpec))
reg_inconforme_tablero = reg_inconforme_tablero.withColumn("NIT", col("NIT").cast("string"))

reg_inconforme_tablero= reg_inconforme_tablero.withColumnRenamed("id_regla", "id_regla_filtro")
reg_inconforme_tablero= reg_inconforme_tablero.withColumnRenamed("id_regla_campo", "id_regla")
reg_inconforme_tablero= reg_inconforme_tablero.withColumnRenamed("id_campo", "id_campoActivo")
reg_inconforme_tablero= reg_inconforme_tablero.withColumnRenamed("activo_informacion_id", "id_activo")

In [None]:
reg_inconforme_tablero=reg_inconforme_tablero.select('Campo','Regla','ID_Registro','NIT','Registro','Dimension','ID_analisisQA','FechaCorte','id_regla','id_campoActivo','id_activo')
reg_inconforme_tablero.printSchema()

root
 |-- Campo: string (nullable = false)
 |-- Regla: string (nullable = false)
 |-- ID_Registro: integer (nullable = false)
 |-- NIT: string (nullable = true)
 |-- Registro: string (nullable = true)
 |-- Dimension: string (nullable = false)
 |-- ID_analisisQA: integer (nullable = false)
 |-- FechaCorte: string (nullable = true)
 |-- id_regla: integer (nullable = false)
 |-- id_campoActivo: integer (nullable = false)
 |-- id_activo: integer (nullable = false)



In [None]:
cargar_dataframe_a_tabla(reg_inconforme_tablero, "Calidad.RegCortTablero")


Generacion etiquetado

In [None]:
df_final.show(truncate=False)
df_final.printSchema()

+--------------+--------+--------+--------------------+--------------------------------------------------+-----------+----------------------------------------------------------------------+----+-------------+------------+-----------------+---------------------+
|id_regla_campo|id_campo|id_regla|nombre_regla        |descripcion_regla                                 |dimension  |descripcion_tecnica                                                   |tipo|tipo_regla_id|nombre_campo|descripcion_campo|activo_informacion_id|
+--------------+--------+--------+--------------------+--------------------------------------------------+-----------+----------------------------------------------------------------------+----+-------------+------------+-----------------+---------------------+
|57            |208     |47      |No puede estar vacio|El campo no puede presentar nulos o vacios        |Completitud|col(campo).isNull() |   (col(campo) == '')                            |0   |NULL         |REGION

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# Crear la nueva columna con el patrón especificado
df_reg_tablero = df_final.withColumn(
    "Regla",
    F.concat(
        F.col("nombre_campo"),
        F.lit("_Regla_"),
        F.regexp_replace(F.col("nombre_regla"), " ", "_")
    )
)

# Renombrar las columnas
df_reg_tablero = df_reg_tablero.withColumnRenamed("id_regla", "id_regla_filtro")
df_reg_tablero = df_reg_tablero.withColumnRenamed("id_regla_campo", "id_regla")
df_reg_tablero = df_reg_tablero.withColumnRenamed("nombre_campo", "nombre Campo")
df_reg_tablero = df_reg_tablero.withColumnRenamed("activo_informacion_id", "id_activo")
df_reg_tablero = df_reg_tablero.withColumnRenamed("descripcion_regla", "DescripcionRegla")
df_reg_tablero = df_reg_tablero.withColumnRenamed("descripcion_tecnica", "regla Tecnica")

# Seleccionar las columnas en el orden deseado
df_reg_tablero = df_reg_tablero.select('id_regla', 'Regla', 'dimension', 'nombre Campo', 'id_activo', 'DescripcionRegla', 'regla Tecnica')

# Mostrar el DataFrame resultante
df_reg_tablero.show(truncate=False)
df_reg_tablero.printSchema()


+--------+-----------------------------------+-----------+------------+---------+--------------------------------------------------+----------------------------------------------------------------------+
|id_regla|Regla                              |dimension  |nombre Campo|id_activo|DescripcionRegla                                  |regla Tecnica                                                         |
+--------+-----------------------------------+-----------+------------+---------+--------------------------------------------------+----------------------------------------------------------------------+
|57      |REGIONAL_Regla_No_puede_estar_vacio|Completitud|REGIONAL    |12       |El campo no puede presentar nulos o vacios        |col(campo).isNull() |   (col(campo) == '')                            |
|56      |TEL_REP_Regla_No_puede_ser_uno     |Validez    |TEL_REP     |12       |Esta regla valida que el campo no sea uno o vacio |col(campo).isNull() |   (col(campo) == '') |    (col

In [None]:
cargar_dataframe_a_tabla(df_reg_tablero,"Calidad.ReglaTablero")