# Histogram Anomaly Detection

__The goal of this tutorial is to present Histogram Anomaly Detection__. This feature lets you track deviations in the distribution of a column's values.

### Installation

The first step is to install the package with *pip install*.

In [46]:
#!pip install dataquality_bnr

### Set up a PySpark session
The library was built to be used with __PySpark__ and to enable *data unit tests*, running data quality validations on large-scale datasets.<br>
Integrating the Spark session with the library only requires two additional settings (spark.jars and spark.jars.excludes), both provided by dqSupport:


In [1]:
from pyspark.sql import SparkSession, Row
from dataquality_bnr.dqSupport import main as dqSup

spark = SparkSession\
        .builder\
        .config("spark.jars", dqSup.getDeequJar_path())\
        .config("spark.jars.excludes", dqSup.getDeequJar_excludes())\
        .getOrCreate()

### Dataset
We will run the analyzers on one day (dat_ref_carga = '2022-01-03') of the th.thbpd381 table.

In [2]:
sql_query = """
select
    i1c_renda_final,
    i1c_lim_pre_ap_preventivo,
    i1c_rating_riscos,
    i1d_idade,
    i1d_sexo,
    i1c_cli_possui_conta,
    i1c_soc_cd_segm_empr1,
    i1c_soc_cd_ramo_atvd1,
    dat_ref_carga
from th.thbpd381 where dat_ref_carga='2022-01-03'
"""
df_input = spark.sql(sql_query)

In [3]:
df_input.printSchema()

root
 |-- i1c_renda_final: integer (nullable = true)
 |-- i1c_lim_pre_ap_preventivo: integer (nullable = true)
 |-- i1c_rating_riscos: integer (nullable = true)
 |-- i1d_idade: integer (nullable = true)
 |-- i1d_sexo: string (nullable = true)
 |-- i1c_cli_possui_conta: string (nullable = true)
 |-- i1c_soc_cd_segm_empr1: integer (nullable = true)
 |-- i1c_soc_cd_ramo_atvd1: integer (nullable = true)
 |-- dat_ref_carga: string (nullable = true)



## Histogram Anomaly Detection

In [4]:
# To clear the output directory before running the tutorial again, uncomment the line below

#!hdfs dfs -rm -r /user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ/

22/02/04 14:13:01 INFO fs.TrashPolicyDefault: Moved: 'hdfs://nameservice1/user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ' to trash at: hdfs://nameservice1/user/x266727/.Trash/Current/user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ1643994781384


In [6]:
from dataquality_bnr.dqRunning import main as dqRun
from dataquality_bnr.dqSupport import main as dqSup

# Day 1 - Simulating the Execution

In [7]:
dqView_simple = {"viewName" : "dqView_check_moderate",
                 "inputData": df_input,
                 "infraYaml": "yamlFiles/histogram_anomalyDetec/an_anomalyDetec/infrastructure.yaml",
                 "vsYaml": "yamlFiles/histogram_anomalyDetec/an_anomalyDetec/verificationSuite.yaml"}

dq_bnr_directory="/user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ"

In [8]:
myDq = dqRun.Dq(spark, dq_bnr_directory)

myDq = (myDq
        .addView(dqView_simple, "AnomalyDetection"))
        
dq_run_return = myDq.run()

__Evaluate DataQuality View and add to Main DataQuality Process__
dqView_check_moderate pattern evaluated.


__Running all DataQuality views__
run() dqView_check_moderate:
getPersistentRepository...
applyFiltering...
buildTemporaryRepository...
getDataframe...
getAnomalyDetection...
getOuputDFs...
custom_getAnomalyDetection...
getOuputDFs...
As AnomalyDetection temporaryRepository is still empty. Forcing Success Outcome
writeTo_persistentRepository...
writeTo_currentResults...
DataQuality process finished.


__Get overall result__
writing to: /user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ/dqView_check_moderate/currentResult/csvResult/dqView_check_moderate_OK.csv

writing to: /user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ/overall/currentResult/csvResult/overall_OK.csv


# Day 1 - Analyzing the Results

In [9]:
import pandas as pd

#### DQ/overall/overall_OK.csv

In [10]:
path = dq_bnr_directory+'/overall/currentResult/csvResult/overall_OK.csv'
print(path)
df = spark.read.option("header",True).csv(path)
df.toPandas()

/user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ/overall/currentResult/csvResult/overall_OK.csv


| | check | check_level | check_status | constraint | constraint_status | constraint_message | dataset_date | YY_MM_DD | viewName | viewPath |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Descpription by Yaml | Error | Success | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Failure | Can't execute the assertion: requirement faile... | 1643994929878 | 2022-02-04 | dqView_check_moderate | /user/x266727/dataquality_bnr-docs/histogram_a... |
| 1 | Descpription by Yaml | Error | Success | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Failure | Can't execute the assertion: requirement faile... | 1643994929878 | 2022-02-04 | dqView_check_moderate | /user/x266727/dataquality_bnr-docs/histogram_a... |
| 2 | Descpription by Yaml | Error | Success | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Failure | Can't execute the assertion: requirement faile... | 1643994929878 | 2022-02-04 | dqView_check_moderate | /user/x266727/dataquality_bnr-docs/histogram_a... |
| 3 | Descpription by Yaml | Error | Success | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Failure | Can't execute the assertion: requirement faile... | 1643994929878 | 2022-02-04 | dqView_check_moderate | /user/x266727/dataquality_bnr-docs/histogram_a... |
| 4 | Descpription by Yaml | Error | Success | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Failure | Can't execute the assertion: requirement faile... | 1643994929878 | 2022-02-04 | dqView_check_moderate | /user/x266727/dataquality_bnr-docs/histogram_a... |
| 5 | Descpription by Yaml | Error | Success | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Failure | Can't execute the assertion: requirement faile... | 1643994929878 | 2022-02-04 | dqView_check_moderate | /user/x266727/dataquality_bnr-docs/histogram_a... |
| 6 | Descpription by Yaml | Error | Success | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Failure | Can't execute the assertion: requirement faile... | 1643994929878 | 2022-02-04 | dqView_check_moderate | /user/x266727/dataquality_bnr-docs/histogram_a... |
| 7 | Descpription by Yaml | Error | Success | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Failure | Can't execute the assertion: requirement faile... | 1643994929878 | 2022-02-04 | dqView_check_moderate | /user/x266727/dataquality_bnr-docs/histogram_a... |
| 8 | Descpription by Yaml | Error | Success | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Failure | Can't execute the assertion: requirement faile... | 1643994929878 | 2022-02-04 | dqView_check_moderate | /user/x266727/dataquality_bnr-docs/histogram_a... |
| 9 | Descpription by Yaml | Error | Success | AnomalyConstraint(Mean(i1d_sexo_Histogram_rati... | Failure | Can't execute the assertion: requirement faile... | 1643994929878 | 2022-02-04 | dqView_check_moderate | /user/x266727/dataquality_bnr-docs/histogram_a... |


In [11]:
pd.set_option('display.max_colwidth', None)

display(df.select("constraint_message").toPandas())

pd.set_option('display.max_colwidth', 50)

| | constraint_message |
|---|---|
| 0 | Can't execute the assertion: requirement failed: There have to be previous results in the MetricsRepository!! |
| 1 | Can't execute the assertion: requirement failed: There have to be previous results in the MetricsRepository!! |
| 2 | Can't execute the assertion: requirement failed: There have to be previous results in the MetricsRepository!! |
| 3 | Can't execute the assertion: requirement failed: There have to be previous results in the MetricsRepository!! |
| 4 | Can't execute the assertion: requirement failed: There have to be previous results in the MetricsRepository!! |
| 5 | Can't execute the assertion: requirement failed: There have to be previous results in the MetricsRepository!! |
| 6 | Can't execute the assertion: requirement failed: There have to be previous results in the MetricsRepository!! |
| 7 | Can't execute the assertion: requirement failed: There have to be previous results in the MetricsRepository!! |
| 8 | Can't execute the assertion: requirement failed: There have to be previous results in the MetricsRepository!! |
| 9 | Can't execute the assertion: requirement failed: There have to be previous results in the MetricsRepository!! |
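On the first run every AnomalyConstraint fails with "There have to be previous results in the MetricsRepository!!" because there is no history to compare against, yet the overall outcome is Success (see the "Forcing Success Outcome" line in the day-1 log). The sketch below mimics that decision in plain Python; overall_status and its arguments are hypothetical names for illustration, not part of dataquality_bnr.

```python
def overall_status(previous_metrics, constraint_statuses):
    """Hypothetical sketch of the first-run behaviour seen in the day-1 log.

    previous_metrics: metric values already stored in the persistent repository
                      (empty on the very first run).
    constraint_statuses: "Success"/"Failure" for each AnomalyConstraint of this run.
    """
    if not previous_metrics:
        # Nothing to compare against: every anomaly constraint fails with
        # "There have to be previous results...", so the outcome is forced to Success.
        return "Success"
    if all(status == "Success" for status in constraint_statuses):
        return "Success"
    return "Error"
```

With this rule, day 1 (empty repository, ten failures) yields "Success", while a later run with history and at least one failed constraint yields "Error".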


### DQ/view/currentResult/checkResult.parquet

In [12]:
dqView_name = dqView_simple["viewName"]
path = dq_bnr_directory +"/"+ dqView_name +'/currentResult/checkResult.parquet'
print(path)

df = spark.read.parquet(path)
df.drop("YY_MM_DD").drop("dataset_date").show()

/user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ/dqView_check_moderate/currentResult/checkResult.parquet
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
|               check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
|Descpription by Yaml|      Error|     Success|AnomalyConstraint...|          Failure|Can't execute the...|
|Descpription by Yaml|      Error|     Success|AnomalyConstraint...|          Failure|Can't execute the...|
|Descpription by Yaml|      Error|     Success|AnomalyConstraint...|          Failure|Can't execute the...|
|Descpription by Yaml|      Error|     Success|AnomalyConstraint...|          Failure|Can't execute the...|
|Descpription by Yaml|      Error|     Success|AnomalyConstraint...|          Failure|Can't execute the...|
|Descpription by Ya

In [13]:
df.select("constraint","constraint_status").toPandas()

| | constraint | constraint_status |
|---|---|---|
| 0 | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Failure |
| 1 | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Failure |
| 2 | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Failure |
| 3 | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Failure |
| 4 | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Failure |
| 5 | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Failure |
| 6 | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Failure |
| 7 | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Failure |
| 8 | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Failure |
| 9 | AnomalyConstraint(Mean(i1d_sexo_Histogram_rati... | Failure |


### DQ/view/currentResult/successMetrics.parquet

In [14]:
dqView_name = dqView_simple["viewName"]
path = dq_bnr_directory +"/"+ dqView_name +'/currentResult/successMetrics.parquet'
print(path)

df = spark.read.parquet(path)
df.show()

/user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ/dqView_check_moderate/currentResult/successMetrics.parquet
+-------+--------------------+----+--------------------+-------------+----------+----+------------+
| entity|            instance|name|               value| dataset_date|  YY_MM_DD|tags|check_status|
+-------+--------------------+----+--------------------+-------------+----------+----+------------+
| Column|i1c_rating_riscos...|Mean|0.013366761013028666|1643994929878|2022-02-04|  []|     Success|
| Column|i1c_rating_riscos...|Mean| 0.33740586918125587|1643994929878|2022-02-04|  []|     Success|
| Column|i1c_rating_riscos...|Mean|0.028654936186879698|1643994929878|2022-02-04|  []|     Success|
| Column|i1c_rating_riscos...|Mean| 0.03564408019687105|1643994929878|2022-02-04|  []|     Success|
| Column|i1c_rating_riscos...|Mean|5.542540848526054E-6|1643994929878|2022-02-04|  []|     Success|
| Column|i1c_rating_riscos...|Mean| 0.12202642683476578|1643994929878|2022-02-04

In [15]:
df.select("instance","value").toPandas()

| | instance | value |
|---|---|---|
| 0 | i1c_rating_riscos_Histogram_ratio_4 | 0.013367 |
| 1 | i1c_rating_riscos_Histogram_ratio_1 | 0.337406 |
| 2 | i1c_rating_riscos_Histogram_ratio_8 | 0.028655 |
| 3 | i1c_rating_riscos_Histogram_ratio_9 | 0.035644 |
| 4 | i1c_rating_riscos_Histogram_ratio_5 | 6e-06 |
| 5 | i1c_rating_riscos_Histogram_ratio_2 | 0.122026 |
| 6 | i1c_rating_riscos_Histogram_ratio_0 | 0.37933 |
| 7 | i1c_rating_riscos_Histogram_ratio_6 | 0.02377 |
| 8 | i1c_rating_riscos_Histogram_ratio_7 | 0.059797 |
| 9 | i1d_sexo_Histogram_ratio_M | 0.609611 |
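Each instance name follows the pattern <column>_Histogram_ratio_<value>, and value is the share of rows holding that value (the i1c_rating_riscos ratios above add up to about 1). A plain-Python sketch of that computation, assuming ratio = count of the value / total row count; histogram_ratios is a hypothetical helper, and how the real library treats nulls is not covered here.

```python
from collections import Counter


def histogram_ratios(column, values):
    """Return {"<column>_Histogram_ratio_<value>": share of rows with that value}.

    Assumes every element of `values` is one row of the column; an empty string
    produces a name ending in "_ratio_", like the empty buckets on day 3.
    """
    total = len(values)
    counts = Counter(values)
    return {
        f"{column}_Histogram_ratio_{value}": count / total
        for value, count in counts.items()
    }
```

For example, histogram_ratios("i1d_sexo", ["M", "F", "M", ""]) gives 0.5 for the M bucket and 0.25 each for F and the empty bucket.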


### DQ/view/persistentRepository/successMetrics.parquet

In [16]:
dqView_name = dqView_simple["viewName"]
path = dq_bnr_directory +"/"+ dqView_name +'/persistentRepository/successMetrics.parquet'
print(path)
df = spark.read.parquet(path)
df.toPandas()

/user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ/dqView_check_moderate/persistentRepository/successMetrics.parquet


| | entity | instance | name | value | dataset_date | YY_MM_DD | tags | check_status |
|---|---|---|---|---|---|---|---|---|
| 0 | Column | i1c_rating_riscos_Histogram_ratio_4 | Mean | 0.013367 | 1643994929878 | 2022-02-04 | [] | Success |
| 1 | Column | i1c_rating_riscos_Histogram_ratio_1 | Mean | 0.337406 | 1643994929878 | 2022-02-04 | [] | Success |
| 2 | Column | i1c_rating_riscos_Histogram_ratio_8 | Mean | 0.028655 | 1643994929878 | 2022-02-04 | [] | Success |
| 3 | Column | i1c_rating_riscos_Histogram_ratio_9 | Mean | 0.035644 | 1643994929878 | 2022-02-04 | [] | Success |
| 4 | Column | i1c_rating_riscos_Histogram_ratio_5 | Mean | 6e-06 | 1643994929878 | 2022-02-04 | [] | Success |
| 5 | Column | i1c_rating_riscos_Histogram_ratio_2 | Mean | 0.122026 | 1643994929878 | 2022-02-04 | [] | Success |
| 6 | Column | i1c_rating_riscos_Histogram_ratio_0 | Mean | 0.37933 | 1643994929878 | 2022-02-04 | [] | Success |
| 7 | Column | i1c_rating_riscos_Histogram_ratio_6 | Mean | 0.02377 | 1643994929878 | 2022-02-04 | [] | Success |
| 8 | Column | i1c_rating_riscos_Histogram_ratio_7 | Mean | 0.059797 | 1643994929878 | 2022-02-04 | [] | Success |
| 9 | Column | i1d_sexo_Histogram_ratio_M | Mean | 0.609611 | 1643994929878 | 2022-02-04 | [] | Success |


# Day 2 - Simulating the Execution

In [17]:
dqView_simple = {"viewName" : "dqView_check_moderate",
                 "inputData": df_input,
                 "infraYaml": "yamlFiles/histogram_anomalyDetec/an_anomalyDetec/infrastructure.yaml",
                 "vsYaml": "yamlFiles/histogram_anomalyDetec/an_anomalyDetec/verificationSuite.yaml"}

dq_bnr_directory="/user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ"

In [18]:
myDq = dqRun.Dq(spark, dq_bnr_directory)

myDq = (myDq
        .addView(dqView_simple, "AnomalyDetection"))
        
dq_run_return = myDq.run()

__Evaluate DataQuality View and add to Main DataQuality Process__
dqView_check_moderate pattern evaluated.


__Running all DataQuality views__
run() dqView_check_moderate:
getPersistentRepository...
applyFiltering...
buildTemporaryRepository...
getDataframe...
getAnomalyDetection...
getOuputDFs...
custom_getAnomalyDetection...
getOuputDFs...
writeTo_persistentRepository...
writeTo_currentResults...
DataQuality process finished.


__Get overall result__
writing to: /user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ/dqView_check_moderate/currentResult/csvResult/dqView_check_moderate_OK.csv

writing to: /user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ/overall/currentResult/csvResult/overall_OK.csv


# Day 2 - Analyzing the Results

#### DQ/overall/overall_OK.csv

In [19]:
path = dq_bnr_directory+'/overall/currentResult/csvResult/overall_OK.csv'
print(path)
df = spark.read.option("header",True).csv(path)
df.toPandas()

/user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ/overall/currentResult/csvResult/overall_OK.csv


| | check | check_level | check_status | constraint | constraint_status | constraint_message | dataset_date | YY_MM_DD | viewName | viewPath |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | | | Success | | | | | | overall | |


### DQ/view/persistentRepository/successMetrics.parquet

In [20]:
dqView_name = dqView_simple["viewName"]
path = dq_bnr_directory +"/"+ dqView_name +'/persistentRepository/successMetrics.parquet'
print(path)
df = spark.read.parquet(path)
print("After 2nd execution, now persistentRepository has "+str(df.count())+" records.")

/user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ/dqView_check_moderate/persistentRepository/successMetrics.parquet
After 2nd execution, now persistentRepository has 26 records.


### DQ/view/currentResult/checkResult.parquet

In [21]:
dqView_name = dqView_simple["viewName"]
path = dq_bnr_directory +"/"+ dqView_name +'/currentResult/checkResult.parquet'
print(path)

df = spark.read.parquet(path)
df.drop("YY_MM_DD").drop("dataset_date").show()

/user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ/dqView_check_moderate/currentResult/checkResult.parquet
+--------------------+-----------+------------+--------------------+-----------------+------------------+
|               check|check_level|check_status|          constraint|constraint_status|constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+------------------+
|Descpription by Yaml|      Error|     Success|AnomalyConstraint...|          Success|                  |
|Descpription by Yaml|      Error|     Success|AnomalyConstraint...|          Success|                  |
|Descpription by Yaml|      Error|     Success|AnomalyConstraint...|          Success|                  |
|Descpription by Yaml|      Error|     Success|AnomalyConstraint...|          Success|                  |
|Descpription by Yaml|      Error|     Success|AnomalyConstraint...|          Success|                  |
|Descpription by Yaml|      Error| 

In [22]:
df.select("constraint","constraint_status").toPandas()

| | constraint | constraint_status |
|---|---|---|
| 0 | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Success |
| 1 | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Success |
| 2 | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Success |
| 3 | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Success |
| 4 | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Success |
| 5 | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Success |
| 6 | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Success |
| 7 | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Success |
| 8 | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Success |
| 9 | AnomalyConstraint(Mean(i1d_sexo_Histogram_rati... | Success |


### DQ/view/currentResult/successMetrics.parquet

In [23]:
dqView_name = dqView_simple["viewName"]
path = dq_bnr_directory +"/"+ dqView_name +'/currentResult/successMetrics.parquet'
print(path)

df = spark.read.parquet(path)
df.show()

/user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ/dqView_check_moderate/currentResult/successMetrics.parquet
+-------+--------------------+----+--------------------+-------------+----------+----+------------+
| entity|            instance|name|               value| dataset_date|  YY_MM_DD|tags|check_status|
+-------+--------------------+----+--------------------+-------------+----------+----+------------+
| Column|i1c_rating_riscos...|Mean|0.013366761013028666|1643995062814|2022-02-04|  []|     Success|
| Column|i1c_rating_riscos...|Mean| 0.33740586918125587|1643995062814|2022-02-04|  []|     Success|
| Column|i1c_rating_riscos...|Mean|0.028654936186879698|1643995062814|2022-02-04|  []|     Success|
| Column|i1c_rating_riscos...|Mean| 0.03564408019687105|1643995062814|2022-02-04|  []|     Success|
| Column|i1c_rating_riscos...|Mean|5.542540848526054E-6|1643995062814|2022-02-04|  []|     Success|
| Column|i1c_rating_riscos...|Mean| 0.12202642683476578|1643995062814|2022-02-04

In [24]:
df.select("instance","value").toPandas()

| | instance | value |
|---|---|---|
| 0 | i1c_rating_riscos_Histogram_ratio_4 | 0.013367 |
| 1 | i1c_rating_riscos_Histogram_ratio_1 | 0.337406 |
| 2 | i1c_rating_riscos_Histogram_ratio_8 | 0.028655 |
| 3 | i1c_rating_riscos_Histogram_ratio_9 | 0.035644 |
| 4 | i1c_rating_riscos_Histogram_ratio_5 | 6e-06 |
| 5 | i1c_rating_riscos_Histogram_ratio_2 | 0.122026 |
| 6 | i1c_rating_riscos_Histogram_ratio_0 | 0.37933 |
| 7 | i1c_rating_riscos_Histogram_ratio_6 | 0.02377 |
| 8 | i1c_rating_riscos_Histogram_ratio_7 | 0.059797 |
| 9 | i1d_sexo_Histogram_ratio_M | 0.609611 |


# Day 3 - Simulating a Failure Scenario

Here the distribution is deliberately skewed to demonstrate a case in which Anomaly Detection raises an Error: about 75% of the rows get i1d_sexo = "M" and i1c_rating_riscos = 9.

In [5]:
from pyspark.sql import functions as F

In [6]:
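# Overwrite about 75% of the rows (F.rand() > 0.25) with "M"; keep the original value otherwise.
df_input_failure = (df_input
                    .withColumn("i1d_sexo",
                                F.when(F.rand() > 0.25, F.lit("M"))
                                .otherwise(F.col("i1d_sexo"))
                               ))

# Overwrite about 75% of the rows with rating 9; keep the original rating otherwise.
# (The day-3 outputs recorded below were produced with F.col("i1d_sexo") as the fallback here,
# which is why M, F and empty buckets appear in the i1c_rating_riscos histogram.)
df_input_failure = (df_input_failure
                    .withColumn("i1c_rating_riscos",
                                F.when(F.rand() > 0.25, F.lit(9))
                                .otherwise(F.col("i1c_rating_riscos"))
                               ))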
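Because a row is overwritten when F.rand() > 0.25, roughly 75% of the rows receive the injected value and the other 25% keep whatever they had. The expected share of the injected value is therefore p' = q + (1 - q) * p, with q = 0.75 and p the original share. For i1d_sexo, the day-1 share p ≈ 0.6096 gives p' ≈ 0.9024, close to the 0.9027 that the day-3 run records. A small sketch of that arithmetic; expected_share_after_injection is a hypothetical name.

```python
def expected_share_after_injection(original_share, overwrite_prob=0.75):
    """Expected share of the injected value after the failure simulation.

    A row ends up with the injected value if it was overwritten
    (probability overwrite_prob) or if it already had that value and was kept
    (probability (1 - overwrite_prob) * original_share).
    """
    return overwrite_prob + (1 - overwrite_prob) * original_share
```

For example, round(expected_share_after_injection(0.6096111353340674), 4) is 0.9024.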

In [15]:
def relativeIncrease(final_value, initial_value):
    # ((final - initial) / initial) + 1 simplifies to final / initial:
    # 1.0 means no change, 1.48 means 48% more rows than before.
    return round(final_value / initial_value, 5)

In [17]:
df_input_M_count = df_input.filter(F.col("i1d_sexo") == 'M').count()
print("df_input_M.count(): " + str(df_input_M_count))
df_input_failure_M_count = df_input_failure.filter(F.col("i1d_sexo") == 'M').count()
print("df_input_failure_M.count(): " + str(df_input_failure_M_count))

increase = relativeIncrease(df_input_failure_M_count, df_input_M_count)
print("df_input_M Increase of: " + str(increase))
print()

i1c_rating_riscos_9_count = df_input.filter(F.col("i1c_rating_riscos") == 9).count()
print("i1c_rating_riscos_9.count(): " + str(i1c_rating_riscos_9_count))
i1c_rating_riscos_failure_9_count = df_input_failure.filter(F.col("i1c_rating_riscos") == 9).count()
print("i1c_rating_riscos_failure_9.count(): " + str(i1c_rating_riscos_failure_9_count))

increase = relativeIncrease(i1c_rating_riscos_failure_9_count, i1c_rating_riscos_9_count)
print("i1c_rating_riscos_9 Increase of: " + str(increase))
print()

df_input_M.count(): 329963
df_input_failure_M.count(): 488606
df_input_M Increase of: 1.48079

i1c_rating_riscos_9.count(): 19293
i1c_rating_riscos_failure_9.count(): 406183
i1c_rating_riscos_9 Increase of: 21.05339



# Day 3 - Simulating the Execution

In [28]:
dqView_simple = {"viewName" : "dqView_check_moderate",
                 "inputData": df_input_failure,
                 "infraYaml": "yamlFiles/histogram_anomalyDetec/an_anomalyDetec/infrastructure.yaml",
                 "vsYaml": "yamlFiles/histogram_anomalyDetec/an_anomalyDetec/verificationSuite.yaml"}

dq_bnr_directory="/user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ"

In [29]:
myDq = dqRun.Dq(spark, dq_bnr_directory)

myDq = (myDq
        .addView(dqView_simple, "AnomalyDetection"))
        
dq_run_return = myDq.run()

__Evaluate DataQuality View and add to Main DataQuality Process__
dqView_check_moderate pattern evaluated.


__Running all DataQuality views__
run() dqView_check_moderate:
getPersistentRepository...
applyFiltering...
buildTemporaryRepository...
getDataframe...
getAnomalyDetection...
getOuputDFs...
custom_getAnomalyDetection...
getOuputDFs...
writeTo_persistentRepository...
writeTo_currentResults...
DataQuality process finished.


__Get overall result__
writing to: /user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ/dqView_check_moderate/currentResult/csvResult/dqView_check_moderate_NOK.csv

writing to: /user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ/overall/currentResult/csvResult/overall_NOK.csv


# Day 3 - Analyzing the Results

#### DQ/overall/overall_NOK.csv

In [31]:
path = dq_bnr_directory+'/overall/currentResult/csvResult/overall_NOK.csv'
print(path)
df = spark.read.option("header",True).csv(path)
df.toPandas()

/user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ//overall/currentResult/csvResult/overall_NOK.csv


| | check | check_level | check_status | constraint | constraint_status | constraint_message | dataset_date | YY_MM_DD | viewName | viewPath |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Descpription by Yaml | Error | Error | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Failure | Value: 0.7500960707080411 does not meet the co... | 1643995237486 | 2022-02-04 | dqView_check_moderate | /user/x266727/dataquality_bnr-docs/histogram_a... |
| 1 | Descpription by Yaml | Error | Error | AnomalyConstraint(Mean(i1d_sexo_Histogram_rati... | Failure | Value: 0.9027376456764487 does not meet the co... | 1643995237486 | 2022-02-04 | dqView_check_moderate | /user/x266727/dataquality_bnr-docs/histogram_a... |


In [33]:
import pandas as pd

In [34]:
pd.set_option('display.max_colwidth', None)

display(df.select("check_status","constraint","constraint_message").toPandas())

pd.set_option('display.max_colwidth', 50)

| | check_status | constraint | constraint_message |
|---|---|---|---|
| 0 | Error | AnomalyConstraint(Mean(i1c_rating_riscos_Histogram_ratio_9,None)) | Value: 0.7500960707080411 does not meet the constraint requirement! |
| 1 | Error | AnomalyConstraint(Mean(i1d_sexo_Histogram_ratio_M,None)) | Value: 0.9027376456764487 does not meet the constraint requirement! |
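The strategy and thresholds behind these two failures live in verificationSuite.yaml, which is not shown here, so the rule below is an assumption: a relative-rate-of-change check in the spirit of Deequ's RelativeRateOfChangeStrategy, which flags a metric when current / previous falls outside [max_rate_decrease, max_rate_increase]. The default thresholds are hypothetical.

```python
def is_anomalous(previous, current, max_rate_decrease=0.5, max_rate_increase=1.3):
    """Flag `current` when current / previous leaves [max_rate_decrease, max_rate_increase].

    Hypothetical thresholds; the real values come from the verification-suite YAML.
    `previous` must be positive: a bucket with no history has no rate to compute.
    """
    if previous <= 0:
        raise ValueError("previous must be positive to compute a rate of change")
    rate = current / previous
    return not (max_rate_decrease <= rate <= max_rate_increase)
```

Applied to the day-2 and day-3 values of i1d_sexo_Histogram_ratio_M and i1c_rating_riscos_Histogram_ratio_9, the rates are about 1.48 and 21.04, so both would be flagged under these example thresholds, while an unchanged metric (rate 1.0) would pass.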


### DQ/view/currentResult/checkResult.parquet

In [35]:
dqView_name = dqView_simple["viewName"]
path = dq_bnr_directory +"/"+ dqView_name +'/currentResult/checkResult.parquet'
print(path)

df = spark.read.parquet(path)
df.drop("YY_MM_DD").drop("dataset_date").show()

/user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ//dqView_check_moderate/currentResult/checkResult.parquet
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
|               check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
|Descpription by Yaml|      Error|       Error|AnomalyConstraint...|          Failure|Value: 0.75009607...|
|Descpription by Yaml|      Error|       Error|AnomalyConstraint...|          Failure|Value: 0.90273764...|
|Descpription by Yaml|      Error|       Error|AnomalyConstraint...|          Success|                    |
|Descpription by Yaml|      Error|       Error|AnomalyConstraint...|          Success|                    |
|Descpription by Yaml|      Error|       Error|AnomalyConstraint...|          Success|                    |
|Descpription by Y

In [36]:
df.select("constraint","constraint_status","constraint_message").toPandas()

| | constraint | constraint_status | constraint_message |
|---|---|---|---|
| 0 | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Failure | Value: 0.7500960707080411 does not meet the co... |
| 1 | AnomalyConstraint(Mean(i1d_sexo_Histogram_rati... | Failure | Value: 0.9027376456764487 does not meet the co... |
| 2 | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Success | |
| 3 | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Success | |
| 4 | AnomalyConstraint(Mean(i1c_rating_riscos_Histo... | Success | |
| 5 | AnomalyConstraint(Mean(i1d_sexo_Histogram_rati... | Success | |
| 6 | AnomalyConstraint(Mean(i1d_sexo_Histogram_rati... | Success | |
| 7 | AnomalyConstraint(Size(None)) | Success | |


### DQ/view/currentResult/successMetrics.parquet

In [37]:
dqView_name = dqView_simple["viewName"]
path = dq_bnr_directory +"/"+ dqView_name +'/currentResult/successMetrics.parquet'
print(path)

df = spark.read.parquet(path)
df.show()

/user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ//dqView_check_moderate/currentResult/successMetrics.parquet
+-------+--------------------+----+--------------------+-------------+----------+----+------------+
| entity|            instance|name|               value| dataset_date|  YY_MM_DD|tags|check_status|
+-------+--------------------+----+--------------------+-------------+----------+----+------------+
| Column|i1c_rating_riscos...|Mean|1.016132488896443...|1643995237486|2022-02-04|  []|       Error|
| Column|i1c_rating_riscos...|Mean|  0.7500960707080411|1643995237486|2022-02-04|  []|       Error|
| Column|i1c_rating_riscos...|Mean| 0.22557771750777802|1643995237486|2022-02-04|  []|       Error|
| Column|i1c_rating_riscos...|Mean|0.024224598535291204|1643995237486|2022-02-04|  []|       Error|
| Column|i1d_sexo_Histogra...|Mean|  0.9027376456764487|1643995237486|2022-02-04|  []|       Error|
| Column|i1d_sexo_Histogra...|Mean|3.916728866291744...|1643995237486|2022-02-0

In [38]:
df.select("instance","value").toPandas()

| | instance | value |
|---|---|---|
| 0 | i1c_rating_riscos_Histogram_ratio_ | 0.000102 |
| 1 | i1c_rating_riscos_Histogram_ratio_9 | 0.750096 |
| 2 | i1c_rating_riscos_Histogram_ratio_M | 0.225578 |
| 3 | i1c_rating_riscos_Histogram_ratio_F | 0.024225 |
| 4 | i1d_sexo_Histogram_ratio_M | 0.902738 |
| 5 | i1d_sexo_Histogram_ratio_ | 0.000392 |
| 6 | i1d_sexo_Histogram_ratio_F | 0.096871 |
| 7 | * | 541268.0 |


### DQ/view/persistentRepository/successMetrics.parquet

In [39]:
dqView_name = dqView_simple["viewName"]
path = dq_bnr_directory +"/"+ dqView_name +'/persistentRepository/successMetrics.parquet'
print(path)
df = spark.read.parquet(path)

filter_sexo_M = (F.col("instance")=="i1d_sexo_Histogram_ratio_M")
filter_riscos_9 = (F.col("instance")=="i1c_rating_riscos_Histogram_ratio_9")

df.filter(filter_sexo_M | filter_riscos_9).orderBy(F.col("dataset_date")).show(truncate=False)

/user/x266727/dataquality_bnr-docs/histogram_anomalyDetec/DQ//dqView_check_moderate/persistentRepository/successMetrics.parquet
+------+-----------------------------------+----+-------------------+-------------+----------+----+------------+
|entity|instance                           |name|value              |dataset_date |YY_MM_DD  |tags|check_status|
+------+-----------------------------------+----+-------------------+-------------+----------+----+------------+
|Column|i1d_sexo_Histogram_ratio_M         |Mean|0.6096111353340674 |1643994929878|2022-02-04|[]  |Success     |
|Column|i1c_rating_riscos_Histogram_ratio_9|Mean|0.03564408019687105|1643994929878|2022-02-04|[]  |Success     |
|Column|i1d_sexo_Histogram_ratio_M         |Mean|0.6096111353340674 |1643995062814|2022-02-04|[]  |Success     |
|Column|i1c_rating_riscos_Histogram_ratio_9|Mean|0.03564408019687105|1643995062814|2022-02-04|[]  |Success     |
|Column|i1d_sexo_Histogram_ratio_M         |Mean|0.9027376456764487 |164399523748
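The cell above shows the history of two instances across the three runs. To compare the newest run with the previous one for every instance at once, the rows can be collected (for example with [row.asDict() for row in df.collect()], which is reasonable for a small metrics repository) and grouped in plain Python. latest_vs_previous is a hypothetical helper, not part of dataquality_bnr.

```python
from collections import defaultdict


def latest_vs_previous(records):
    """Map each instance to (previous_value, latest_value), ordered by dataset_date.

    records: iterable of dicts with "instance", "dataset_date" and "value" keys,
    e.g. rows collected from persistentRepository/successMetrics.parquet.
    Instances seen in a single run get previous_value None.
    """
    by_instance = defaultdict(list)
    for row in records:
        by_instance[row["instance"]].append((row["dataset_date"], row["value"]))

    result = {}
    for instance, points in by_instance.items():
        points.sort()  # ascending dataset_date
        previous = points[-2][1] if len(points) > 1 else None
        result[instance] = (previous, points[-1][1])
    return result
```

Each (previous, latest) pair can then be passed to whatever rate-of-change rule is in use; instances with previous None are the buckets that appeared for the first time.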

#### pydeequ shutdown_callback_server()
#### spark.stop()
__Important!__
After the jobs run, make sure both the __spark__ session and the __callback_server__ are shut down, so that no "ghost" process is left hanging.<br>
Read more about __Pydeequ__ and the __callback_server__ at: https://github.com/awslabs/python-deequ

In [None]:
spark.sparkContext._gateway.shutdown_callback_server()
spark.stop()
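The cleanup above only runs if execution actually reaches the last cell. A minimal sketch, assuming the same two calls are all that needs to run, wraps them in a context manager so they also execute when a cell raises; managed_spark is a hypothetical helper, not part of dataquality_bnr or PyDeequ.

```python
from contextlib import contextmanager


@contextmanager
def managed_spark(spark):
    """Yield an existing SparkSession and always shut it down afterwards.

    Runs the two cleanup calls from the cell above in a finally block,
    so they execute even when the body of the `with` statement raises.
    """
    try:
        yield spark
    finally:
        # Stop the Py4J callback server first, then the Spark session.
        spark.sparkContext._gateway.shutdown_callback_server()
        spark.stop()
```

Usage would look like `with managed_spark(spark) as session:` followed by the usual `dqRun.Dq(session, dq_bnr_directory).addView(dqView_simple, "AnomalyDetection").run()` inside the block.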