# Rony Data Quality - Analyzer example

In [1]:
import pydeequ

from pyspark import SparkFiles
from pyspark.sql import SparkSession

from rony.data_quality import Analyzer, DataQuality

Please set env variable SPARK_VERSION


### Create a SparkSession with the PyDeequ and Delta Lake jars from Maven

In [2]:
# Build the session through rony's factory, which pulls the PyDeequ and Delta Lake jars from Maven.
spark = (
    DataQuality
    .create_deequ_delta_SparkSession()
)

### Download the *Titanic* dataset from the web

In [3]:
url = "https://raw.githubusercontent.com/neylsoncrepalde/titanic_data_with_semicolon/main/titanic.csv"
# Derive the downloaded file's name from the URL instead of repeating it, so that
# pointing `url` at another CSV keeps the addFile / SparkFiles.get pair in sync.
file_name = url.rsplit("/", 1)[-1]  # "titanic.csv"

# addFile downloads the URL for this Spark job; SparkFiles.get resolves the local copy.
# NOTE(review): the "file://" path is resolved on the driver, so this presumably
# assumes local mode — confirm before running against a cluster.
spark.sparkContext.addFile(url)
df = (
    spark
    .read
    # The source CSV is semicolon-separated (see the repository name in `url`).
    .csv("file://" + SparkFiles.get(file_name), header=True, inferSchema=True, sep=";")
)

In [4]:
# Preview the raw data: first 20 rows, with long cell values truncated.
df.show(n=20, truncate=True)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

### Write the job configuration YAML file

In [5]:
%%writefile analyzer_configs.yaml
columns:
    PassengerId: [Completeness]
    Age: [Completeness, Mean, StandardDeviation, Minimum, Maximum, Sum, Entropy]
    Sex: [Completeness, ApproxCountDistinct, Distinctness]
    Fare: [Completeness, Mean, StandardDeviation]
    Pclass: [DataType]
    Survived: [Histogram]
    Name: [MaxLength, MinLength]
metrics:
    Correlation: 
        - [Fare, Age]
        - [Fare, Survived]
    Compliance: 
        - [Age, "Age>40.2"]
    PatternMatch: 
        - [Name, "M(r|rs|iss)."]
    ApproxQuantiles: 
        - [Age, '0.5', '0.25', '0.75']
        - [Fare, '0.5', '0.25', '0.75']
    Uniqueness:
        - [PassengerId]
        - [Name,Sex]
        - [Ticket]
    UniqueValueRatio:
        - [PassengerId]
        - [Name,Sex]

Writing analyzer_configs.yaml


If you are familiar with the PyDeequ library, you can instead pass an *AnalysisRunner* statement as a string. For now, it must be a single-line ("one-liner") expression:

In [6]:
# PyDeequ AnalysisRunner chain passed to Analyzer.run as text. The pieces below are
# joined by implicit literal concatenation, so the value is still one line (no "\n").
string_expression = (
    'AnalysisRunner(spark).onData(df)'
    '.addAnalyzer(Size())'
    '.addAnalyzer(Completeness("Age"))'
    '.addAnalyzer(Mean("Age"))'
    '.addAnalyzer(StandardDeviation("Age"))'
    '.addAnalyzer(Minimum("Age"))'
    '.addAnalyzer(Maximum("Age"))'
    '.addAnalyzer(Completeness("Sex"))'
    '.addAnalyzer(ApproxCountDistinct("Sex"))'
    '.addAnalyzer(ApproxQuantile("Age", 0.5))'
    '.addAnalyzer(UniqueValueRatio(["PassengerId", "Sex"]))'
    '.run()'
)

### Instantiate the Analyzer class

In [7]:
# One Analyzer bound to the active SparkSession; it is reused by the run and write cells below.
analyzer = Analyzer(spark)

### Run the analyzer job from the YAML file and show the results

In [8]:
# Run the analyzers declared in the YAML file written by the %%writefile cell above.
res_from_file = analyzer.run(df, "analyzer_configs.yaml")

In [9]:
# Show up to 50 metric rows with full (untruncated) values.
res_from_file.show(n=50, truncate=False)

+-----------+-------------+--------------------------+--------------------+-------------------+
|entity     |instance     |name                      |value               |dt_update          |
+-----------+-------------+--------------------------+--------------------+-------------------+
|Column     |Age          |ApproxQuantiles-0.25      |20.0                |2022-07-06-23-24-56|
|Column     |Age          |ApproxQuantiles-0.5       |28.0                |2022-07-06-23-24-56|
|Column     |Age          |ApproxQuantiles-0.75      |38.0                |2022-07-06-23-24-56|
|Column     |Age          |Completeness              |0.8013468013468014  |2022-07-06-23-24-56|
|Column     |Age          |Compliance                |0.16835016835016836 |2022-07-06-23-24-56|
|Column     |Age          |Entropy                   |4.045611490075093   |2022-07-06-23-24-56|
|Column     |Age          |Maximum                   |80.0                |2022-07-06-23-24-56|
|Column     |Age          |Mean         

### Run the analyzer job from the string expression

In [10]:
# Same Analyzer, this time driven by the PyDeequ string expression defined earlier instead of a YAML path.
res_from_expression = analyzer.run(df, string_expression)

In [11]:
# Show the default 20 rows, without truncating metric values.
res_from_expression.show(n=20, truncate=False)

+-----------+---------------+-------------------+------------------+-------------------+
|entity     |instance       |name               |value             |dt_update          |
+-----------+---------------+-------------------+------------------+-------------------+
|Column     |Age            |ApproxQuantile-0.5 |28.0              |2022-07-06-23-24-59|
|Column     |Age            |Completeness       |0.8013468013468014|2022-07-06-23-24-59|
|Column     |Age            |Maximum            |80.0              |2022-07-06-23-24-59|
|Column     |Age            |Mean               |29.69911764705882 |2022-07-06-23-24-59|
|Column     |Age            |Minimum            |0.42              |2022-07-06-23-24-59|
|Column     |Age            |StandardDeviation  |14.51632115081731 |2022-07-06-23-24-59|
|Column     |Sex            |ApproxCountDistinct|2.0               |2022-07-06-23-24-59|
|Column     |Sex            |Completeness       |1.0               |2022-07-06-23-24-59|
|Dataset    |*       

### Write the DataFrame output as a Delta table

In [12]:
# Persist the YAML-driven metrics as a Delta table. In append mode every re-run of
# this cell adds another batch of rows (distinguishable by dt_update).
analyzer.write_output(
    res_from_file,
    "./dataqualityoutput",
    mode="append",
    delta=True,
)

Check that the results were written correctly.

In [13]:
# The table was written with delta=True, so read it back through the Delta reader.
# It returns only the data files referenced by the Delta transaction log. A plain
# parquet read of the directory would also pick up files the log has since marked
# as removed (e.g. after an overwrite).
spark.read.format("delta").load("./dataqualityoutput").show(50, truncate=False)

+-----------+-------------+--------------------+--------------------+-------------------+
|     entity|     instance|                name|               value|          dt_update|
+-----------+-------------+--------------------+--------------------+-------------------+
|     Column|          Age|ApproxQuantiles-0.25|                20.0|2022-07-06-23-24-56|
|     Column|          Age| ApproxQuantiles-0.5|                28.0|2022-07-06-23-24-56|
|     Column|          Age|ApproxQuantiles-0.75|                38.0|2022-07-06-23-24-56|
|     Column|          Age|        Completeness|  0.8013468013468014|2022-07-06-23-24-56|
|     Column|          Age|          Compliance| 0.16835016835016836|2022-07-06-23-24-56|
|     Column|          Age|             Entropy|   4.045611490075093|2022-07-06-23-24-56|
|     Column|          Age|             Maximum|                80.0|2022-07-06-23-24-56|
|     Column|          Age|                Mean|   29.69911764705882|2022-07-06-23-24-56|
|     Colu