In [None]:
# Install a venv and activate it for isolation
# !pyenv versions
# !pyenv virtualenv 3.8.17 pandera-3.8.17
# !pyenv activate pandera-3.8.17

In [None]:
!pip install pandas==1.5.3
!pip install pyspark==3.3.2
!pip install pandera==0.18.0
!pip install hypothesis==6.97.5
!pip install pyarrow==15.0.0


In [None]:

!pip install pyyaml
!pip install black
!pip install frictionless


In [None]:
import pandas as pd
import pandera as pa



# DataFrameSchema PARAMETERS
- **columns** (mapping of column names and column schema component.) – a dict where keys are column names and values are Column objects specifying the datatypes and properties of a particular column.
- **checks** (Optional[CheckList]) – dataframe-wide checks.
- **index** – specify the datatypes and properties of the index.
- **dtype** (PandasDtypeInputTypes) – datatype of the dataframe. This overrides the data types specified in any of the columns. If a string is specified, then assumes one of the valid pandas string values: http://pandas.pydata.org/pandas-docs/stable/basics.html#dtypes.
- **coerce** (bool) – whether or not to coerce all of the columns on validation. This overrides any coerce setting at the column or index level. This has no effect on columns where dtype=None.
- **strict** (StrictType) – ensure that all and only the columns defined in the schema are present in the dataframe. If set to ‘filter’, only the columns in the schema will be passed to the validated dataframe. If set to filter and columns defined in the schema are not present in the dataframe, will throw an error.
- **name** (Optional[str]) – name of the schema.
- **ordered** (bool) – whether or not to validate the columns order.
- **unique** (Optional[Union[str, List[str]]]) – a list of columns that should be jointly unique.
- **report_duplicates** (UniqueSettings) – how to report unique errors - exclude_first: report all duplicates except first occurence - exclude_last: report all duplicates except last occurence - all: (default) report all duplicates
- **unique_column_names** (bool) – whether or not column names must be unique.
- **add_missing_columns** (bool) – add missing column names with either default value, if specified in column schema, or NaN if column is nullable.
- **title** (Optional[str]) – A human-readable label for the schema.
- **description** (Optional[str]) – An arbitrary textual description of the schema.
- **metadata** (Optional[dict]) – An optional key-value data.
- **drop_invalid_rows** (bool) – if True, drop invalid rows on validation.

In [4]:
# data to validate
df = pd.DataFrame({
    "column1": [1, 4, 0, 10, 9],
    "column2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    "column3": ["value_1", "value_2", "value_3", "value_2", "value_1"],
})

# define schema with inline checks
schema = pa.DataFrameSchema({
    "column1": pa.Column(int, checks=pa.Check.le(10)),
    "column2": pa.Column(float, checks=pa.Check.lt(-1.2)),
    "column3": pa.Column(str, checks=[
        pa.Check.str_startswith("value_"),
        # define custom checks as functions that take a series as input and
        # outputs a boolean or boolean Series
        pa.Check(lambda s: s.str.split("_", expand=True).shape[1] == 2)
    ]),
})

# This works, because the checks pass the schema validation for all columns
validated_df = schema(df)
print(validated_df)


   column1  column2  column3
0        1     -1.3  value_1
1        4     -1.4  value_2
2        0     -2.9  value_3
3       10    -10.1  value_2
4        9    -20.4  value_1


In [5]:

# This will raise a SchemaError, because the checks fail for column2 and 3
df_invalid = pd.DataFrame({
    "column1": [1, 4, 0, 10, 9],
    "column2": [6, -1.4, -2.9, 10.1, -20.4],
    "column3": ["value_1", "value_2", "value_3", "value_2", "value_1"],
})

# Catching exception here to make the output more readable
try:
    schema(df_invalid)
except pa.errors.SchemaError as e:
    print(e)


<Schema Column(name=column2, type=DataType(float64))> failed element-wise validator 0:
<Check less_than: less_than(-1.2)>
failure cases:
   index  failure_case
0      0           6.0
1      3          10.1


In [None]:
#These schemas, and their checks can be serialized to and from JSON
#https://pandera.readthedocs.io/en/stable/schema_inference.html?highlight=schema%20json

schema_from_json = pa.DataFrameSchema.from_json("schemas/schema.json")
schema_from_json

# Add a new column with checks, to the schema
schema_from_json_fourth_col = schema_from_json.add_columns({
    "column4": pa.Column(int, checks=pa.Check.ge(0))
})

# This could serve as a powerful way to dynamically manage read and write schemas in a data pipeline.
# More simply, you could just use this as a validation tool for your dataframee transformations.
schema_from_json_fourth_col.to_json("schema_with_column4.json")

# You could then validate the dataframe with the new schema, and write on validation success only.
schema_from_json_fourth_col.validate(df)
# The above will fail, as the dataframe does not have the column4


# YAML Schema Example with defined reader and writer schemas

- In this example, we will set a reader and a writer schema, defined in YAML files.
- We validate the pre-defined schemas against the dataframes before reading and writing.
- This is a powerful way to ensure that the data is always in the expected format
    and sudden changes in the data wont break the pipeline.

In this fictional example, we are collecting data from colleagues at a company, and we want to ensure:

- Each employee has a unique ID and can only appear once in the dataset
- Their age is over 18 (they've been hired legally)

This questionnare was to check who has the longest tenure at the company as a % of their total life, as we have no record of that.
During the process, we will materialise a new column, "tenure_percentage" which is the tenure at the company as a % of their total life.
We don't want cheating! So we will check that the tenure_percentage is less than 100%.

In my fictional pipeline, I would have alerting set up to handle the failure of the data validation checks.


In [None]:
reader_schema_from_yaml = pa.DataFrameSchema.from_yaml("schemas/reader_schema.yaml")
reader_schema_from_yaml

input_df = pd.DataFrame({
    "name": ['Alice', 'Bob', 'Charlie', 'Dennis', 'Edith'],
    "age": [25, 32, 29, 19, 22],
    "tenure": [3, 5, 2, 1, 4]
})

reader_schema_from_yaml.validate(input_df)

#materialise the tenure_percentage column
input_df["tenure_percentage"] = (input_df["tenure"] / input_df["age"] * 100)
input_df

# To ensure that we are writing the expected data, we can validate the dataframe against the writer schema
writer_schema_from_yaml = pa.DataFrameSchema.from_yaml("schemas/writer_schema.yaml")

writer_schema_from_yaml.validate(input_df)

input_df.to_csv("employee_tenure.csv", index=False)

# Edith wins, with 18.18% of her life spent at the company.

# Data Synthesis

https://pandera.readthedocs.io/en/stable/data_synthesis_strategies.html

pandera provides a utility for generating synthetic data purely from pandera schema or schema component objects. Under the hood, the schema metadata is collected to create a data-generating strategy using hypothesis, which is a property-based testing library.

This could provide very valuable for testing and development purposes.

In [None]:
# Load that JSON schema into a DataFrameSchema object again

schema_from_json = pa.DataFrameSchema.from_json("schemas/schema.json")

# generate a synthetic dataframe that conforms to the schema

synthetic_df = schema_from_json.example(3)
synthetic_df

# Spark Integration

At FundingCircle, we often do larger data processing tasks in Spark, and we would like to validate the data in the same way as we do in Pandas, using Pyspark Pandas.

The same concepts as above all apply, but the syntax is slightly different.

In [None]:
## Ensure Java is installed, if not, install it for this local demo.
!java -version

# openjdk version "1.8.0_362"
# OpenJDK Runtime Environment (Zulu 8.68.0.21-CA-macos-aarch64) (build 1.8.0_362-b09)
# OpenJDK 64-Bit Server VM (Zulu 8.68.0.21-CA-macos-aarch64) (build 25.362-b09, mixed mode)

# Ensure SPARK version matches the pyspark version installed
!pyspark

# Welcome to
#       ____              __
#      / __/__  ___ _____/ /__
#     _\ \/ _ \/ _ `/ __/  '_/
#    /__ / .__/\_,_/_/ /_/\_\   version 3.3.2
#       /_/

In [None]:
import pyspark.pandas as ps
import pandas as pd
import pandera as pa

from pandera.typing.pyspark import DataFrame, Series

# You can define your schema using the pandera DataFrameModel class using traditional types.
class Schema(pa.DataFrameModel):
    state: Series[str] = pa.Field(in_set=["FL", "CA"])
    city: Series[str] = pa.Field(allow_duplicates=False)
    price: Series[int] = pa.Field(in_range={"min_value": 5, "max_value": 20})

# Schema.to_yaml("schemas/pyspark_schema.yaml") if you want to persist the schema to YAML.

# create a pyspark.pandas dataframe that's validated on object initialization.
# This is a more efficient, inline process than using the .validate method on a dataframe.
df = DataFrame[Schema](
    {
        'state': ['FL','FL','FL','CA','CA','CA'],
        'city': [
            'Orlando',
            'Miami',
            'Tampa',
            'San Francisco',
            'Los Angeles',
            'San Diego',
        ],
        'price': [5, 12, 10, 16, 20, 18], # <-- Change me to see the validation error
    }
)

print(df)

# Import a Schema from a YAML for DataFrame Validation

Like the previous non-spark example, we can import a schema from a YAML file for DataFrame validation. We can do this on read and write to ensure the data is always in the expected format and following our business or functional rules.

We can persist these schemas in version control and use them across different parts of the pipeline, ensuring consistency and reducing the risk of errors.

In [25]:
import pyspark.pandas as ps
import pandas as pd
import pandera as pa

from pandera.typing.pyspark import DataFrame

schema_from_yaml = pa.DataFrameSchema.from_yaml("schemas/pyspark_schema.yaml")

# create a pyspark.pandas dataframe that's validated on object initialization
df = DataFrame(
    {
        'state': ['FL','FL','FL','CA','CA','CA'],
        'city': [
            'Orlando',
            'Miami',
            'Tampa',
            'San Francisco',
            'Los Angeles',
            'San Diego',
        ],
        'price': [5, 12, 10, 16, 20, 18], # <-- Change me to see the validation error
    }
)

# validate the dataframe - catching exception for clarity
try:
    schema_from_yaml.validate(df)
except pa.errors.SchemaError as e:
    print(e)

print(df)
type(df)


  fields = [
  for column, series in pdf.iteritems():
  fields = [
  for column, series in pdf.iteritems():
  fields = [
  for column, series in pdf.iteritems():


  state           city  price
0    FL        Orlando      5
1    FL          Miami     12
2    FL          Tampa     10
3    CA  San Francisco     16
4    CA    Los Angeles     20
5    CA      San Diego     18


pandera.typing.pyspark.DataFrame

# Spark DataFrame data synthesis

We can also use pandera to generate synthetic data for Spark DataFrames. This is a very powerful tool for testing and development purposes.


In [26]:
# Create a dataframe in pandas for spark and synthesize data using a schema from pandera

import pyspark.pandas as ps
import pandas as pd
import pandera as pa

from pandera.typing.pyspark import DataFrame

# create a pyspark.pandas dataframe that's validated on object initialization

schema_from_yaml = pa.DataFrameSchema.from_yaml("schemas/pyspark_schema.yaml")

... todo


  fields = [
  for column, series in pdf.iteritems():


        state   city  price
0       åhÊ^  åhÊ^      7
1  Î\tËÑ򥺻{ L     󖸉     14
2          [d  ùÿ+     10


pandas.core.frame.DataFrame