In [1]:
import os
# Build the path to `src/utils/` under the parent of the current working
# directory and switch into it before importing `geo_processing_utils`
# (presumably the module lives in that directory — confirm).
# NOTE(review): `os.chdir` changes the working directory for every later cell,
# and this cell is not idempotent: re-running it derives `util_dir` from the
# already-changed working directory instead of the original one.
util_dir = os.path.join(os.path.dirname(os.getcwd()), 'src/utils/')
os.chdir(util_dir)
from geo_processing_utils import get_num_neighbours, get_dist_from_bc

import feast
import pandas as pd

In [2]:
# Open the Feast feature store from its feature repository directory.
# NOTE(review): absolute, machine-specific path — consider a configurable
# repo path so the notebook runs outside this checkout.
fs = feast.FeatureStore(repo_path="/home/gianmaria/repos/airbnb-bc/src/feature_store/feature_repo")

In [3]:

# Load the `data_df3` parquet file from the feature repo's data directory.
# NOTE(review): `df` is reassigned when `train_df.parquet` is loaded in a later
# cell, and no visible cell reads this frame before that happens.
df = pd.read_parquet("/home/gianmaria/repos/airbnb-bc/src/feature_store/feature_repo/data/data_df3.parquet")

In [11]:
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float64, Int64, String
from feast import Field

@on_demand_feature_view(
    sources=[
        fs.get_feature_view(name='df2_feature_view')
    ],
    schema=[
        Field(name="rate_blocked_days", dtype=Float64),
        Field(name="rate_available_days", dtype=Float64),
    ]
)
def on_demand_rates(input_df: pd.DataFrame) -> pd.DataFrame:
    """Derive the blocked/available day ratios from `df2_feature_view` columns.

    Args:
        input_df: Frame holding the `blocked_days` and `available_days` columns.

    Returns:
        A frame with two columns:
        `rate_blocked_days` (`blocked_days / available_days`) and
        `rate_available_days` (`available_days / blocked_days`).
        Rows whose denominator is zero yield NaN rather than +/-inf.
    """
    blocked = input_df["blocked_days"]
    available = input_df["available_days"]

    out = pd.DataFrame()
    # A plain division by zero gives +/-inf (or NaN for 0/0). Masking the zero
    # denominators first makes every undefined ratio a consistent NaN, so
    # downstream range checks and models never see infinities.
    out["rate_blocked_days"] = blocked / available.where(available != 0)
    out["rate_available_days"] = available / blocked.where(blocked != 0)

    return out

@on_demand_feature_view(
    sources=[fs.get_feature_view(name='df3_feature_view')],
    schema=[
        Field(name="num_neighbours", dtype=Int64),
        Field(name="dist_from_bc", dtype=Float64),
    ],
)
def on_demand_geo_feat(input_df: pd.DataFrame) -> pd.DataFrame:
    """Compute the geographic on-demand features for `df3_feature_view` rows.

    Both values are delegated to helpers imported from `geo_processing_utils`;
    only the named result column of each helper is kept.

    Args:
        input_df: Frame passed unchanged to both helpers (presumably it must
            carry the location columns of `df3_feature_view` — confirm against
            the helpers).

    Returns:
        A frame with the columns `num_neighbours` and `dist_from_bc`.
    """
    neighbour_counts = get_num_neighbours(input_df)["num_neighbours"]
    distances = get_dist_from_bc(input_df)["dist_from_bc"]

    # Assign column by column onto an empty frame, as the original did, so the
    # index/alignment behaviour stays the same.
    geo_features = pd.DataFrame()
    geo_features["num_neighbours"] = neighbour_counts
    geo_features["dist_from_bc"] = distances
    return geo_features

In [12]:
# Register both on-demand feature views defined above with the feature store.
fs.apply([on_demand_rates, on_demand_geo_feat])



In [15]:
from feast.infra.offline_stores.file_source import SavedDatasetFileStorage
# Entity rows for the historical retrieval (this reassigns `df`).
df = pd.read_parquet("/home/gianmaria/repos/airbnb-bc/src/feature_store/feature_repo/data/train_df.parquet")

# Feature references in "<feature view>:<feature>" form, grouped by view.
training_feature_refs = [
    # df1
    'df1_feature_view:bathrooms',
    # df2 and the rates derived from it
    'df2_feature_view:available_days',
    'df2_feature_view:blocked_days',
    'on_demand_rates:rate_blocked_days',
    'on_demand_rates:rate_available_days',
    # df3 and the geographic features derived from it
    'df3_feature_view:latitude',
    'df3_feature_view:longitude',
    'df3_feature_view:GEO_ID',
    'on_demand_geo_feat:num_neighbours',
    'on_demand_geo_feat:dist_from_bc',
]

# `training_df` is a retrieval job; it is turned into a DataFrame with `.to_df()`.
training_df = fs.get_historical_features(
    entity_df=df,
    features=training_feature_refs,
)


In [16]:
# Materialise the historical-features retrieval job into a DataFrame.
trdf=training_df.to_df()

In [18]:
# Display the retrieved training frame.
trdf

Unnamed: 0,airbnb_property_id,target,event_timestamp,bathrooms,available_days,blocked_days,latitude,longitude,GEO_ID,rate_blocked_days,rate_available_days,num_neighbours,dist_from_bc
0,54409228,True,2022-11-01 00:00:00+00:00,1,14,17,50.85864,-0.15201,50.85864|-0.15201,1.214286,0.823529,1,2.549115
1,23936388,True,2022-11-01 00:00:00+00:00,1,30,1,50.84128,-0.17540,50.84128|-0.1754,0.033333,30.000000,1,2.303420
2,23938146,True,2022-11-01 00:00:00+00:00,1,27,4,50.84528,-0.13343,50.84528|-0.13343,0.148148,6.750000,8,1.621307
3,23952468,True,2022-11-01 00:00:00+00:00,1,16,15,50.82820,-0.14456,50.8282|-0.14456,0.937500,1.066667,23,0.148663
4,35439212,True,2022-11-01 00:00:00+00:00,2,27,4,50.83000,-0.13800,50.83|-0.138,0.148148,6.750000,18,0.561269
...,...,...,...,...,...,...,...,...,...,...,...,...,...
19851,660585964026266078,True,2023-08-01 00:00:00+00:00,1,28,3,50.82405,-0.15042,50.82405|-0.15042,0.107143,9.333333,57,0.563583
19852,660145701290574893,True,2023-08-01 00:00:00+00:00,2,23,8,50.81407,-0.10806,50.81407|-0.10806,0.347826,2.875000,2,2.976027
19853,664191402423774690,False,2023-08-01 00:00:00+00:00,1,3,28,50.82424,-0.13663,50.82424|-0.13663,9.333333,0.107143,12,0.796723
19854,661460035779281614,False,2023-08-01 00:00:00+00:00,1,29,2,50.82526,-0.13392,50.82526|-0.13392,0.068966,14.500000,9,0.921308


In [17]:

# Persist the retrieval job as the saved dataset 'my_training_ds', backed by a
# parquet file. `allow_overwrite=True` lets the cell be re-run for the same name.
training_ds_storage = SavedDatasetFileStorage(path='my_training_ds.parquet')

dataset = fs.create_saved_dataset(
    from_=training_df,
    name='my_training_ds',
    storage=training_ds_storage,
    tags={'author': 'fsxz'},
    allow_overwrite=True,
)




In [25]:
import numpy as np
import feast

from feast.dqm.profilers.ge_profiler import ge_profiler

from great_expectations.core.expectation_suite import ExpectationSuite
from great_expectations.dataset import PandasDataset

In [70]:
DELTA = 0.1  # controlling allowed window in fraction of the value on scale [0, 1]

@ge_profiler
def stats_profiler(ds: PandasDataset) -> ExpectationSuite:
    """Declare the value-range expectations used to profile the training data.

    Each listed column gets an `expect_column_values_to_be_between` check with
    `mostly=0.99`, i.e. some outliers are tolerated.

    Args:
        ds: Dataset on which the expectations are declared.

    Returns:
        The expectation suite collected on `ds`; expectations that fail on
        `ds` are kept (`discard_failed_expectations=False`).
    """
    # (column, min_value, max_value) — simple checks on data consistency
    value_ranges = (
        ("available_days", 1, 31),
        ("bathrooms", 1, 1),
        ("rate_blocked_days", 0, 100.),
    )
    for column, lower, upper in value_ranges:
        ds.expect_column_values_to_be_between(
            column,
            min_value=lower,
            max_value=upper,
            mostly=0.99,  # allow some outliers
        )

    # NOTE: mean expectations (observed mean * (1 -/+ DELTA)) and quantile
    # expectations were previously sketched here, commented out, for the columns
    # `trip_count`, `earned_per_hour` and `avg_fare`. None of those columns is
    # retrieved in this notebook; DELTA is kept for such window checks.

    return ds.get_expectation_suite(discard_failed_expectations=False)

In [71]:
# Fetch the saved dataset stored under the name 'my_training_ds'.
ds = fs.get_saved_dataset('my_training_ds')
# Profiling call left disabled. NOTE(review): the output captured below this
# cell looks like the result of this call — confirm and re-enable or drop it.
# ds.get_profile(profiler=stats_profiler)

<GEProfile with expectations: [
  {
    "expectation_type": "expect_column_values_to_be_between",
    "kwargs": {
      "column": "available_days",
      "min_value": 1,
      "max_value": 31,
      "mostly": 0.99
    },
    "meta": {}
  },
  {
    "expectation_type": "expect_column_values_to_be_between",
    "kwargs": {
      "column": "bathrooms",
      "min_value": 1,
      "max_value": 1,
      "mostly": 0.99
    },
    "meta": {}
  },
  {
    "expectation_type": "expect_column_values_to_be_between",
    "kwargs": {
      "column": "rate_blocked_days",
      "min_value": 0,
      "max_value": 100.0,
      "mostly": 0.99
    },
    "meta": {}
  }
]>

In [72]:
# Turn the saved dataset plus the profiler into a validation reference.
# NOTE(review): `vr` is reassigned by two later cells; keep only one of them.
vr = ds.as_reference(name="validation_reference_dataset", profiler=stats_profiler)

In [55]:
from feast.saved_dataset import ValidationReference

In [63]:
# Build a validation reference from the dataset *name* only.
# NOTE(review): this looks like a superseded experiment — `vr` is rebuilt with
# `ValidationReference.from_saved_dataset` in a later cell. A reference created
# this way presumably has no dataset attached to profile; confirm and remove.
vr = ValidationReference('check',dataset_name = 'my_training_ds', profiler=stats_profiler)

In [69]:
# Display the saved-dataset object returned by `fs.create_saved_dataset`.
dataset

<SavedDataset(name = my_training_ds, features = ['df1_feature_view:bathrooms', 'df2_feature_view:available_days', 'df2_feature_view:blocked_days', 'on_demand_stuff:rate_blocked_days', 'on_demand_stuff:rate_available_days'], join_keys = ['target', 'airbnb_property_id'], storage = <feast.infra.offline_stores.file_source.SavedDatasetFileStorage object at 0x77be9342c880>, full_feature_names = False, tags = {'author': 'fsxz'}, feature_service_name = None, _retrieval_job = <feast.infra.offline_stores.file.FileRetrievalJob object at 0x77be918e71f0>, min_event_timestamp = 2022-11-01 00:00:00+00:00, max_event_timestamp = 2023-10-01 00:00:00+00:00, created_timestamp = 2024-05-21 20:35:18.939313, last_updated_timestamp = 2024-05-21 20:35:18.939313)>

In [67]:
# Build the validation reference 'check' from the saved dataset
# 'my_training_ds' (re-fetched from the store) and the profiler.
vr = ValidationReference.from_saved_dataset('check', dataset=fs.get_saved_dataset('my_training_ds'), profiler=stats_profiler)

In [37]:
# Retrieve the features that will be validated against the reference profile.
# The rate features are referenced through `on_demand_rates`, the on-demand
# feature view this notebook defines and applies. The previous references used
# the stale view name `on_demand_stuff`, which no cell in this notebook
# registers, so the retrieval only worked against an old registry entry.
job = fs.get_historical_features(
    entity_df=df,
    features=[
        'df1_feature_view:bathrooms',
        'df2_feature_view:available_days',
        'df2_feature_view:blocked_days',
        'on_demand_rates:rate_blocked_days',
        'on_demand_rates:rate_available_days',
    ],
)

In [76]:
from feast.dqm.errors import ValidationFailed
# Materialise the job while validating it against `vr`. If validation fails,
# print the validation report instead of letting the exception propagate.
# NOTE(review): on failure `pippo` is not assigned by this cell, so later cells
# that read it either fail or silently use a value from an earlier run.
try:
    pippo = job.to_df(validation_reference=vr)
except ValidationFailed as exc:
    print(exc.validation_report)

[
  {
    "success": false,
    "expectation_config": {
      "expectation_type": "expect_column_values_to_be_between",
      "kwargs": {
        "column": "bathrooms",
        "min_value": 1,
        "max_value": 1,
        "mostly": 0.99,
        "result_format": "COMPLETE"
      },
      "meta": {}
    },
    "result": {
      "element_count": 24849,
      "missing_count": 0,
      "missing_percent": 0.0,
      "unexpected_count": 8291,
      "unexpected_percent": 33.365527787838545,
      "unexpected_percent_total": 33.365527787838545,
      "unexpected_percent_nonmissing": 33.365527787838545,
      "partial_unexpected_list": [
        2,
        0,
        2,
        3,
        2,
        4,
        2,
        2,
        2,
        2,
        3,
        2,
        2,
        3,
        3,
        2,
        2,
        2,
        2,
        2
      ],
      "partial_unexpected_index_list": [
        2,
        3,
        8,
        12,
        25,
        33,
        36,
        37

In [41]:
# Compare the shape of the validated frame with a fresh, unvalidated
# materialisation of the same job (this runs the retrieval a second time).
pippo.shape, job.to_df().shape

((24849, 8), (24849, 8))