In [2]:
!feast init dqm_repo


Creating a new Feast repository in [1m[32m/Users/franciscojavierarceo/GitHub/feast/examples/data-quality-monitoring/dqm_repo[0m.



In [3]:
cd dqm_repo/feature_repo

/Users/franciscojavierarceo/GitHub/feast/examples/data-quality-monitoring/dqm_repo/feature_repo


In [4]:
dqm_demo_file = """
from datetime import timedelta
from feast import Entity
taxi_entity = Entity(name='taxi', join_keys=['taxi_id'])
from feast import FileSource, Field
from feast.data_format import ParquetFormat
from feast.types import Float32, Float64, Int64

batch_source = FileSource(
    timestamp_field="day",
    # using parquet file that we created on previous step
    path="../../trips_stats.parquet",  
    file_format=ParquetFormat()
)

from feast import BatchFeatureView

trips_stats_fv = BatchFeatureView(
    name='trip_stats',
    entities=[taxi_entity],
    ttl=timedelta(seconds=86400),
    schema=[
        Field(name="total_miles_travelled", dtype=Float64),
        Field(name="total_trip_seconds", dtype=Float64),
        Field(name="total_earned", dtype=Float64),
        Field(name="trip_count", dtype=Int64),

    ],
    source=batch_source,
    online=True,
    tags={"production": "True"},
    owner='test1@gmail.com',
)

import pyarrow.parquet
import pandas as pd

from feast import FeatureView, Entity, FeatureStore, Field, BatchFeatureView
from feast.types import Float64, Int64
from feast.value_type import ValueType
from feast.data_format import ParquetFormat
from feast.on_demand_feature_view import on_demand_feature_view
from feast.infra.offline_stores.file_source import FileSource
from feast.infra.offline_stores.file import SavedDatasetFileStorage
from datetime import timedelta

@on_demand_feature_view(
    schema=[
        Field(name="avg_fare", dtype=Float64),
        Field(name="avg_speed", dtype=Float64),
        Field(name="avg_trip_seconds", dtype=Float64),
        Field(name="earned_per_hour", dtype=Float64),
    ],
    sources=[
      trips_stats_fv,
    ]
)
def on_demand_stats(input):
    out = pd.DataFrame()
    out["avg_fare"] = input["total_earned"] / input["trip_count"]
    out["avg_speed"] = 3600 * input["total_miles_travelled"] / input["total_trip_seconds"]
    out["avg_trip_seconds"] = input["total_trip_seconds"] / input["trip_count"]
    out["earned_per_hour"] = 3600 * input["total_earned"] / input["total_trip_seconds"]
    return out

"""

In [5]:
f = open('dqm_example.py', 'w')
f.write(dqm_demo_file)
f.close()

In [6]:
! feast apply

  schema = ParquetDataset(path).schema.to_arrow_schema()
Created entity [1m[32mtaxi[0m
Created entity [1m[32mdriver[0m
Created feature view [1m[32mtrip_stats[0m
Created feature view [1m[32mdriver_hourly_stats_fresh[0m
Created feature view [1m[32mdriver_hourly_stats[0m
Created on demand feature view [1m[32mtransformed_conv_rate_fresh[0m
Created on demand feature view [1m[32mon_demand_stats[0m
Created on demand feature view [1m[32mtransformed_conv_rate[0m
Created feature service [1m[32mdriver_activity_v3[0m
Created feature service [1m[32mdriver_activity_v2[0m
Created feature service [1m[32mdriver_activity_v1[0m

Created sqlite table [1m[32mdqm_repo_trip_stats[0m
Created sqlite table [1m[32mdqm_repo_driver_hourly_stats_fresh[0m
Created sqlite table [1m[32mdqm_repo_driver_hourly_stats[0m



# Now the DQM piece

In [7]:
import pyarrow.parquet
taxi_ids = pyarrow.parquet.read_table("../../entities.parquet").to_pandas()

In [8]:
import json
import pandas as pd
timestamps = pd.DataFrame()
timestamps["event_timestamp"] = pd.date_range("2019-06-01", "2019-07-01", freq='D')
entity_df = pd.merge(taxi_ids, timestamps, how='cross')
entity_df.head()

Unnamed: 0,taxi_id,event_timestamp
0,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2019-06-01
1,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2019-06-02
2,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2019-06-03
3,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2019-06-04
4,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2019-06-05


In [9]:
from feast import FeatureStore
store = FeatureStore("/Users/franciscojavierarceo/GitHub/feast/examples/data-quality-monitoring/dqm_repo/feature_repo/")

In [10]:
job = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "trip_stats:total_miles_travelled",
        "trip_stats:total_trip_seconds",
        "trip_stats:total_earned",
        "trip_stats:trip_count",
        "on_demand_stats:avg_fare",
        "on_demand_stats:avg_trip_seconds",
        "on_demand_stats:avg_speed",
        "on_demand_stats:earned_per_hour",
    ]
)

In [21]:
from feast.infra.offline_stores.file import SavedDatasetFileStorage

try:
    saved_ds = store.create_saved_dataset(
        from_=job,
        name='my_training_ds',
        storage=SavedDatasetFileStorage(path='../../dqm_repo/feature_repo/my_training_ds.parquet')
    )
except Exception as e:
    print(e)



In [22]:
store.apply(saved_ds)

# Now setting up the profiler

In [23]:
store.registry.list_saved_datasets(project='dqm_repo')[0]

<SavedDataset(name = my_training_ds, features = ['trip_stats:total_miles_travelled', 'trip_stats:total_trip_seconds', 'trip_stats:total_earned', 'trip_stats:trip_count', 'on_demand_stats:avg_fare', 'on_demand_stats:avg_trip_seconds', 'on_demand_stats:avg_speed', 'on_demand_stats:earned_per_hour'], join_keys = ['taxi_id'], storage = <feast.infra.offline_stores.file_source.SavedDatasetFileStorage object at 0x1357906d0>, full_feature_names = False, tags = {}, feature_service_name = None, _retrieval_job = None, created_timestamp = 2022-11-03 01:15:54.300676, min_event_timestamp = 2019-06-01 00:00:00, max_event_timestamp = 2019-07-01 00:00:00)>

In [24]:
import numpy as np

from feast.dqm.profilers.ge_profiler import ge_profiler

from great_expectations.core.expectation_suite import ExpectationSuite
from great_expectations.dataset import PandasDataset

  from urllib3.contrib.pyopenssl import orig_util_SSLContext as SSLContext


In [25]:
ds = store.get_saved_dataset('my_training_ds')
ds.to_df().head()



Unnamed: 0,avg_fare,event_timestamp,taxi_id,avg_speed,avg_trip_seconds,earned_per_hour,total_miles_travelled,total_trip_seconds,total_earned,trip_count
0,25.4375,2019-06-01 00:00:00+00:00,d13c5aaa066f94b4927779ed24cd313b0c686f03407095...,15.559701,2010.0,45.559701,69.5,16080,203.5,8
1,14.85,2019-06-01 00:00:00+00:00,33164e16dd29b1c58cd15cce31df4bfcb75d9903cb66de...,7.707317,1476.0,36.219512,15.8,7380,74.25,5
2,19.125,2019-06-01 00:00:00+00:00,226fe0b00be42932bdff81bc0b318b883bfbf15dd48093...,18.188976,1270.0,54.212598,38.5,7620,114.75,6
3,17.6875,2019-06-01 00:00:00+00:00,5a5bed1b5ced617d0594007d591f10bbbca354d50b19ca...,12.860777,1415.0,45.0,20.22,5660,70.75,4
4,20.85,2019-06-01 00:00:00+00:00,b7f7dbb452c0fb980a0f2050a146147c1006fe5f34e3b0...,17.793637,1395.6,53.783319,34.49,6978,104.25,5


In [26]:
import numpy as np

from great_expectations.core.expectation_suite import ExpectationSuite
from great_expectations.dataset import PandasDataset
from feast.dqm.profilers.ge_profiler import ge_profiler

In [27]:
DELTA = 0.1  # controlling allowed window in fraction of the value on scale [0, 1]

@ge_profiler
def stats_profiler(ds: PandasDataset) -> ExpectationSuite:    
    # simple checks on data consistency
    ds.expect_column_values_to_be_between(
        "avg_speed",
        min_value=0,
        max_value=60,
        mostly=0.99  # allow some outliers
    )

    ds.expect_column_values_to_be_between(
        "total_miles_travelled",
        min_value=0,
        max_value=500,
        mostly=0.99  # allow some outliers
    )

    # expectation of means based on observed values
    observed_mean = ds.trip_count.mean()
    ds.expect_column_mean_to_be_between("trip_count",
                                        min_value=observed_mean * (1 - DELTA),
                                        max_value=observed_mean * (1 + DELTA))

    observed_mean = ds.earned_per_hour.mean()
    ds.expect_column_mean_to_be_between("earned_per_hour",
                                        min_value=observed_mean * (1 - DELTA),
                                        max_value=observed_mean * (1 + DELTA))

    # expectation of quantiles
    qs = [0.5, 0.75, 0.9, 0.95]
    observed_quantiles = ds.avg_fare.quantile(qs)

    ds.expect_column_quantile_values_to_be_between(
        "avg_fare",
        quantile_ranges={
            "quantiles": qs,
            "value_ranges": [[None, max_value] for max_value in observed_quantiles]
        })

    # Note that the default is TRUE!
    # return ds.get_expectation_suite(discard_failed_expectations=False)
    return ds.get_expectation_suite()

@ge_profiler
def stats_profiler_fail(ds: PandasDataset) -> ExpectationSuite:
    # This should fail
    ds.expect_column_to_exist("this is a test")
    return ds.get_expectation_suite(discard_failed_expectations=False)

@ge_profiler
def stats_profiler_pass(ds: PandasDataset) -> ExpectationSuite:
    # This should fail
    ds.expect_column_to_exist("trip_count")
    return ds.get_expectation_suite(discard_failed_expectations=False)

In [28]:
prof = ds.get_profile(profiler=stats_profiler)
prof_fail = ds.get_profile(profiler=stats_profiler_fail)
prof_pass = ds.get_profile(profiler=stats_profiler_pass)

# Batch Validation

In [29]:
assert len(prof.expectation_suite['expectations']) == 5
assert len(prof_fail.expectation_suite['expectations']) == 1
assert len(prof_pass.expectation_suite['expectations']) == 1

In [30]:
validation_reference = ds.as_reference(name='my_validator', profiler=stats_profiler)
validation_reference_fail = ds.as_reference(name='my_validator', profiler=stats_profiler_fail)
validation_reference_pass = ds.as_reference(name='my_validator', profiler=stats_profiler_pass)

In [31]:
from feast.dqm.errors import ValidationFailed

In [32]:
timestamps = pd.DataFrame()
timestamps["event_timestamp"] = pd.date_range("2020-12-01", "2020-12-07", freq='D')
entity_df = pd.merge(taxi_ids, timestamps, how='cross')
entity_df.head()

Unnamed: 0,taxi_id,event_timestamp
0,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2020-12-01
1,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2020-12-02
2,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2020-12-03
3,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2020-12-04
4,91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...,2020-12-05


In [33]:
job = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "trip_stats:total_miles_travelled",
        "trip_stats:total_trip_seconds",
        "trip_stats:total_earned",
        "trip_stats:trip_count",
        "on_demand_stats:avg_fare",
        "on_demand_stats:avg_trip_seconds",
        "on_demand_stats:avg_speed",
        "on_demand_stats:earned_per_hour",        
    ],
)

In [34]:
try:
    df = job.to_df(validation_reference=validation_reference_pass)
    print('validation passed')
except ValidationFailed as exc:
    print('validation failed\n', exc.validation_report)



validation passed


In [35]:
try:
    df = job.to_df(validation_reference=validation_reference_fail)
except ValidationFailed as exc:
    print('validation failed\n', exc.validation_report)



validation failed
 [
  {
    "expectation_config": {
      "kwargs": {
        "column": "this is a test",
        "result_format": "COMPLETE"
      },
      "meta": {},
      "expectation_type": "expect_column_to_exist"
    },
    "result": {},
    "exception_info": {
      "raised_exception": false,
      "exception_message": null,
      "exception_traceback": null
    },
    "meta": {},
    "success": false
  }
]


In [36]:
try:
    df = job.to_df(validation_reference=validation_reference)
except ValidationFailed as exc:
    print('validation failed\n', exc.validation_report)



validation failed
 [
  {
    "expectation_config": {
      "kwargs": {
        "column": "trip_count",
        "min_value": 10.387244591346153,
        "max_value": 12.695521167200855,
        "result_format": "COMPLETE"
      },
      "meta": {},
      "expectation_type": "expect_column_mean_to_be_between"
    },
    "result": {
      "observed_value": 6.692920555429092,
      "element_count": 4393,
      "missing_count": null,
      "missing_percent": null
    },
    "exception_info": {
      "raised_exception": false,
      "exception_message": null,
      "exception_traceback": null
    },
    "meta": {},
    "success": false
  },
  {
    "expectation_config": {
      "kwargs": {
        "column": "earned_per_hour",
        "min_value": 52.32062497564023,
        "max_value": 63.9474305257825,
        "result_format": "COMPLETE"
      },
      "meta": {},
      "expectation_type": "expect_column_mean_to_be_between"
    },
    "result": {
      "observed_value": 68.99268345164135,
 

## Online Validation

# What do we want to do?

We want to query the online feature view and use the real time data coming in from the input data

The batch feature view is `trip_stats` and the online fields are going to be:

- total_miles_travelled
- total_trip_seconds
- total_earned
- trip_count

and are calculated via:
```
def on_demand_stats(input):
    out = pd.DataFrame()
    out["avg_fare"] = input["total_earned"] / input["trip_count"]
    out["avg_speed"] = 3600 * input["total_miles_travelled"] / input["total_trip_seconds"]
    out["avg_trip_seconds"] = input["total_trip_seconds"] / input["trip_count"]
    out["earned_per_hour"] = 3600 * input["total_earned"] / input["total_trip_seconds"]
    return out
```

In [37]:
xvars = [
    'taxi_id', 
    'trip_count', 
    'total_earned', 
    'total_trip_seconds', 
    'total_miles_travelled', 
]

request_payload = []
for i, row in ds.to_df().head()[xvars].iterrows():
    request_payload.append( 
        row.to_dict()
    )

In [38]:
print("\n--- Online features ---")

features = store.get_online_features(
    features=[
        "trip_stats:total_miles_travelled",
        "trip_stats:total_trip_seconds",
        "trip_stats:total_earned",
        "trip_stats:trip_count",
        "on_demand_stats:avg_fare",
        "on_demand_stats:avg_trip_seconds",
        "on_demand_stats:avg_speed",
        "on_demand_stats:earned_per_hour",        
    ],
    entity_rows=request_payload,
).to_dict()

print(json.dumps(features, indent=1))


--- Online features ---


EntityNotFoundException: Entity trip_count does not exist in project dqm_repo

In [None]:
event_df = pd.DataFrame.from_dict(
    {
        "driver_id": [1001],
        "event_timestamp": [
            datetime(2021, 5, 13, 10, 59, 42),
        ],
        "created": [
            datetime(2021, 5, 13, 10, 59, 42),
        ],
        "conv_rate": [1.0],
        "acc_rate": [1.0],
        "avg_daily_trips": [1000],
        "string_feature": "test2",
    }
)
print(event_df)
store.push("driver_stats_push_source", event_df)

In [43]:
features

{'taxi_id': ['1001'],
 'trip_count': [None],
 'total_trip_seconds': [None],
 'total_miles_travelled': [None],
 'total_earned': [None],
 'avg_fare': [None],
 'avg_speed': [None],
 'avg_trip_seconds': [None],
 'earned_per_hour': [None]}

# END