# Data Quality Tutorial - PySpark

## Prerequisites
You should have worked through:
1. The Data Quality Core training module, which is the foundation for all data quality training. This explains the Data Quality Dimensions (like Uniqueness) and how each measure is calculated.
2. The Python 1 Tutorial (found in this directory). This will equip you to run all of the data quality functions against dataframes, interpret the output, and export the final report.
3. The Python 2 Tutorial (found in this directory). This will show you how to store and run data quality rules from a config file.

### Prior coding experience required
* You should know what pandas dataframes are
* You should have some basic python knowledge
* You should be familiar with Spark

## Aims of the PySpark Tutorial
* To be able to run data quality rules in Spark (and understand how this is done)
* To be able to run data quality rules on nested data in Spark
    * No open source data quality package supports nested data, and this is one of the primary reasons for writing our own package


In [None]:
from datetime import datetime

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    ArrayType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

from gchq_data_quality import (
    ConsistencyRule,
    DataQualityConfig,
    TimelinessStaticRule,
    UniquenessRule,
)

## - SPARK IMPORTS - ##
from gchq_data_quality.spark.dataframe_operations import flatten_spark

Let's re-use the same dataframe we used in Python 1, but this time as a Spark dataframe.

In [None]:
df = pd.DataFrame(
    {
        "id": [1, 2, 3, 3, 5],  # 4 /5 unique
        "name": ["John", "Jane", "Dave", None, "Missing"],  # 1 null value
        "age": [30, 25, 102, 15, -5],  # a negative age
        "email": [
            "john@example.com",
            "jane@example.com",
            "dave@example",
            "test@test.com",
            "alice@example.com",
        ],  # invalid 3rd email
        "category": ["A", "B", "C", "D", "X"],
        "score": [
            10,
            20,
            30,
            40,
            -1,
        ],  # missing scores are defined as -1
        "date": [
            datetime(2023, 1, 1),
            datetime(2023, 2, 1),
            datetime(2023, 3, 1),
            datetime(2021, 1, 1),  # one date too old
            datetime(2023, 5, 1),
        ],
    }
)

df.head()

In [None]:
spark = SparkSession.builder.appName("My App").getOrCreate()

dfs = spark.createDataFrame(df)

In [None]:
dfs.show()

In [None]:
# Let's check it has parsed a sensible schema
# (printSchema() prints the schema itself and returns None, so no print() is needed)
dfs.printSchema()

# Running a basic Data Quality rule in Spark
The approach is identical to using a pandas dataframe. We:
1. Create our rule
2. Evaluate against the spark dataframe

Our code will determine what type of dataframe you have submitted.

In [None]:
uniqueness_rule = UniquenessRule(field="id")
dq_result = uniqueness_rule.evaluate(dfs)
dq_result.model_dump()

### Reporting failed samples - differences in Spark

Spark dataframes are unordered, so recording the records_failed_ids would be misleading. They are therefore not provided (you will notice above that they are None).

Because Spark requires strictly defined schemas, we output string versions of failed records. You will notice that the failed date samples in the cell below are not datetime objects (as they are in pandas) but their string equivalents.

In [None]:
timeliness_static_rule = TimelinessStaticRule(
    field="date", start_date="2023-01-01", end_date=datetime(2023, 6, 1)
)
timeliness_static_result = timeliness_static_rule.evaluate(dfs)
timeliness_static_result.to_dict()

You can also execute entire DataQualityConfig objects against the spark dataframe in the same way as pandas dataframes.

In [None]:
dq_config = DataQualityConfig.from_yaml(
    file_paths="resources/SOLUTION_rules_with_regex.yaml",
    regex_yaml_path="resources/regex_patterns.yaml",
)

In [None]:
dq_report = dq_config.execute(dfs)

In [None]:
dq_report.to_dataframe()

### Things to Note:
We do not return invalid_row_numbers (these values will remain empty). Spark dataframes are inherently unordered, so row numbers would not be meaningful given how we partition the data (see below).

## What is happening within Spark
The way we execute data quality rules in spark is as follows:
1. Spark will partition the data into lots of small pandas dataframes and run the same code that we use on pandas against each dataframe individually.
    * We use the 'mapInPandas' method
    * The size of the partitions is decided by Spark, but you can override this with the num_groups argument.

2. Each small dataframe will return a data quality report. These reports are aggregated, then returned.
    * The 'measurement_time' recorded for each partitioned dataframe will be slightly different (i.e. the current time when that partition was measured), unless you have overridden this value. We pick the latest measurement_time (the time the last Spark worker looked at the last bit of data)
    * Failed records are joined up, but the final report will limit them to whatever is set globally (FAILED_RECORDS_IN_SAMPLE)

3. Uniqueness rules are handled separately using native Spark code, i.e. mapInPandas will not work for measuring uniqueness. We can't check for uniqueness in samples of the data; we need knowledge of every value at once.
4. The reason we use mapInPandas is that we can:
    * Re-use the majority of our codebase, which makes it easier to maintain
    * Take advantage of all the flexibility in the pandas eval syntax for complex consistency rules
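
The partition-then-aggregate idea above can be sketched in plain Python, without Spark or the package itself. This is an illustration only: the helper functions, the age-range rule, and the report keys (`records_evaluated`, `records_passed`) are hypothetical and are not taken from the package. It also shows why uniqueness cannot be measured one partition at a time.

```python
# Toy records reusing the 'id' and 'age' columns from the tutorial dataframe.
records = [
    {"id": 1, "age": 30},
    {"id": 2, "age": 25},
    {"id": 3, "age": 102},
    {"id": 3, "age": 15},
    {"id": 5, "age": -5},
]


def check_partition(rows):
    """Per-partition check (hypothetical rule): count rows with 0 <= age <= 120."""
    passed = sum(1 for row in rows if 0 <= row["age"] <= 120)
    return {"records_evaluated": len(rows), "records_passed": passed}


def aggregate(reports):
    """Combine the per-partition counts into one overall result."""
    evaluated = sum(r["records_evaluated"] for r in reports)
    passed = sum(r["records_passed"] for r in reports)
    return {
        "records_evaluated": evaluated,
        "records_passed": passed,
        "pass_rate": passed / evaluated,
    }


def unique_count(rows):
    """Number of distinct id values in a set of rows."""
    return len({row["id"] for row in rows})


# Two partitions standing in for the chunks Spark hands to each worker.
partitions = [records[:3], records[3:]]

# Row-wise checks decompose cleanly: counts from each partition simply add up.
validity_report = aggregate([check_partition(p) for p in partitions])

# Uniqueness does not decompose: the duplicated id 3 lands in different
# partitions, so every partition looks fully unique on its own.
per_partition_unique = sum(unique_count(p) for p in partitions)
global_unique = unique_count(records)

print(validity_report)
print(per_partition_unique, global_unique)
```

Summing the per-partition unique counts overstates uniqueness here, because the two rows with id 3 are never compared with each other.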

In [None]:
# overriding Spark's default partition size - here we split into 2 partitions
dfs_2 = dfs.repartition(2)
dq_report = dq_config.execute(dfs_2)
dq_report.to_dataframe()

# Measuring Nested Data
A lot of our data is nested. Many other data quality packages explicitly do not deal with nested data, which was one of the main motivations for writing our own package.

Let's get some simple nested data to work with

### Example nested data - Pet Shop
We will have data relating to customers, who own zero or more pets. Each pet can have one or more appointments

In [None]:
data = [
    {
        "id": 1,
        "customers": {
            "name": "John",
            "age": 30,
            "pets": [
                {
                    "name": "Fido",
                    "appointments": [
                        {"date": "2022-01-01", "comment": "Fido First appointment"},
                        {"date": "2022-01-02", "comment": "Fido Second appointment"},
                    ],
                },
                {
                    "name": "Whiskers",
                    "appointments": [
                        {"date": "2022-02-03", "comment": "Whiskers First appointment"},
                        {
                            "date": "2022-02-04",
                            "comment": "Whiskers Second appointment",
                        },
                    ],
                },
            ],
        },
    },
    {
        "id": 2,
        "customers": {
            "name": "Jane",
            "age": 25,
            "pets": [{"name": "Rex", "appointments": []}],
        },
    },
    {
        "id": 3,
        "customers": {
            "name": "Mr No Pets",
            "age": 102,
            "pets": [{"name": None, "appointments": []}],
        },
    },
    {
        "id": 4,
        "customers": {
            "name": "Mrs Missing Pets",
            "age": 15,
            "pets": [
                {"name": "missing", "appointments": [{"date": None, "comment": "none"}]}
            ],
        },
    },
]

schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField(
            "customers",
            StructType(
                [
                    StructField("name", StringType(), True),
                    StructField("age", IntegerType(), True),  # <-- added age to schema
                    StructField(
                        "pets",
                        ArrayType(
                            StructType(
                                [
                                    StructField("name", StringType(), True),
                                    StructField(
                                        "appointments",
                                        ArrayType(
                                            StructType(
                                                [
                                                    StructField(
                                                        "date", StringType(), True
                                                    ),
                                                    StructField(
                                                        "comment", StringType(), True
                                                    ),
                                                ]
                                            )
                                        ),
                                        True,
                                    ),
                                ]
                            )
                        ),
                        True,
                    ),
                ]
            ),
            True,
        ),
    ]
)
df_pets = spark.createDataFrame(data, schema=schema)

In [None]:
df_pets.printSchema()

In [None]:
# At a high level we have two columns (id, and customers)
# We need to dig deep into the customers column to pull out different values to test DQ rules against
df_pets.show()

### How we handle nested data
In order for our existing data quality code to work with nested data, we first need to flatten the dataframe

This is all done for you, but it's worthwhile seeing how this works.

* Under the hood, we use the flatten_spark function
* I suggest you use this function whilst creating your data quality rules, to check the dataframe is being flattened in the way you intend.
    * Nested data can be quite complicated, especially if you are exploding multiple levels across multiple columns
* Once flattened, we measure the data quality rules (it's treated like an ordinary dataframe)

### How to refer to nested values
We use notation similar to JSON path strings, if you are familiar with those.
1. To access all pet names for our customers, we want to 'explode' the array of pets and pull out the name:

```yaml
field: customers.pets[*].name
```

The [*] value will return details for every pet

2. Sometimes (e.g. for completeness measures), we just want to know if we have at least one pet name

```yaml
field: customers.pets[].name
```

This will pull the first non-null pet name for each customer (unless all of their pet names are null, in which case it will return null)

3. We need to keep track of new column names as the dataframe is being sequentially 'exploded' and flattened. We therefore rename the columns of the newly created dataframe:
    1. full-stops replaced with underscores
    2. [*] replaced with _all (to signify we are extracting all values)
    3. [] replaced with _first (to signify we pick the first non-null value)
    ```yaml
    customers.pets[*].name > customers_pets_all_name
    customers.pets[].name > customers_pets_first_name
    ```
4. Let's use flatten_spark and see this in practice.

In [None]:
# Note how we will get two rows for John, as he has two pets.
df_flat = flatten_spark(
    df_pets, flatten_cols=["id", "customers.name", "customers.pets[*].name"]
)
df_flat.show()
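
The column names in the flattened dataframe follow the three renaming rules listed earlier. As a minimal sketch, those rules can be written as a plain string transformation. The function name `flattened_column_name` is hypothetical and is not part of the package; it only mirrors the rules as described in this tutorial.

```python
def flattened_column_name(field: str) -> str:
    """Apply the tutorial's three renaming rules to a nested field path."""
    return (
        field.replace("[*]", "_all")  # [*] means every array element is extracted
        .replace("[]", "_first")  # [] means the first non-null element is picked
        .replace(".", "_")  # full stops become underscores
    )


print(flattened_column_name("customers.pets[*].name"))
print(flattened_column_name("customers.pets[].name"))
```

Comparing the result of this helper with `df_flat.columns` is a quick way to check which column a rule will be evaluated against.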

In [None]:
# If we pull just the first pet for each customer, we expect one row per customer
# Note that as Spark is unordered, you may get a different 'first' pet each time you run this (likely for large data)
df_flat = flatten_spark(
    df_pets, flatten_cols=["id", "customers.name", "customers.pets[].name"]
)
df_flat.show()
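
To make the difference between `[*]` and `[]` concrete, here is a plain Python sketch of the two behaviours on a cut-down version of the pet shop data. It is an illustration of the semantics described above, not the implementation of flatten_spark; the helper names are hypothetical. Unlike Spark, Python lists are ordered, so 'first' is deterministic here.

```python
# Cut-down pet shop data: each customer has a list of pets.
customers = [
    {"name": "John", "pets": [{"name": "Fido"}, {"name": "Whiskers"}]},
    {"name": "Jane", "pets": [{"name": "Rex"}]},
    {"name": "Mr No Pets", "pets": [{"name": None}]},
]


def explode_pet_names(rows):
    """Mimic customers.pets[*].name: one output row per pet."""
    return [(row["name"], pet["name"]) for row in rows for pet in row["pets"]]


def first_pet_name(rows):
    """Mimic customers.pets[].name: first non-null pet name, or None."""
    result = []
    for row in rows:
        names = [pet["name"] for pet in row["pets"] if pet["name"] is not None]
        result.append((row["name"], names[0] if names else None))
    return result


all_rows = explode_pet_names(customers)
first_rows = first_pet_name(customers)

print(all_rows)    # John appears twice, once per pet
print(first_rows)  # one row per customer
```

How the package treats a customer whose pets array is completely empty is not covered by this sketch; check the output of flatten_spark on your own data for that case.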

### Writing Data Quality Rules for Nested Data
In your YAML configuration file:

1. Just refer to the specific field using the nested format:
```yaml
field: customers.pets[].name
function: completeness
```

2. For consistency rules, you MUST use backticks around EVERY column name (as this is how they are extracted)
```yaml
field: customers.pets[].name
function: consistency
expression: "`customers.age` > 18"
```
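
The backticks matter because they mark which parts of the expression are column names to be extracted. As a rough sketch of that idea, assuming a simple regular-expression approach (the package's actual parsing may differ), the quoted names can be pulled out like this. The helper `columns_in_expression` is hypothetical.

```python
import re

# Matches anything between a pair of backticks.
BACKTICKED = re.compile(r"`([^`]+)`")


def columns_in_expression(expression: str) -> list[str]:
    """Return the distinct backtick-quoted names in an expression, in order."""
    seen = []
    for name in BACKTICKED.findall(expression):
        if name not in seen:
            seen.append(name)
    return seen


print(columns_in_expression("`customers.age` > 18"))
# Without backticks nothing is recognised as a column name.
print(columns_in_expression("customers.age > 18"))
```

With this approach, an expression written without backticks yields no column names at all, which is one way a missing backtick could lead to a rule that cannot find its data.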

In [None]:
age_rule = ConsistencyRule(
    field="customers.age", expression="`customers.age` > 18"
)  # Note: ESSENTIAL use of backticks around the column expression customers.age

# Before running the data quality rules on nested data, it's a good idea to check the dataframe being sent
df_flat = flatten_spark(df_pets, flatten_cols=["customers.age", "id"])
print(
    "====== This is the DataFrame that will be sent (with the id field added for clarity)======"
)
print("====== We create this with the 'flatten_spark' function ======")
df_flat.show()
print("===================")
nested_result = age_rule.evaluate(df_pets)
print("Data Quality Report DataFrame")
nested_result.to_dict()

In [None]:
# One of our ages is less than 18 - note the change in column name (underscore between customers and age):
nested_result.records_failed_sample

# Running from a Config File
We provide an example config file (`nested_data_rules.yaml`) to show you a range of rules you can apply

You can find this file in the tutorial directory (same location as this notebook):

```yaml
source_data: Nested Pet Shop Data
measurement_time: 2025-01-01
rules:
- field: customers.age
  function: validity_numerical_range
  min_value: 18
  max_value: 120
- field: customers.pets[*].appointments[*].date
  rule_description: All dates are no more than a year in the future from 1st Jan 2022
  function: timeliness_static
  start_date: 2022-01-01
  end_date: 2023-01-01
- field: customers.pets[].name
  function: completeness
- field: customers.name
  function: consistency
  rule_description: If the customer is a child they can't be a Mr or a Mrs
  expression:
    if: '`customers.age` < 18'
    then: '~`customers.name`.str.startswith("Mr")'

```
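
The if/then consistency rule in this config can be read as: a row fails when the `if` condition holds but the `then` condition does not. The plain Python sketch below applies that reading to the pet shop customers. It is an assumption about the rule's intent based on its description; whether rows that do not meet the `if` condition count as passes or are left out of the total is not stated here, so the sketch only identifies failing rows.

```python
# Name and age for each customer in the pet shop data.
rows = [
    {"name": "John", "age": 30},
    {"name": "Jane", "age": 25},
    {"name": "Mr No Pets", "age": 102},
    {"name": "Mrs Missing Pets", "age": 15},
]


def fails_if_then(row):
    """True when the 'if' clause holds but the 'then' clause does not."""
    condition = row["age"] < 18  # if: `customers.age` < 18
    consequence = not row["name"].startswith("Mr")  # then: name does not start with "Mr"
    return condition and not consequence


failing = [row["name"] for row in rows if fails_if_then(row)]
print(failing)
```

Note that `startswith("Mr")` also matches "Mrs", which is why a single check covers both titles.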

In [None]:
nested_config2 = DataQualityConfig.from_yaml(
    "resources/nested_data_rules.yaml", regex_yaml_path="resources/regex_patterns.yaml"
)

In [None]:
nested_config2.rules

In [None]:
dq_pets_nested_report = nested_config2.execute(df_pets)

In [None]:
dq_pets_nested_report.to_dataframe()

In [None]:
spark.stop()

### Spark GOTCHAs

A few limitations to note:

**Auto YAML creation**

Because flattening renames columns (e.g. customers[*].age > customers_all_age), you can't re-create the exact same DataQualityConfig rules from the data quality report. This is fairly quick to remedy in a code editor with find and replace.

```python
config = DataQualityConfig.from_report(DQ_Report)
```
In the resulting config, replace '_all' with [*], '_first' with [], and each single '_' with '.' to get back to the original field names.
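
The find-and-replace described above can also be scripted. The sketch below is a naive version with a hypothetical helper name (`restore_field_name`); it is not part of the package. It only works when the original field names contain no underscores and no '_all' or '_first' fragments of their own, so treat its output as a starting point to review by hand.

```python
def restore_field_name(column: str) -> str:
    """Naively reverse the flattening renames; order of the replacements matters."""
    return (
        column.replace("_all", "[*]")
        .replace("_first", "[]")
        .replace("_", ".")
    )


print(restore_field_name("customers_pets_all_name"))
print(restore_field_name("customers_age"))

# Ambiguity: a genuine field called customers.first_name flattens to
# customers_first_name, which this helper wrongly restores as customers[].name.
print(restore_field_name("customers_first_name"))
```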

**Consistency Rules**

Be careful with the expressions you run in consistency checks. If an expression relies on dataframe-wide statistics, it will be unreliable when the dataframe is partitioned. For example, in `col1 <= other_col.mean()` the value of other_col.mean() will vary depending on how the dataframe has been partitioned. If running this check is essential, you will have to push the data to a single Spark worker using `df_spark.repartition(1)`, or pre-process the data so the mean value is in its own column (so the same value goes to every partition).
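
A small plain Python example shows the effect. The values and the two-way split are made up for illustration; the point is only that a mean computed inside each partition gives different pass/fail results from a mean computed over the whole dataset, and that pre-computing the statistic removes the discrepancy.

```python
from statistics import mean

values = [10, 20, 30, 40, 1000]
chunks = [values[:4], values[4:]]  # stand-in for two Spark partitions


def passes_own_mean(chunk):
    """Check value <= mean, with the mean taken from the chunk itself."""
    threshold = mean(chunk)
    return [v <= threshold for v in chunk]


def passes_fixed(chunk, threshold):
    """Check value <= a threshold that was computed once, up front."""
    return [v <= threshold for v in chunk]


# Whole dataset in one piece: the mean is 220.
whole = passes_own_mean(values)

# Partitioned: each chunk uses its own mean (25 and 1000), so results change.
split = [flag for chunk in chunks for flag in passes_own_mean(chunk)]

# Pre-computed statistic: every chunk sees the same threshold.
global_mean = mean(values)
fixed = [flag for chunk in chunks for flag in passes_fixed(chunk, global_mean)]

print(whole)
print(split)
print(fixed)
```

The `fixed` result matches `whole`, which corresponds to the suggestion of putting the mean in its own column before the rule runs.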

**Timezones**

I've found Spark doesn't like timezone-naive datetimes and will stamp a timezone on a date column (depending on how you've configured Spark, this could be local time, and so not always UTC). I suggest you control this explicitly in your source data and convert everything to UTC, so that the time rules work as intended.
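
As a minimal sketch of making timestamps explicit before they reach Spark, the standard library can attach or convert to UTC as shown below. The example values are made up. On the Spark side, the `spark.sql.session.timeZone` setting controls the session timezone and is worth checking as well.

```python
from datetime import datetime, timedelta, timezone

# A naive timestamp that we know was recorded in UTC: label it as such.
naive = datetime(2023, 1, 1, 9, 0)
as_utc = naive.replace(tzinfo=timezone.utc)

# A timestamp recorded with a +02:00 offset: convert it to UTC.
local = datetime(2023, 1, 1, 9, 0, tzinfo=timezone(timedelta(hours=2)))
converted = local.astimezone(timezone.utc)

print(as_utc.isoformat())
print(converted.isoformat())
```

`replace(tzinfo=...)` only labels the value and leaves the clock time unchanged, whereas `astimezone(...)` shifts the clock time, so pick the one that matches what you know about the source data.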

**Approximations**

Spark may introduce some rounding errors, especially when small bits of test data are split across partitions. These are insignificant from a data quality measure point of view, but odd to see. Our unit testing has to account for this by checking values are equal to within an error of 1e-4.
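
If you write your own tests against Spark results, a tolerance-based comparison avoids spurious failures. The sketch below uses `math.isclose` from the standard library with the 1e-4 tolerance mentioned above; the numbers are a generic floating-point example rather than output from the package.

```python
import math

expected = 0.3
measured = 0.1 + 0.2  # floating-point arithmetic leaves a tiny error

# Exact equality fails even though the values are the same for practical purposes.
exactly_equal = measured == expected

# Comparing within an absolute tolerance of 1e-4 succeeds.
close_enough = math.isclose(measured, expected, abs_tol=1e-4)

print(exactly_equal, close_enough)
```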

In [None]:
DataQualityConfig.from_report(dq_pets_nested_report).model_dump()

# Note how the 'field' name in the output below has the new column names such as 'customers_age'

# Production!

Now that you can deploy at scale against Spark dataframes, you should be almost ready for production :)