In [8]:
# Downloading the contents of our online repo,
# so that we have access to all the code and our dataset!
import os

# Code specifically for running on Google Colab.
# We don't want to run this if the notebook is run locally.
if ':/tools/google-cloud-sdk/bin' in os.environ["PATH"]:

  repo_folder = os.path.join('/', 'content', 
                            'great-expectation-workshop-pydata')
  
  if not os.path.isdir(repo_folder):
    !git clone https://github.com/heineken-advanced-analytics/great-expectation-workshop-pydata.git
    print("Successfully cloned repository")
  else:
    print("Repository already cloned!")

## PART 1 ##

In [9]:
import logging
import pathlib
from datetime import datetime
from typing import Dict, List

import pandas as pd

import great_expectations as ge
from great_expectations.dataset import MetaPandasDataset, PandasDataset

In [10]:
# %% VALIDATION CHECK BEFORE RUNNING PIPELINE
# If the validation of the data file succeeds, the pipeline continues.
def continue_pipeline(validation_result: bool):
    """
    Report whether the pipeline may proceed after a validation step.

    Parameters
    ----------
    validation_result
        Outcome of the validation; any truthy value counts as success.

    Returns
    -------
        None. The verdict is only printed.
    """
    verdict = "Pipeline continues." if validation_result else "Pipeline breaks."
    print(verdict)

# %% PARSE SINGLE FILE
# We start by loading the file that we want to check.
# The path is relative, so it is resolved against the notebook's working directory.
filepath = "./data/weather_brasil_201301.csv"
weather_data_df = pd.read_csv(filepath)

# %%  CHECK HEAD:
# Preview the first rows to get a feel for the columns we are going to validate.
weather_data_df.head()

# %% CREATE A FUNCTION TO TEST THE DATA
# In order to test the file, we need to create some functionality.
# We could start with something like the function below, if we want to ensure that
# all of our columns are present in a specific order:

Unnamed: 0,id,elevation,lat,lon,code,city,timestamp,precip,air_pres,solar_rad,temp,rel_humid,wind_speed
0,178,237.0,-6.835777,-38.311583,A333,São Gonçalo,2013-01-01 00:00:00,,983.1,,30.1,44.0,2.9
1,178,237.0,-6.835777,-38.311583,A333,São Gonçalo,2013-01-01 01:00:00,,983.4,,30.0,43.0,1.5
2,178,237.0,-6.835777,-38.311583,A333,São Gonçalo,2013-01-01 02:00:00,,983.5,,30.5,40.0,1.9
3,178,237.0,-6.835777,-38.311583,A333,São Gonçalo,2013-01-01 03:00:00,,983.9,,29.8,45.0,2.5
4,178,237.0,-6.835777,-38.311583,A333,São Gonçalo,2013-01-01 04:00:00,,983.8,,28.7,54.0,3.0


In [11]:
def has_ordered_required_columns(df: pd.DataFrame, schema: List[Dict]) -> bool:
    """
    Check if the DataFrame has exactly the configured columns, in the configured order.

    Parameters
    ----------
    df
        DataFrame to check.
    schema
        List of dictionaries describing the expected columns, in order. Only the
        "col_name" entry of each dictionary is used here.

    Returns
    -------
        Boolean indicating whether the actual columns correspond to the expected columns.
    """
    required_columns = [schema_item["col_name"] for schema_item in schema]
    print(
        f"Checking whether your file contains required columns:\n{required_columns}\n\n"
    )
    # Comparing the two lists checks names, count and order in a single step.
    return df.columns.tolist() == required_columns


# Schema structure taken from one of our older projects.
# Every entry describes one expected column with the same three keys:
# "col_name", "dtype" and "nullable". The list order is the expected column order.
_schema = [
    {"col_name": "id", "dtype": "int", "nullable": False},
    {"col_name": "elevation", "dtype": "float", "nullable": True},
    {"col_name": "lat", "dtype": "float", "nullable": True},
    {"col_name": "lon", "dtype": "float", "nullable": True},
    {"col_name": "code", "dtype": "str", "nullable": True},
    {"col_name": "city", "dtype": "str", "nullable": True},
    {"col_name": "timestamp", "dtype": "datetime", "nullable": True},
    {"col_name": "precip", "dtype": "float", "nullable": True},
    {"col_name": "air_pres", "dtype": "float", "nullable": True},
    {"col_name": "solar_rad", "dtype": "float", "nullable": True},
    {"col_name": "temp", "dtype": "float", "nullable": True},
    {"col_name": "rel_humid", "dtype": "float", "nullable": True},
    {"col_name": "wind_speed", "dtype": "float", "nullable": True},
]

In [12]:
# Our result:
# The frame as loaded should satisfy the ordered-column check.
validation_result = has_ordered_required_columns(df=weather_data_df, schema=_schema)
continue_pipeline(validation_result=validation_result)

# %% RUN VALIDATION
# Dropping a required column makes the same check fail, so the pipeline breaks:
weather_data_df_no_windspeed = weather_data_df.drop(columns=["wind_speed"])
validation_result = has_ordered_required_columns(
    df=weather_data_df_no_windspeed, schema=_schema
)
continue_pipeline(validation_result=validation_result)

Checking whether your file contains required columns:
['id', 'elevation', 'lat', 'lon', 'code', 'city', 'timestamp', 'precip', 'air_pres', 'solar_rad', 'temp', 'rel_humid', 'wind_speed']


Pipeline continues.
Checking whether your file contains required columns:
['id', 'elevation', 'lat', 'lon', 'code', 'city', 'timestamp', 'precip', 'air_pres', 'solar_rad', 'temp', 'rel_humid', 'wind_speed']


Pipeline breaks.


This is the point where continuing down this path starts to require tedious work.
If we want to achieve good coverage, we will need to write quite a lot of
additional code. This code also needs to be maintained,
documentation needs to be created and maintained, etc. This is quite some overhead and often
represents a significant problem. Also, data sources might change over time, and all of this
needs to be documented.

Great Expectations helps us remedy such pain points to a great extent (pun intended). We will start
small and simple - using only a subset of GE functionality.
Then we will build on top of that and, in the end, come up with
a more elaborate validation scheme.

In [29]:
# %% REPLACING PANDAS DATAFRAME WITH GREAT EXPECTATIONS BATCH
# Same CSV as before, but read through Great Expectations this time.
weather_data_batch = ge.read_csv(filepath)
print(type(weather_data_batch))

# %% SIMILARITY WITH PANDAS
# The batch still answers to the pandas calls we used on the plain DataFrame.
weather_data_batch.head()

# %% CONTAINS ADDITIONAL FUNCTIONALITY:
# Earlier we used a schema to check for the presence of particular columns.
# Let's pull the column names out of that schema again and see what we can do with them.
required_columns = [schema_item["col_name"] for schema_item in _schema]

# We can now set our first expectations, using the methods attached to
# our pandas dataframe on validation steroids:
weather_data_batch.expect_table_columns_to_match_ordered_list(required_columns)

# Plausible value ranges, applied in this order: (column, lower bound, upper bound).
for column_name, lower_bound, upper_bound in (
    ("temp", -20, 50),
    ("lat", -90, 90),
    ("lon", -180, 180),
):
    weather_data_batch.expect_column_values_to_be_between(
        column=column_name, min_value=lower_bound, max_value=upper_bound
    )

# And simply validate the batch against the expectations.
# It is as easy as that.
validation_result = weather_data_batch.validate()
validation_result

<class 'great_expectations.dataset.pandas_dataset.PandasDataset'>


{
  "success": true,
  "meta": {
    "great_expectations.__version__": "0.11.1",
    "expectation_suite_name": "default",
    "run_id": {
      "run_name": null,
      "run_time": "2020-06-08T12:44:55.202826+00:00"
    },
    "batch_kwargs": {
      "ge_batch_id": "da8c7332-a985-11ea-b0b8-acde48001122"
    },
    "batch_markers": {},
    "batch_parameters": {},
    "validation_time": "20200608T124455.202799Z"
  },
  "evaluation_parameters": {},
  "statistics": {
    "evaluated_expectations": 4,
    "successful_expectations": 4,
    "unsuccessful_expectations": 0,
    "success_percent": 100.0
  },
  "results": [
    {
      "success": true,
      "meta": {},
      "exception_info": {
        "raised_exception": false,
        "exception_message": null,
        "exception_traceback": null
      },
      "result": {
        "observed_value": [
          "id",
          "elevation",
          "lat",
          "lon",
          "code",
          "city",
          "timestamp",
          "prec

In [30]:
# %% USING VALIDATION OUTPUT:
# Again, we can perform some action depending on the result of our
# validation. The overall verdict is read from the "success" entry of the
# validation result:
continue_pipeline(validation_result["success"])

Pipeline continues.


In [31]:
# %% CUSTOM EXPECTATION
class CustomPandasDataset(PandasDataset):
    """PandasDataset extended with a custom multi-column expectation."""

    _data_asset_type = "CustomPandasDataset"

    @MetaPandasDataset.multicolumn_map_expectation
    def expect_columns_combination_to_be_unique(
        self,
        column_list,
        *,
        index=None
    ):
        """
        Expect every column to hold a single distinct value within each group.

        Rows are grouped by ``index`` (or by the first column when ``index`` is
        not given). A row passes when each column of its group has exactly one
        distinct value, i.e. the group key always maps to the same combination.

        Parameters
        ----------
        column_list
            The columns to check. The body treats this as a DataFrame
            (it uses ``.groupby``, ``.columns`` and ``.index``) - presumably the
            decorator turns the list of column names passed by the caller into
            that frame; verify against the Great Expectations version in use.
        index
            Optional column name to group by. Defaults to the first column.

        Returns
        -------
            Boolean Series carrying the same index as ``column_list``:
            True for rows in a consistent group, False otherwise. Rows that do
            not end up in any group (e.g. a missing group key) stay True.
        """
        # Named `group_key` so the loop below does not shadow the `index` parameter.
        group_key = index if index else column_list.columns[0]

        # Start from "all rows pass" on the ORIGINAL index so each verdict stays
        # attached to its own row, even when a group's rows are not contiguous.
        # Assumes index labels are unique - TODO confirm for non-default indexes.
        result = pd.Series(True, index=column_list.index, dtype=bool)

        # TODO: optimize grouping based on dtype if index is given.
        for _, group in column_list.groupby(group_key):
            # nunique() ignores missing values, so a column that is entirely
            # missing within a group counts 0 distinct values and fails the group.
            if not (group.nunique(axis=0) == 1).all():
                result.loc[group.index] = False

        return result

# Alternative left for reference (not executed):
# weather_data_batch = CustomPandasDataset.from_dataset(weather_data_batch)

# Re-read the CSV with the custom dataset class so the new expectation is available.
# NOTE: this rebinds `weather_data_batch` to a freshly read batch; the validation
# output below reports a single evaluated expectation.
weather_data_batch = ge.read_csv(filepath, dataset_class=CustomPandasDataset)
weather_data_batch.expect_columns_combination_to_be_unique(["id", "lat", "lon"])

validation_result = weather_data_batch.validate()
print(validation_result)

{
  "success": true,
  "meta": {
    "great_expectations.__version__": "0.11.1",
    "expectation_suite_name": "default",
    "run_id": {
      "run_name": null,
      "run_time": "2020-06-08T12:45:05.437745+00:00"
    },
    "batch_kwargs": {
      "ge_batch_id": "e0a24d14-a985-11ea-b0b8-acde48001122"
    },
    "batch_markers": {},
    "batch_parameters": {},
    "validation_time": "20200608T124505.437719Z"
  },
  "evaluation_parameters": {},
  "statistics": {
    "evaluated_expectations": 1,
    "successful_expectations": 1,
    "unsuccessful_expectations": 0,
    "success_percent": 100.0
  },
  "results": [
    {
      "success": true,
      "meta": {},
      "exception_info": {
        "raised_exception": false,
        "exception_message": null,
        "exception_traceback": null
      },
      "result": {
        "element_count": 83328,
        "missing_count": 0,
        "missing_percent": 0.0,
        "unexpected_count": 0,
        "unexpected_percent": 0.0,
        "unexpect

In [32]:
# Overwrite the latitude of the first row so it no longer agrees with the other
# rows sharing its "id"; re-validating should now report the custom expectation
# as failed.
weather_data_batch.at[0, "lat"] = 91
weather_data_batch.validate()

{
  "success": false,
  "meta": {
    "great_expectations.__version__": "0.11.1",
    "expectation_suite_name": "default",
    "run_id": {
      "run_name": null,
      "run_time": "2020-06-08T12:45:05.798380+00:00"
    },
    "batch_kwargs": {
      "ge_batch_id": "e0a24d14-a985-11ea-b0b8-acde48001122"
    },
    "batch_markers": {},
    "batch_parameters": {},
    "validation_time": "20200608T124505.798339Z"
  },
  "evaluation_parameters": {},
  "statistics": {
    "evaluated_expectations": 1,
    "successful_expectations": 0,
    "unsuccessful_expectations": 1,
    "success_percent": 0.0
  },
  "results": [
    {
      "success": false,
      "meta": {},
      "exception_info": {
        "raised_exception": false,
        "exception_message": null,
        "exception_traceback": null
      },
      "result": {
        "element_count": 83328,
        "missing_count": 0,
        "missing_percent": 0.0,
        "unexpected_count": 744,
        "unexpected_percent": 0.8928571428571428,


In [33]:
# %% STEP 0: First create the data context for this weather data project.
# - in terminal enter: "great_expectations init" to initialize project context.
# - type 1 (for connecting to a filesystem)
# - type 1 (for processing with pandas).
# - type "data/weather_brasil_201301.csv" to locate the first batch of data.
# - type "weather_data" to name the expectation suite.
# NOTE: if this step is skipped, the DataContext() call below raises
# ConfigNotFoundError because no great_expectations directory exists yet.


# %% STEP 1: Get the initialized context for this project and overwrite the automated GE expectation suite.
suite_name = "weather_data"

context = ge.data_context.DataContext()
context.create_expectation_suite(suite_name, overwrite_existing=True)

# %% STEP 2: Get the first batch of data.
# We will use a single batch of data to create expectations.
# TODO: document the background of this design choice - why is going through
# batch_kwargs necessary? (open question for Joost)
# NOTE(review): `weather_data_batch` was modified in an earlier cell (row 0 "lat"
# set to 91) - confirm that validating the modified data here is intended.
batch_kwargs = {
    'dataset': weather_data_batch,
    'datasource': 'files_datasource_custom',
    'sep': ','
}
batch = context.get_batch(batch_kwargs, suite_name)
batch.head()

# %% STEP 3: Set expectations on batch and save to suite.
batch.expect_table_columns_to_match_ordered_list(required_columns)
batch.expect_column_values_to_be_between("temp", min_value=-30, max_value=60)
## Add other rules established with the audience
batch.expect_columns_combination_to_be_unique(["id", "lat", "lon"])

# Save these expectations to the central expectation suite
batch.save_expectation_suite()


# %% STEP 4: Build html data documentation from the expectation suite and inspect.
# Walk through the data documentation.
context.build_data_docs()


# %% STEP 5: Validate the first batch again to double check expectations.
validation_result = batch.validate()

ConfigNotFoundError: Error: No great_expectations directory was found here!
    - Please check that you are in the correct directory or have specified the correct directory.
    - If you have never run Great Expectations in this project, please run `great_expectations init` to get started.
