# Q&C_4LL - Quality Rules Library

<br>__Autor:__ Marcelo Gabriel Gonzalez
<br>__Correo:__ marcelogabriel.gonzalez@bbva.com
<br>__Equipo:__ CoE Data Argentina

<a href="https://docs.google.com/spreadsheets/d/1TmidsE1NeSJrG7rMhztIVX5LO1JT9JSAVM_R7ygbJrA/edit#gid=1509800287">Documentacion</a> 

## __Librerías__

In [1]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from dataproc_sdk.dataproc_sdk_datiopysparksession.datiopysparksession import DatioPysparkSession
dataproc= DatioPysparkSession().get_or_create()
import pandas as pd
from matplotlib import pyplot as plt
import matplotlib as mpl
from pyspark.sql import SparkSession



## __Funciones__

### Finish Report

In [2]:
def finish_report(reporte: pd.DataFrame):
    if 'ID' not in reporte[0]:
        reporte_df = (spark
                      .createDataFrame(reporte)
                      .withColumn('ID', F.monotonically_increasing_id())
                      .withColumn('ID_order', F.row_number().over(Window.orderBy('ID')))
                      .withColumn('OKs', F.col('Poblacion')-F.col('Errores'))
                      .select('ID_order', 'Campo','Campo_secundario','Object', 'Tipo_Regla', 'Poblacion', 'OKs', 'Errores', 'Cumplimiento'))
        return reporte_df

### Plot Report

In [3]:
def plot_report(reporte_df: pd.DataFrame):
    
    fig, ax = plt.subplots(figsize=(15,8), facecolor=(.94, .94, .94))

    reglas = reporte_df.select('ID_order').rdd.flatMap(lambda x:x).collect()
    cumplimiento = reporte_df.select('Cumplimiento').rdd.flatMap(lambda x:x).collect()

    cols = ['red' if x < 90 else 'green' if x > 90 else 'yellow' for x in cumplimiento]
    bars = ax.barh(reglas,cumplimiento, color = cols, align='center')
    plt.axvline(x = 90, color = 'red', ls='--')
    plt.axvline(x = 95, color = 'yellow', ls='--')
    
    ax.set_facecolor('#eafff5')
    #etiqueta de barras
    ax.bar_label(bars, fmt= '{:,.2f}%', label_type ='center', color = 'white')
    #Formato de eje y
    ax.xaxis.set_major_formatter(mpl.ticker.StrMethodFormatter('{x:,.0f}%'))
    #Etiquetas de ejes
    ax.set(ylabel='ID Regla', xlabel = 'Cumplimiento')
    #Titulo
    title = plt.title('Quality Rules Results',fontsize=18,pad=20)
    title.set_position([.12, 1])
    plt.show()

### Compliance

In [4]:
def compliance(df: pd.DataFrame , by = 'Total'): 
    
    print(' Cantidad de Reglas: ', df.count(),'\n',
          'Cantidad de objetos: ', df.select('Object').distinct().count(),'\n',
          'Cantidad de campos: ', df.select('Campo').distinct().count(),'\n',
          'Reglas por campo: ',round(df.count() / df.select('Campo').distinct().count(),2),'\n',
          'Reglas por objeto: ',round(df.count() / df.select('Object').distinct().count()),2,'\n')
      
    if by == 'Total':
        return print('% de Cumplimiento Total: ', df.select(F.round(F.avg('Cumplimiento'),2)).collect()[0][0])
    
    elif by == 'Objects':
        obj_cump = df.select('Object','Cumplimiento').groupBy('Object').agg(F.round(F.avg('Cumplimiento'),2).alias('mean'))
        objects = obj_cump.select('Object').rdd.flatMap(lambda x:x).collect()
        cump = obj_cump.select('mean').rdd.flatMap(lambda x:x).collect()
        for i,j in zip(objects, cump):
            print('% de Cumplimiento -', i,':', j)
            
    elif by == 'Fields':
        fields_cump = df.select('Campo','Cumplimiento').groupBy('Campo').agg(F.round(F.avg('Cumplimiento'),2).alias('mean'))
        fields = fields_cump.select('Campo').rdd.flatMap(lambda x:x).collect()
        cump = fields_cump.select('mean').rdd.flatMap(lambda x:x).collect()
        for i,j in zip(fields, cump):
            print('% de Cumplimiento -', i,':', j)

### Export Report

In [5]:
def export_report(reporte_df: pd.DataFrame):
    reporte_df.toPandas().to_csv(str(date.today())+'-quality_results'+'.csv', index = False)

## Quality Rules

### 3.Validez

#### __3.1__ 

__V_3_1__ = Comprobación del valor de dato nulo (valor NULL o vacío)

In [6]:
def V_3_1(df: pd.DataFrame ,field: str, save_results = 'n', reporte =[], show_results = 'n', rule_type = '3.1') -> dict:
    
    Pob = df.count()
    Errores_df = df.select(field).filter(F.col(field).isNull())
    Errores = Errores_df.count()
    Cumplimiento = ((Pob - Errores)/Pob)*100
    Resultados = {'Tipo_Regla': rule_type, 'Campo' : field,'Campo_secundario' : '-', 'Object':[x for x in globals() if globals()[x] is df][0],'Poblacion': Pob, 'Errores': Errores, 'Cumplimiento': Cumplimiento}
    
    if (save_results == 'y') & (show_results == 'y'):
        reporte.append( Resultados )
        return reporte
    
    elif (save_results == 'y') & (show_results == 'n'):
        reporte.append( Resultados )
    
    elif (save_results == 'n') & (show_results == 'y'):        
        return Resultados

#### __3.2__

__(3.2) - V_3_2__ = Comprobación del formato del campo (Tipo de dato y Formato)

In [7]:
def V_3_2(df: pd.DataFrame , field: str, dtype_expected: str, save_results = 'n', reporte = [], show_results = 'n', rule_type = '3.2') -> dict:
    
    Pob = 1
    real_dtype = df.select(field).dtypes[0][1]
    
    if real_dtype == dtype_expected:
        Errores = 0
    else:
        Errores = 1
    Cumplimiento = ((Pob - Errores)/Pob)*100
    
    Resultados = {'Tipo_Regla': rule_type, 'Campo' : field,'Campo_secundario' : '-', 'Object':[x for x in globals() if globals()[x] is df][0],'Poblacion': Pob, 'Errores': Errores, 'Cumplimiento': Cumplimiento}
    
    if (save_results == 'y') & (show_results == 'y'):
        reporte.append(Resultados)
        return reporte
    
    elif (save_results == 'y') & (show_results == 'n'):
        reporte.append(Resultados)

    elif (save_results == 'n') & (show_results == 'y'):
        return Resultados

#### __3.3__

__(3.3) - V_3_3__ = Chequea valores no permitidos.

In [8]:
def V_3_3_D(df: pd.DataFrame, field: str, values: list, save_results = 'n', reporte = [], show_results = 'n', rule_type = '3.3_D') -> dict:
    
    Pob = df.count()
    Errores_df =  df.select(field).filter(F.col(field).isin(values))
    Errores = Errores_df.count()
    Cumplimiento = round(((Pob - Errores)/Pob)*100,2)
    Resultados = {'Tipo_Regla': rule_type, 'Campo' : field,'Campo_secundario' : '-', 'Object':[x for x in globals() if globals()[x] is df][0],'Poblacion': Pob, 'Errores': Errores, 'Cumplimiento': Cumplimiento}
    
    if (save_results == 'y') & (show_results == 'y'):
        reporte.append(Resultados)
        return Resultados
    
    elif (save_results == 'y') & (show_results == 'n'):
        reporte.append(Resultados)
        
    elif (save_results == 'n') & (show_results == 'y'):
        return Resultados

In [9]:
def V_3_3_C(df: pd.DataFrame, field: str, operator: str, threshold: float, save_results = 'n', reporte = [], show_results = 'n', rule_type = '3.3_C') -> dict:
    
    Pob = df.count()
    
    if operator == '<=':
        Errores = df.select(field).filter(F.col(field) <= threshold).count()
    elif operator == '<':
        Errores = df.select(field).filter(F.col(field) < threshold).count()
    elif operator == '>':
        Errores = df.select(field).filter(F.col(field) > threshold).count()
    elif operator == '>=':
        Errores = df.select(field).filter(F.col(field) >= threshold).count()
    elif operator == '==':
        Errores = df.select(field).filter(F.col(field) == threshold).count()
    elif operator == '!=':
        Errores = df.select(field).filter(F.col(field) != threshold).count()
    
    Cumplimiento = round(((Pob - Errores)/Pob)*100,2)
    Resultados = {'Tipo_Regla': rule_type, 'Campo' : field,'Campo_secundario' : '-', 'Object':[x for x in globals() if globals()[x] is df][0],'Poblacion': Pob, 'Errores': Errores, 'Cumplimiento': Cumplimiento}
    
    if (save_results == 'y') & (show_results == 'y'):
        reporte.append(Resultados)
        return reporte
    
    elif (save_results == 'y') & (show_results == 'n'):
        reporte.append(Resultados)
        
    elif (save_results == 'n') & (show_results == 'y'):
        return Resultados

#### __3.4__

__(3.4) - V_3_4__ = (Sólo para numéricas) Comprobación del valor del dato dentro del rango esperado (valor entre un rango).

In [10]:
def V_3_4(df: pd.DataFrame, field: str, min_value: float, max_value: float, save_results = 'n', reporte = [], show_results = 'n', rule_type = '3.4') -> dict:
    
    Pob = df.count()
    Errores = df.select(field).filter( (F.col(field) < min_value) | (F.col(field) > max_value) ).count()
    Cumplimiento = round(((Pob - Errores)/Pob)*100,2)
    Resultados = {'Tipo_Regla': rule_type, 'Campo' : field,'Campo_secundario' : '-', 'Object':[x for x in globals() if globals()[x] is df][0],'Poblacion': Pob, 'Errores': Errores, 'Cumplimiento': Cumplimiento}
    
    if (save_results == 'y') & (show_results == 'y'):
        reporte.append(Resultados)
        return reporte
    elif (save_results == 'y') & (show_results == 'n'):
        reporte.append(Resultados)
    elif (save_results == 'n') & (show_results == 'y'):
        return Resultados

#### __3.5__

__(3.5) - V_3_5_E__ = Esta función chequea valores dentro de un catálogo estático (E). El catalogo estatico solo tiene 2 valores posibles que se deben pasar como lista.

In [11]:
def V_3_5_E(df: pd.DataFrame, field: str, values: list, save_results = 'n', reporte = [], show_results = 'n', rule_type = '3.5_E') -> dict:
    
    Pob = df.count() 
    Errores = df.select(field).filter(~F.col(field).isin(values)).count()
    Cumplimiento = round(((Pob - Errores)/Pob)*100,2)
    Resultados = {'Tipo_Regla': rule_type, 'Campo' : field,'Campo_secundario' : '-', 'Object':[x for x in globals() if globals()[x] is df][0],'Poblacion': Pob, 'Errores': Errores, 'Cumplimiento': Cumplimiento}
    
    if (save_results == 'y') & (show_results == 'y'):
        reporte.append(Resultados)
        return reporte
    
    elif (save_results == 'y') & (show_results == 'n'):
        reporte.append(Resultados)
        
    elif (save_results == 'n') & (show_results == 'y'):
        return Resultados

__(3.5) - V_3_5_D__ = Esta función chequea valores dentro de un catálogo dinámico (D) (Muchos valores)

In [12]:
def V_3_5_D(df: pd.DataFrame, field: str, catalog_path, catalog_cd, save_results = 'n', reporte = [], show_results = 'n', rule_type = '3.5_D') -> dict:
    
    Pob = df.count()
    catalog_values = (dataproc.read().parquet(catalog_path).filter(F.col('g_catalog_id') == catalog_cd)
                      .select('gf_catalog_val_id')
                      .distinct()
                      .rdd.flatMap(lambda x:x)
                      .collect())
    Errores = df.select(field).filter(~F.col(field).isin(catalog_values)).count()
    Cumplimiento = round(((Pob - Errores)/Pob)*100,2)
    Resultados = {'Tipo_Regla': rule_type, 'Campo' : field,'Campo_secundario' : 'gf_catalog_val_id', 'Object':[x for x in globals() if globals()[x] is df][0],'Poblacion': Pob, 'Errores': Errores, 'Cumplimiento': Cumplimiento}
    
    if (save_results == 'y') & (show_results == 'y'):
        reporte.append(Resultados)
        return reporte
    
    elif (save_results == 'y') & (show_results == 'n'):
        reporte.append(Resultados)
        
    elif (save_results == 'n') & (show_results == 'y'):
        return Resultados

#### __3.6__

__(3.6) - V_3_6__ = Controla que no haya whitespaces.

In [13]:
def V_3_6(df: pd.DataFrame ,field: str, save_results = 'n', reporte = [], show_results = 'n', rule_type = '3.6') -> dict:
    
    Pob = df.count()
    Errores = df.select(field).withColumn('blanks', F.length(F.regexp_replace(field, '[^ ]+', ''))).filter(F.col('blanks')!= 0).count()
    Cumplimiento = ((Pob - Errores)/Pob)*100
    Resultados = {'Tipo_Regla': rule_type, 'Campo' : field,'Campo_secundario' : '-', 'Object':[x for x in globals() if globals()[x] is df][0],'Poblacion': Pob, 'Errores': Errores, 'Cumplimiento': Cumplimiento}
    
    if (save_results == 'y') & (show_results == 'y'):
        reporte.append(Resultados)
        return reporte
    
    elif (save_results == 'y') & (show_results == 'n'):
        reporte.append(Resultados)
        
    elif (save_results == 'n') & (show_results == 'y'):
        return Resultados

### 4.Consistencia

#### __4.2__

__C_4_2__ = Duplicidad de registros para un mismo dato (Duplicados, triplicados, etc)

In [14]:
def C_4_2(df: pd.DataFrame, field: str, save_results = 'n', reporte = [], show_results = 'n', rule_type = '4.2') -> dict:
    
    Pob = df.count()
    Errores = df.select(field).groupBy(field).count().withColumn('errores',F.col('count')-1).select(F.sum('errores')).collect()[0][0]
    Cumplimiento = round(((Pob - Errores)/Pob)*100,2)
    Resultados = {'Tipo_Regla': rule_type, 'Campo' : field,'Campo_secundario' : '-', 'Object':[x for x in globals() if globals()[x] is df][0],'Poblacion': Pob, 'Errores': Errores, 'Cumplimiento': Cumplimiento}
    
    if (save_results == 'y') & (show_results == 'y'):
        reporte.append(Resultados)
        return reporte
    
    elif (save_results == 'y') & (show_results == 'n'):
        reporte.append(Resultados)
        
    elif (save_results == 'n') & (show_results == 'y'):
        return Resultados

#### __4.3__

__C_4_3__ = Conciliación de un mismo dato entre diferentes tablas/repositorios en el mismo momento de tiempo (Conciliación contable/Integridad referencial)

In [15]:
def C_4_3(df1: pd.DataFrame, key1: str, field1: str , df2: pd.DataFrame, key2: str, field2: str, save_results = 'n', reporte = [], show_results = 'n', rule_type = '4.3') -> dict:
    
    df1_values = df1.select(key1, field1).withColumnRenamed(key1, key1+'_1').withColumnRenamed(field1, field1+'_1')
    df2_values = df2.select(key2, field2).withColumnRenamed(key2, key2+'_2').withColumnRenamed(field2, field2+'_2')
    df = df1_values.join(df2_values, how = 'inner', on = [df1_values[0] == df2_values[0]] )

    Pob = df.count()
    Errores = (df.filter( F.col(field1+'_1') != F.col(field1+'_2')).count())
    Cumplimiento = round(((Pob - Errores)/Pob)*100,2)
    Resultados = {'Tipo_Regla': rule_type, 'Campo' : field1,'Campo_secundario' : field2, 'Object':[x for x in globals() if globals()[x] is df1][0] + ' & ' + [x for x in globals() if globals()[x] is df2][0],'Poblacion': Pob, 'Errores': Errores, 'Cumplimiento': Cumplimiento}

    if (save_results == 'y') & (show_results == 'y'):
        reporte.append(Resultados)
        return reporte
    
    elif (save_results == 'y') & (show_results == 'n'):
        reporte.append(Resultados)
        
    elif (save_results == 'n') & (show_results == 'y'):
        return Resultados

### 5.Integridad

#### __5.2__

__5.2__ = Comparativa entre diferentes valores de datos relacionados entre sí (datos en objetos distintos): Comparación lógica entre dos campos o expresiones entre diferentes objetos.

In [16]:
def I_5_2(df1: pd.DataFrame, key1: str, field1: str, df2: pd.DataFrame, key2: str, field2: str, operator: str, save_results = 'n', reporte = [], show_results = 'n', rule_type = '5.2') -> dict:
    
    df1_values = df1.select(key1, field1).withColumnRenamed(key1, key1+'_1')
    df2_values = df2.select(key2, field2).withColumnRenamed(key2, key2+'_2')
    df = df1_values.join(df2_values, how = 'inner', on = [df1_values[0] == df2_values[0]] )
    
    Pob = df.count()
        
    if operator == '<=':
        Errores = df.select(field1,field2).filter(F.col(field1) <= F.col(field2)).count()
    elif operator == '<':
        Errores = df.select(field1,field2).filter(F.col(field1) < F.col(field2)).count()
    elif operator == '>':
        Errores = df.select(field1,field2).filter(F.col(field1) > F.col(field2)).count()
    elif operator == '>=':
        Errores = df.select(field1,field2).filter(F.col(field1) >= F.col(field2)).count()
    
    Cumplimiento = round(((Pob - Errores)/Pob)*100,2)
    Resultados = {'Tipo_Regla': rule_type, 'Campo' : field1,'Campo_secundario' : field2, 'Object':[x for x in globals() if globals()[x] is df1][0] + ' & ' + [x for x in globals() if globals()[x] is df2][0],'Poblacion': Pob, 'Errores': Errores, 'Cumplimiento': Cumplimiento}
    
    if (save_results == 'y') & (show_results == 'y'):
        reporte.append(Resultados)
        return reporte
    
    elif (save_results == 'y') & (show_results == 'n'):
        reporte.append(Resultados)
        
    elif (save_results == 'n') & (show_results == 'y'):
        return Resultados

#### __5.3__

__(5.3) - I_5_3__ = Comparativa entre diferentes valores de datos relacionados entre sí (datos en mismo objeto): Comparación lógica entre dos campos o expresiones en el mismo objeto.

In [17]:
def I_5_3(df: pd.DataFrame, field1: str, operator: str, field2: str,save_results = 'n', reporte = [], show_results = 'n', rule_type = '5.3') -> dict:
    
    Pob = df.count()
    
    if operator == '<=':
        Errores = df.select(field1,field2).filter(F.col(field1) <= F.col(field2)).count()
    elif operator == '<':
        Errores = df.select(field1,field2).filter(F.col(field1) < F.col(field2)).count()
    elif operator == '>':
        Errores = df.select(field1,field2).filter(F.col(field1) > F.col(field2)).count()
    elif operator == '>=':
        Errores = df.select(field1,field2).filter(F.col(field1) >= F.col(field2)).count()
    
    Cumplimiento = round(((Pob - Errores)/Pob)*100,2)
    Resultados = {'Tipo_Regla': rule_type, 'Campo' : field1,'Campo_secundario' : field2, 'Object':[x for x in globals() if globals()[x] is df][0],'Poblacion': Pob, 'Errores': Errores, 'Cumplimiento': Cumplimiento}
    
    if (save_results == 'y') & (show_results == 'y'):
        reporte.append(Resultados)
        return reporte
    
    elif (save_results == 'y') & (show_results == 'n'):
        reporte.append(Resultados)
        
    elif (save_results == 'n') & (show_results == 'y'):
        return Resultados

### 6.Precision

#### __6.1__

__6.1__ = Concentración valores contra catálogo.

In [18]:
def P_6_1(df: pd.DataFrame, field: str, operator: str, threshold: float, catalog_df: pd.DataFrame, catalog_cd, save_results = 'n', reporte = [], show_results = 'n', rule_type = '6.1') -> dict:
    
    catalog_values = (catalog_df.filter(F.col('g_catalog_id') == catalog_cd)
                      .select('gf_catalog_val_id')
                      .distinct()
                      .rdd.flatMap(lambda x:x)
                      .collect() )
    
    df_tmp = (df.select(field)
          .filter(F.col(field).isin(catalog_values))
          .groupBy(field).agg(F.count('*').alias('Q')))
    tmp_val = df_tmp.select(F.sum('Q')).collect()[0][0]
    df_tmp1 = (df_tmp.withColumn('Total', F.lit(tmp_val))
          .withColumn('Perc', F.round( F.col('Q') / F.col('Total') *100,3))
          .withColumn('Umbral', F.lit(threshold)))
    
    Pob = df_tmp1.count()
    
    if operator == '<=':
        Errores = df_tmp1.filter(F.col('Perc') <= F.col('Umbral')).count()
    elif operator == '<':
        Errores = df_tmp1.filter(F.col('Perc') < F.col('Umbral')).count()
    elif operator == '>':
        Errores = df_tmp1.filter(F.col('Perc') > F.col('Umbral')).count()
    elif operator == '>=':
        Errores = df_tmp1.filter(F.col('Perc') >= F.col('Umbral')).count()
    
    Cumplimiento = round(((Pob - Errores)/Pob)*100,2)
    Resultados = {'Tipo_Regla': rule_type, 'Campo' : field,'Campo_secundario' : 'gf_catalog_val_id' , 'Object':[x for x in globals() if globals()[x] is df][0],'Poblacion': Pob, 'Errores': Errores, 'Cumplimiento': Cumplimiento}

    if (save_results == 'y') & (show_results == 'y'):
        reporte.append(Resultados)
        return reporte
    
    elif (save_results == 'y') & (show_results == 'n'):
        reporte.append(Resultados)
        
    elif (save_results == 'n') & (show_results == 'y'):
        return Resultados

#### __6.2__

__P_6_2__ = Comprobación de la sensibilidad del valor de un dato a un umbral (Valor contra umbral) Para agregaciones.

In [19]:
def P_6_2(df: pd.DataFrame, field1: str, agg_type, operator: str, threshold: float, save_results = 'n', reporte = [], show_results = 'n', rule_type = '6.2') -> dict:
    """P_6_2 = Compara contra un umbral """
    
    df_grp = df.select(field1).groupBy(field1).agg(agg_type(field1))
    
    Pob = df_grp.count()
    
    if operator == '<=':
        Errores = df_grp.filter(F.col(df_grp.columns[1]) <= threshold).count()
    elif operator == '<':
        Errores = df_grp.filter(F.col(df_grp.columns[1]) < threshold).count()
    elif operator == '>':
        Errores = df_grp.filter(F.col(df_grp.columns[1]) > threshold).count()
    elif operator == '>=':
        Errores = df_grp.filter(F.col(df_grp.columns[1]) >= threshold).count()
    
    Cumplimiento = round(((Pob - Errores)/Pob)*100,2)
    Resultados = {'Tipo_Regla': rule_type, 'Campo' : field1,'Campo_secundario' : '-' , 'Object':[x for x in globals() if globals()[x] is df][0],'Poblacion': Pob, 'Errores': Errores, 'Cumplimiento': Cumplimiento}
    
    if (save_results == 'y') & (show_results == 'y'):
        reporte.append(Resultados)
        return reporte
    elif (save_results == 'y') & (show_results == 'n'):
        reporte.append(Resultados)
    elif (save_results == 'n') & (show_results == 'y'):
        return Resultados

#### __6.4__

__P_6_4__ = Comprobación de la sensibilidad del valor de un dato a un umbral (Valor contra umbral) Sin agregaciones.

In [20]:
def P_6_4(df: pd.DataFrame, date_field: str, categorical_field: str, amount_field:str, operator:str, threshold: float, save_results = 'n', reporte = [], show_results = 'n', rule_type = '6.4') -> dict:
    """P_6_4 = Compara contra un umbral """

    windowSpec = Window.partitionBy(categorical_field).orderBy(date_field)
    
    if operator == '>=':    
        df_grp = (df
         .select(date_field,categorical_field,amount_field)
         .withColumn('previus_value',F.lag(amount_field,1).over(windowSpec))
         .filter(~F.col('previus_value').isNull())
         .withColumn('Var', F.abs((F.col(amount_field) - F.col('previus_value')) / F.col(amount_field) *100))
         .withColumn('parametro', F.lit(threshold))
         .withColumn('status', F.when(F.col('Var') >= F.col('parametro'), 1).otherwise(0)))
        
    elif operator == '<=':
        df_grp = (df
                  .select(date_field,categorical_field,amount_field)
                  .withColumn('previus_value',F.lag(amount_field,1).over(windowSpec))
                  .filter(~F.col('previus_value').isNull())
                  .withColumn('Var', F.abs((F.col(amount_field) - F.col('previus_value')) / F.col(amount_field) *100))
                  .withColumn('parametro', F.lit(threshold))
                  .withColumn('status', F.when(F.col('Var') <= F.col('parametro'), 1).otherwise(0)))
    
    Pob = df_grp.count()
    Errores = df_grp.filter(F.col('status') == 1).count()
    Cumplimiento = round(((Pob - Errores)/Pob)*100,2)
    Resultados = {'Tipo_Regla': rule_type, 'Campo' : amount_field,'Campo_secundario' : categorical_field , 'Object':[x for x in globals() if globals()[x] is df][0],'Poblacion': Pob, 'Errores': Errores, 'Cumplimiento': Cumplimiento}
    
    if (save_results == 'y') & (show_results == 'y'):
        reporte.append(Resultados)
        return reporte
    
    elif (save_results == 'y') & (show_results == 'n'):
        reporte.append(Resultados)
        
    elif (save_results == 'n') & (show_results == 'y'):
        return Resultados

__P_6_4_GRP__ = Comprobación de la sensibilidad del valor de un dato a un umbral (Valor contra umbral) Para agregaciones.

In [21]:
def P_6_4_GRP(df: pd.DataFrame, date_field: str, categorical_field: str, amount_field:str, agg_type: str, operator:str, threshold: float, save_results = 'n', reporte = [], show_results = 'n', rule_type = '6.4_GRP') -> dict:
    """P_6_4 = Compara contra un umbral """

    windowSpec = Window.partitionBy(categorical_field).orderBy(date_field)
    
    if operator == '>=':    
        df_grp = (df
         .select(date_field,categorical_field,amount_field)
         .groupBy(date_field,categorical_field).agg(agg_type(amount_field).alias(amount_field+'_grp'))
         .withColumn('previus_value',F.lag(amount_field+'_grp',1).over(windowSpec))
         .filter(~F.col('previus_value').isNull())
         .withColumn('Var', F.abs((F.col(amount_field+'_grp') - F.col('previus_value')) / F.col(amount_field+'_grp') *100))
         .withColumn('parametro', F.lit(threshold))
         .withColumn('status', F.when(F.col('Var') >= F.col('parametro'), 1).otherwise(0)))
        
    elif operator == '<=':
        df_grp = (df
                  .select(date_field,categorical_field,amount_field)
                  .groupBy(date_field,categorical_field).agg(agg_type(amount_field).alias(amount_field+'_grp'))
                  .withColumn('previus_value',F.lag(amount_field+'_grp',1).over(windowSpec))
                  .filter(~F.col('previus_value').isNull())
                  .withColumn('Var', F.abs((F.col(amount_field+'_grp') - F.col('previus_value')) / F.col(amount_field+'_grp') *100))
                  .withColumn('parametro', F.lit(threshold))
                  .withColumn('status', F.when(F.col('Var') <= F.col('parametro'), 1).otherwise(0)))
    
    Pob = df_grp.count()
    Errores = df_grp.filter(F.col('status') == 1).count()
    Cumplimiento = round(((Pob - Errores)/Pob)*100,2)
    Resultados = {'Tipo_Regla': rule_type, 'Campo' : amount_field,'Campo_secundario' : categorical_field , 'Object':[x for x in globals() if globals()[x] is df][0],'Poblacion': Pob, 'Errores': Errores, 'Cumplimiento': Cumplimiento}
    
    if (save_results == 'y') & (show_results == 'y'):
        reporte.append(Resultados)
        return reporte
    
    elif (save_results == 'y') & (show_results == 'n'):
        reporte.append(Resultados)
        
    elif (save_results == 'n') & (show_results == 'y'):
        return Resultados