# Data Quality Monitoring

## Validating Historical Features with Great Expectations

### 0. Setup

In [None]:
!pip install 'feast[ge]'

### 1. Dataset preparation (Optional) 

**You can skip this step if you don't have a GCP account. Instead, use the parquet files that come with this tutorial.**

In [None]:
!pip install google-cloud-bigquery

In [1]:
import pyarrow.parquet

from google.cloud.bigquery import Client

In [2]:
import os

# BigQuery client used for the dataset-preparation queries below.
# The project defaults to the tutorial author's ('kf-feast'); set the
# GOOGLE_CLOUD_PROJECT environment variable to run against your own project.
bq_client = Client(project=os.environ.get("GOOGLE_CLOUD_PROJECT", "kf-feast"))



In [3]:
# Daily per-taxi aggregates for 2019-2020. Only trips with positive distance and
# duration are kept, so every aggregated row has trip_count >= 1 and
# total_trip_seconds > 0.
# The upper bound is exclusive ("< '2021-01-01'") so trips made during
# 2020-12-31 are included; comparing a TIMESTAMP with BETWEEN ... '2020-12-31'
# would stop at midnight of that day.
data_query = """SELECT 
    taxi_id,
    TIMESTAMP_TRUNC(trip_start_timestamp, DAY) as day,
    SUM(trip_miles) as total_miles_travelled,
    SUM(trip_seconds) as total_trip_seconds,
    SUM(fare) as total_earned,
    COUNT(*) as trip_count
FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips` 
WHERE 
    trip_miles > 0 AND trip_seconds > 0 AND
    trip_start_timestamp >= '2019-01-01' AND trip_start_timestamp < '2021-01-01'
GROUP BY taxi_id, TIMESTAMP_TRUNC(trip_start_timestamp, DAY)"""

In [4]:
# Run the aggregation query, pull the result as an Arrow table and store it
# locally as the parquet file that the Feast batch source reads.
stats_query_job = bq_client.query(data_query)
driver_stats_table = stats_query_job.to_arrow()
pyarrow.parquet.write_table(driver_stats_table, "trips_stats.parquet")

In [5]:
def entities_query(year):
    """Build a BigQuery SQL query listing every distinct taxi_id active in `year`.

    Only trips with positive distance and duration are counted, matching the
    filter used for the feature data.

    Args:
        year: Calendar year, as an int or anything convertible with ``int()``.

    Returns:
        The SQL query text.
    """
    # Coerce to int so only a plain year number is ever interpolated into the
    # SQL, and so the exclusive upper bound (next January 1st) can be computed.
    year = int(year)
    # Half-open interval: BETWEEN ... '{year}-12-31' against a TIMESTAMP would
    # stop at midnight and drop trips made later on December 31st.
    return f"""SELECT
    distinct taxi_id
FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips` 
WHERE
    trip_miles > 0 AND trip_seconds > 0 AND
    trip_start_timestamp >= '{year}-01-01' AND trip_start_timestamp < '{year + 1}-01-01'
"""

In [15]:
# Taxis with at least one qualifying trip in 2019 form the entity set used to
# build both the training and the validation entity dataframes.
entities_2019_job = bq_client.query(entities_query(2019))
entities_2019_table = entities_2019_job.to_arrow()
pyarrow.parquet.write_table(entities_2019_table, "entities.parquet")

In [7]:
# Optional: also export the set of taxis active in 2020. Disabled by default,
# because the rest of the notebook reads only entities.parquet (the 2019 set).
FETCH_2020_ENTITIES = False

if FETCH_2020_ENTITIES:
    entities_2020_table = bq_client.query(entities_query(2020)).to_arrow()
    # Write the 2020 table (not the 2019 one) to its own file.
    pyarrow.parquet.write_table(entities_2020_table, "entities_2020.parquet")

### 2. Declaring features

In [8]:
import pyarrow.parquet
import pandas as pd

from feast import Feature, FeatureView, Entity, FeatureStore
from feast.value_type import ValueType
from feast.data_format import ParquetFormat
from feast.on_demand_feature_view import on_demand_feature_view
from feast.infra.offline_stores.file_source import FileSource
from feast.infra.offline_stores.file import SavedDatasetFileStorage

from google.protobuf.duration_pb2 import Duration

In [9]:
# Offline source: the parquet file produced in step 1 (or shipped with the
# tutorial), with rows timestamped by its "day" column.
# NOTE(review): "day" is TIMESTAMP_TRUNC(..., DAY), i.e. the start of the day a
# row aggregates, so a lookup at midnight can see that whole day's totals —
# confirm this look-ahead is acceptable for the use case.
batch_source = FileSource(
    path="trips_stats.parquet",
    file_format=ParquetFormat(),
    event_timestamp_column="day",
)

In [10]:
# The "taxi" entity joins on the taxi_id column of the feature data and of the
# entity dataframes.
taxi_entity = Entity(join_key="taxi_id", name="taxi")

In [11]:
# Raw daily per-taxi aggregates served from the parquet batch source.
trips_stats_fv = FeatureView(
    name="trip_stats",
    entities=["taxi"],
    ttl=Duration(seconds=24 * 60 * 60),  # one day
    features=[
        Feature("total_miles_travelled", ValueType.DOUBLE),
        Feature("total_trip_seconds", ValueType.DOUBLE),
        Feature("total_earned", ValueType.DOUBLE),
        Feature("trip_count", ValueType.INT64),
    ],
    batch_source=batch_source,
)

@on_demand_feature_view(
    features=[
        Feature("avg_fare", ValueType.DOUBLE),
        Feature("avg_speed", ValueType.DOUBLE),
        Feature("avg_trip_seconds", ValueType.DOUBLE),
    ],
    inputs={"stats": trips_stats_fv},
)
def on_demand_stats(inp):
    """Derive per-trip averages from the daily trip_stats aggregates.

    Args:
        inp: DataFrame with the trip_stats columns total_earned, trip_count,
            total_miles_travelled and total_trip_seconds.

    Returns:
        DataFrame with avg_fare, avg_speed and avg_trip_seconds columns.
        avg_speed is miles per hour (miles per second scaled by 3600).
    """
    earned = inp["total_earned"]
    trips = inp["trip_count"]
    miles = inp["total_miles_travelled"]
    seconds = inp["total_trip_seconds"]
    return pd.DataFrame(
        {
            "avg_fare": earned / trips,
            "avg_speed": 3600 * miles / seconds,
            "avg_trip_seconds": seconds / trips,
        }
    )

In [12]:
# Feature repository rooted at the current working directory.
store = FeatureStore(repo_path=".")

In [13]:
# Register the entity and both feature views with the feature store.
repo_objects = [taxi_entity, trips_stats_fv, on_demand_stats]
store.apply(repo_objects)

### 3. Generating training (reference) dataset

In [16]:
# Taxi ids active in 2019 (written in step 1 or shipped with the tutorial).
entities_table = pyarrow.parquet.read_table("entities.parquet")
taxi_ids = entities_table.to_pandas()

Generate a range of timestamps with daily frequency:

In [17]:
# One row per day for the first week of January 2019 (both ends inclusive).
timestamps = pd.DataFrame(
    {"event_timestamp": pd.date_range(start="2019-01-01", end="2019-01-07", freq="D")}
)

A cross merge (Cartesian product) produces an entity dataframe in which each taxi_id is repeated for every timestamp:

In [18]:
# Cartesian product: every taxi_id paired with every timestamp.
entity_df = taxi_ids.merge(timestamps, how="cross")
entity_df

Unnamed: 0,taxi_id,event_timestamp
0,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2019-01-01
1,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2019-01-02
2,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2019-01-03
3,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2019-01-04
4,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2019-01-05
...,...,...
35443,7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...,2019-01-03
35444,7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...,2019-01-04
35445,7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...,2019-01-05
35446,7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...,2019-01-06


Retrieve historical features for the resulting entity dataframe and persist the output as a saved dataset:

In [19]:
training_features = [
    "trip_stats:total_miles_travelled",
    "trip_stats:total_trip_seconds",
    "trip_stats:total_earned",
    "trip_stats:trip_count",
    "on_demand_stats:avg_fare",
    "on_demand_stats:avg_trip_seconds",
    "on_demand_stats:avg_speed",
]

# Retrieve feature values for every (taxi_id, event_timestamp) row.
job = store.get_historical_features(entity_df=entity_df, features=training_features)

# Persist the result as a named saved dataset; it is reloaded below and used
# as the validation reference.
store.create_saved_dataset(
    from_=job,
    name="my_training_ds",
    storage=SavedDatasetFileStorage(path="my_training_ds.parquet"),
)



<SavedDataset(name = my_training_ds, features = ['trip_stats:total_miles_travelled', 'trip_stats:total_trip_seconds', 'trip_stats:total_earned', 'trip_stats:trip_count', 'on_demand_stats:avg_fare', 'on_demand_stats:avg_trip_seconds', 'on_demand_stats:avg_speed'], join_keys = ['taxi_id'], storage = <feast.infra.offline_stores.file_source.SavedDatasetFileStorage object at 0x122fc7410>, full_feature_names = False, tags = {}, _retrieval_job = <feast.infra.offline_stores.file.FileRetrievalJob object at 0x122fc99d0>, min_event_timestamp = 2019-01-01 00:00:00, max_event_timestamp = 2019-01-07 00:00:00)>

### 4. Developing dataset profiler

In [20]:
from feast.dqm.profilers.ge_profiler import ge_profiler

from great_expectations.core.expectation_suite import ExpectationSuite
from great_expectations.dataset import PandasDataset

  def add_data_context_id_to_url(self, jinja_context, url, add_datetime=True):



In [21]:
# Reload the saved training dataset and show its contents.
ds = store.get_saved_dataset("my_training_ds")
training_df = ds.to_df()
training_df



Unnamed: 0,avg_speed,taxi_id,total_miles_travelled,total_trip_seconds,total_earned,avg_trip_seconds,event_timestamp,avg_fare,trip_count
0,,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,,,,,2019-01-01 00:00:00+00:00,,
1,12.008782,155ffe17bc32e7c3bfdfdba2750e82c6a23dab24f77d6c...,22.79,6832.0,94.75,759.111111,2019-01-01 00:00:00+00:00,10.527778,9.0
2,21.779166,4489496317d593d220fd48f54f9db2732a9a1365fb5206...,60.05,9926.0,187.00,827.166667,2019-01-01 00:00:00+00:00,15.583333,12.0
3,17.923783,81b99242c38e2b2df0c015587b3a9b03d7c303360f8c82...,14.11,2834.0,47.75,708.500000,2019-01-01 00:00:00+00:00,11.937500,4.0
4,,fd6250e08d0bb1b9ba4685874a588d1446a8830ffc45f2...,,,,,2019-01-01 00:00:00+00:00,,
...,...,...,...,...,...,...,...,...,...
35443,,0b0be23bf69b80dd81715f86c4e0887bbd5d9ebc2463b8...,,,,,2019-01-07 00:00:00+00:00,,
35444,11.330771,829dced7036593da33254d88a2d065c1698fc377e76c93...,43.40,13789.0,188.25,725.736842,2019-01-07 00:00:00+00:00,9.907895,19.0
35445,38.228611,c0591d33660deb744cef729db6457d5a0924498c9ed3bb...,45.80,4313.0,118.75,1078.250000,2019-01-07 00:00:00+00:00,29.687500,4.0
35446,26.566647,7106196249d9702fda7688e088502293990deeb7b10cb1...,50.27,6812.0,131.75,1703.000000,2019-01-07 00:00:00+00:00,32.937500,4.0


In [22]:
DELTA = 0.1  # relative tolerance: the trip_count mean may move by at most ±10%


@ge_profiler
def stats_profiler(ds: PandasDataset) -> ExpectationSuite:
    """Build the expectation suite that new datasets are validated against.

    Expectations:
      * avg_speed is never negative;
      * the mean of trip_count stays within ±DELTA of the mean observed in `ds`.
    """
    ds.expect_column_min_to_be_between("avg_speed", min_value=0)

    reference_mean = ds["trip_count"].mean()
    lower_bound = reference_mean * (1 - DELTA)
    upper_bound = reference_mean * (1 + DELTA)
    ds.expect_column_mean_to_be_between(
        "trip_count", min_value=lower_bound, max_value=upper_bound
    )

    return ds.get_expectation_suite()

Testing our profiler function:

In [23]:
# Run the profiler on the training dataset to inspect the generated expectations.
reference_profile = ds.get_profile(profiler=stats_profiler)
reference_profile

02/01/2022 08:06:01 PM INFO:	2 expectation(s) included in expectation_suite. result_format settings filtered.


<GEProfile with expectations: [
  {
    "expectation_type": "expect_column_min_to_be_between",
    "meta": {},
    "kwargs": {
      "column": "avg_speed",
      "min_value": 0
    }
  },
  {
    "expectation_type": "expect_column_mean_to_be_between",
    "meta": {},
    "kwargs": {
      "column": "trip_count",
      "min_value": 7.920125245579568,
      "max_value": 9.680153077930584
    }
  }
]>

Now we can create a validation reference from the dataset and the profiler function:

In [24]:
# Pair the saved training dataset with the profiler. The resulting reference is
# passed to `job.to_df(validation_reference=...)` further down to validate new data.
validation_reference = ds.as_reference(profiler=stats_profiler)

### 5. Validating historical retrieval 

Creating new timestamps for Dec 2020:

In [30]:
from feast.dqm.errors import ValidationFailed

In [26]:
# One row per day for the first week of December 2020 (both ends inclusive).
timestamps = pd.DataFrame(
    {"event_timestamp": pd.date_range(start="2020-12-01", end="2020-12-07", freq="D")}
)

In [27]:
# Same 2019 taxi set, now paired with every December 2020 timestamp.
entity_df = taxi_ids.merge(timestamps, how="cross")
entity_df

Unnamed: 0,taxi_id,event_timestamp
0,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2020-12-01
1,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2020-12-02
2,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2020-12-03
3,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2020-12-04
4,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2020-12-05
...,...,...
35443,7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...,2020-12-03
35444,7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...,2020-12-04
35445,7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...,2020-12-05
35446,7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...,2020-12-06


In [28]:
validation_features = [
    "trip_stats:total_miles_travelled",
    "trip_stats:total_trip_seconds",
    "trip_stats:total_earned",
    "trip_stats:trip_count",
    "on_demand_stats:avg_fare",
    "on_demand_stats:avg_trip_seconds",
    "on_demand_stats:avg_speed",
]

# Build (but do not yet execute) the retrieval for the December 2020 entities.
job = store.get_historical_features(entity_df=entity_df, features=validation_features)

Execute the retrieval job with the validation reference:

In [33]:
# to_df() checks the retrieved rows against the reference and raises
# ValidationFailed when expectations are not met; print the report in that case.
try:
    df = job.to_df(validation_reference=validation_reference)
except ValidationFailed as validation_error:
    print(validation_error.validation_report)

02/01/2022 08:09:58 PM INFO:	2 expectation(s) included in expectation_suite. result_format settings filtered.
02/01/2022 08:09:58 PM INFO:Validating data_asset_name None with expectation_suite_name default


[
  {
    "exception_info": {
      "raised_exception": false,
      "exception_message": null,
      "exception_traceback": null
    },
    "result": {
      "observed_value": 6.804461643523788,
      "element_count": 35448,
      "missing_count": 31055,
      "missing_percent": 87.6071992778154
    },
    "success": false,
    "expectation_config": {
      "expectation_type": "expect_column_mean_to_be_between",
      "meta": {},
      "kwargs": {
        "column": "trip_count",
        "min_value": 7.920125245579568,
        "max_value": 9.680153077930584,
        "result_format": "COMPLETE"
      }
    },
    "meta": {}
  }
]


Validation failed because the average trip count in the new dataset dropped significantly (by more than 10%; see `DELTA` above).