# Register Features Guide

In [None]:
import os
from datetime import datetime, timedelta
from urllib.parse import quote

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import sqlalchemy
from feast import Entity, FeatureStore, FeatureView, Field
from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import PostgreSQLOfflineStoreConfig
from feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source import (
    PostgreSQLSource,
)
from feast.infra.online_stores.redis import RedisOnlineStoreConfig
from feast.repo_config import RepoConfig, RegistryConfig
from feast.types import Float32, Int64

In [None]:
def get_sqlalchemy_engine(config):
    """Create a SQLAlchemy engine for the PostgreSQL database described by ``config``.

    Args:
        config: Mapping with the keys ``user``, ``password``, ``host``, ``port``,
            ``database`` and ``db_schema``.

    Returns:
        The engine returned by ``sqlalchemy.create_engine``. Its connections are
        opened with ``search_path`` set to ``config["db_schema"]``.
    """
    # Percent-encode the credentials so characters such as "@", ":" or "/" in a
    # user name or password cannot corrupt the connection URL.
    user = quote(str(config["user"]), safe="")
    password = quote(str(config["password"]), safe="")
    location = f"{config['host']}:{config['port']}/{config['database']}"
    url = f"postgresql+psycopg2://{user}:{password}@{location}"

    # The notebook output is saved with the file, so never echo the password.
    masked_url = f"postgresql+psycopg2://{user}:***@{location}"
    print("Connecting to", config["db_schema"], "schema using:", masked_url)

    return sqlalchemy.create_engine(
        url,
        client_encoding='utf8',
        connect_args={'options': '-c search_path={}'.format(config["db_schema"])},
    )

In [None]:
# Connection settings for the PostgreSQL offline store.
# Each value can be overridden through an environment variable so that real
# credentials never have to be committed with the notebook; the fallbacks are
# the values of the demo deployment used throughout this guide.
db_config = {
    "user": os.environ.get("POSTGRES_USER", "postgres"),
    "password": os.environ.get("POSTGRES_PASSWORD", "postgres"),
    "host": os.environ.get("POSTGRES_HOST", "postgresql-offline-store.default.svc.cluster.local"),
    "port": int(os.environ.get("POSTGRES_PORT", "5432")),
    "database": os.environ.get("POSTGRES_DB", "postgres"),
    "db_schema": "public",
}

## Data Collection & Ingestion

For the purpose of this guide, we will create dummy raw data that simulates the collection of real data.

In general, there are systems that are responsible for collecting raw data and storing it in a database or a data warehouse.

In [None]:
def create_trip_records_df(drivers, customers, start_date, end_date, order_count, seed=None) -> pd.DataFrame:
    """Generate a DataFrame of synthetic trip orders.

    Args:
        drivers: Sequence of driver ids to sample ``driver_id`` from.
        customers: Sequence of customer ids to sample ``customer_id`` from.
        start_date: Timezone-naive start of the time window (first event timestamp).
        end_date: Timezone-naive end of the time window (exclusive).
        order_count: Number of trip orders (rows) to generate.
        seed: Optional seed. When given, a private ``RandomState`` is used so the
            output is reproducible; when ``None`` (default) NumPy's global random
            state is used.

    Returns:
        DataFrame with one row per order and the columns:

        order_id: consecutive ids starting at 100
        driver_id / customer_id: randomly sampled from ``drivers`` / ``customers``
        accepted: 0/1 flag indicating whether the trip order was accepted
        completed: 0/1 flag indicating whether the trip was completed
            (always 0 if the order was not accepted)
        cost: cost of the trip (0.0 if not completed)
        event_timestamp: evenly spaced UTC timestamps in [start_date, end_date)

    Example df generated by this function:
    |   order_id |   driver_id |   customer_id |   accepted |   completed |     cost | event_timestamp           |
    |------------+-------------+---------------+------------+-------------+----------+---------------------------|
    |        100 |        1005 |          5092 |          1 |           1 |  5.69582 | 2022-06-01 09:00:00+00:00 |
    |        101 |        1011 |          5017 |          1 |           1 |  4.23811 | 2022-06-01 11:09:36+00:00 |
    |        102 |        1010 |          5095 |          1 |           1 |  3.35814 | 2022-06-01 13:19:12+00:00 |
    |        103 |        1005 |          5063 |          1 |           1 |  1.14626 | 2022-06-01 15:28:48+00:00 |
    |        ... |        .... |          .... |        ... |         ... |      ... |                           |
    |        150 |        1018 |          5005 |          1 |           1 |  7.19867 | 2022-06-01 19:48:00+00:00 |

    """
    # Without a seed keep drawing from NumPy's global state, exactly as before;
    # with a seed use a private generator so the result is reproducible.
    rng = np.random if seed is None else np.random.RandomState(seed)

    df = pd.DataFrame()
    df["order_id"] = list(range(100, 100 + order_count))
    df["driver_id"] = rng.choice(drivers, order_count)
    df["customer_id"] = rng.choice(customers, order_count)

    # 80% of the orders are accepted and 90% are completed (before reconciliation below).
    df["accepted"] = rng.choice([0, 1], size=order_count, p=[0.2, 0.8]).astype(np.int32)
    df["completed"] = rng.choice([0, 1], size=order_count, p=[0.1, 0.9]).astype(np.int32)
    df["cost"] = rng.random(size=order_count).astype(np.float32) * 10

    # periods=order_count + 1 with inclusive="left" yields exactly order_count
    # evenly spaced timestamps and leaves out end_date itself.
    df["event_timestamp"] = [
        pd.Timestamp(dt, unit="ms", tz="UTC").round("ms")
        for dt in pd.date_range(
            start=start_date, end=end_date, periods=order_count + 1, inclusive="left"
        )
    ]

    df = df.sort_values(
        by=[
            "event_timestamp",
            "order_id",
            "driver_id",
            "customer_id",
        ],
    )

    # Reconcile the independently drawn flags: an order that was not accepted
    # cannot be completed, and a trip that was not completed costs nothing.
    df.loc[df["accepted"] == 0, "completed"] = 0
    df.loc[df["completed"] == 0, "cost"] = 0.0

    return df

In [None]:
# Id pools for the synthetic data: 20 drivers (1001-1020) and 100 customers (5001-5100).
drivers = [driver_id for driver_id in range(1001, 1021)]
customers = [customer_id for customer_id in range(5001, 5101)]

In [None]:
# Time window for the synthetic trips: the 30 days that end at the start of the
# current UTC hour. Both values are timezone-naive datetimes.
end_date = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
start_date = end_date + timedelta(days=-30)

In [None]:
# Generate 1000 synthetic trip orders spread over the 30-day window.
trip_records_df = create_trip_records_df(
    drivers=drivers,
    customers=customers,
    start_date=start_date,
    end_date=end_date,
    order_count=1000,
)

In [None]:
# Engine used to write the raw trip records to the offline store.
con = get_sqlalchemy_engine(config=db_config)

In [None]:
# Store the synthetic trips in the "trip_records" table, replacing any previous copy.
# index=True (the pandas default) also writes the DataFrame index as an "index" column.
trip_records_df.to_sql(
    name="trip_records",
    con=con,
    schema=db_config["db_schema"],
    if_exists="replace",
    index=True,
    dtype={
        "order_id": sqlalchemy.INT,
        "driver_id": sqlalchemy.INT,
        "customer_id": sqlalchemy.INT,
        "accepted": sqlalchemy.BOOLEAN,
        "completed": sqlalchemy.BOOLEAN,
        "cost": sqlalchemy.FLOAT,
        "event_timestamp": sqlalchemy.TIMESTAMP,
    },
)

## Data Exploration & Analysis

We fetch the collected data and start by getting some general information on the dataset.

In [None]:
# Engine used to read the collected trip records back for exploration.
con = get_sqlalchemy_engine(config=db_config)

In [None]:
# Load the whole "trip_records" table; its "index" column becomes the DataFrame index.
trip_records = pd.read_sql("trip_records", con=con, index_col="index")

In [None]:
# Preview the first 200 trip orders.
trip_records.head(n=200)

In [None]:
# Orders with a cost of at most 0.1 (this includes every rejected or uncompleted order, whose cost is 0).
trip_records[trip_records["cost"].le(0.1)]

In [None]:
# Summarise the columns, their dtypes and their non-null counts.
trip_records.info()

We continue by inspecting the cost of the trips.

In [None]:
# Descriptive statistics of the float64 columns, restricted to orders with a positive cost.
trip_records[trip_records["cost"].gt(0)].describe(include=["float64"])

In [None]:
# Histogram of the cost of all trips with a positive cost.
fig, ax = plt.subplots()

positive_costs = trip_records.loc[trip_records["cost"] > 0, "cost"]
ax.hist(positive_costs, bins=100, linewidth=0.5, edgecolor="white")

# Title and axis labels let the figure stand on its own when the notebook is skimmed.
ax.set(
    title="Distribution of trip cost",
    xlabel="cost",
    ylabel="number of trips",
    xlim=(0, 10), xticks=np.arange(1, 10),
    ylim=(0, 20), yticks=np.linspace(0, 20, 5),
)

plt.show()

What is the average cost of a completed trip?

In [None]:
# Mean cost over the completed trips only.
trip_records.loc[trip_records["completed"] == True, "cost"].mean()

How many trip orders were not accepted?

In [None]:
# Report rejected orders against the actual number of loaded orders instead of a
# hard-coded total, so the message stays correct if the dataset size changes.
rejected_count = trip_records[trip_records["accepted"] == False].shape[0]
print(f'{rejected_count}/{len(trip_records)} were not accepted')

How many trips were not completed?

In [None]:
# Among the accepted orders, count the ones that were never completed.
accepted_orders = trip_records[trip_records["accepted"] == True]
uncompleted_orders = accepted_orders[accepted_orders["completed"] == False]
print(f'{uncompleted_orders.shape[0]}/{accepted_orders.shape[0]} were not completed')

Finally, we explore connections between drivers and trips.

In [None]:
# Number of rejected orders per driver (drivers without any rejection do not appear).
rejected_orders = trip_records.loc[trip_records["accepted"] == False, ["driver_id", "accepted"]]
rejected_orders.groupby(["driver_id"]).count()

We see that some drivers reject trip orders more often than others.

In [None]:
# Total cost of the completed trips per driver.
completed_costs = trip_records.loc[trip_records["completed"] == True, ["driver_id", "cost"]]
completed_costs.groupby(["driver_id"]).sum()

We also see that some drivers make more money compared to others.

Before continuing we drop useless columns and keep the ones that we are interested in (driver_id, accepted, completed, cost, event_timestamp).

In [None]:
# Drop the columns that are not needed for the features.
# errors="ignore" makes the cell safe to re-run: without it a second execution
# raises KeyError because the columns are already gone.
trip_records = trip_records.drop(columns=["order_id", "customer_id"], errors="ignore")

## Data Validation

In general, before creating a new feature we need to perform some kind of validation.

We will perform some basic validation, since in this guide we are the ones creating the raw data.

We start by checking for null values.

In [None]:
# True if any cell of the table is null.
trip_records.isna().to_numpy().any()

Then we check:
- if rejected or uncompleted trips have a non-zero cost.
- if a rejected trip order was completed.

In [None]:
# Collect the index labels of rows that break one of the data invariants:
#   - a rejected or uncompleted order has a non-zero cost
#   - a rejected order is marked as completed
# Boolean masks evaluate all rows at once instead of iterating row by row.
is_rejected = trip_records["accepted"] == False
is_uncompleted = trip_records["completed"] == False
is_completed = trip_records["completed"] == True
has_cost = trip_records["cost"] != 0

invalid_mask = ((is_rejected | is_uncompleted) & has_cost) | (is_rejected & is_completed)
invalid_indexes = trip_records.index[invalid_mask].tolist()


In [None]:
# DataFrame.drop returns a new frame, so the result has to be assigned back;
# otherwise the invalid rows silently stay in trip_records.
# errors="ignore" keeps the cell re-runnable after the rows are already gone.
trip_records = trip_records.drop(index=invalid_indexes, errors="ignore")
trip_records.head()

## Feature Engineering

We saw that there is a relation between drivers and trips.

Drivers that accept more trips tend to earn more money.

Thus, we will create the following features that we think will be useful for an ML model:
- daily acceptance rate = accepted trip orders / total trip orders (per day)
- daily completion rate = completed trips / accepted trip orders (per day)
- daily trips = count of completed trips in a day
- daily profit = sum of the cost of completed trips in a day

In [None]:
# Empty placeholder for the per-driver, per-day feature table; the name is
# reassigned further down when the daily aggregates are joined.
daily_driver_stats = pd.DataFrame()

We will start by converting timestamp to dates (no hours, minutes, seconds, etc.) and dropping useless columns

In [None]:
# Truncate every timestamp to midnight of its day so rows can be grouped per day.
# Unlike `.dt.date`, `.dt.normalize()` keeps the datetime64 dtype: the cell can be
# re-run safely (the `.dt` accessor fails on a column of plain date objects) and
# the column still matches the TIMESTAMP type used when the features are stored.
trip_records["event_timestamp"] = trip_records["event_timestamp"].dt.normalize()

We continue by computing the daily trips and profit for each driver

In [None]:
# Per driver and day: total cost and number of the completed trips.
completed_rows = trip_records.loc[
    trip_records["completed"] == True,
    ["driver_id", "event_timestamp", "completed", "cost"],
]
daily_stats = completed_rows.groupby(
    by=["driver_id", "event_timestamp"], as_index=False
).agg({"cost": "sum", "completed": "count"})

In [None]:
# Give the aggregated columns names that describe the aggregates.
daily_stats = daily_stats.rename(
    columns={
        "cost": "profit",
        "completed": "completed_trip_orders",
    }
)
daily_stats.head(5)

Now, let's compute the daily total and accepted trip orders

In [None]:
# Per driver and day: number of accepted trip orders.
accepted_rows = trip_records.loc[
    trip_records["accepted"] == True,
    ["driver_id", "event_timestamp", "accepted"],
]
accepted_daily_stats = accepted_rows.groupby(
    by=["driver_id", "event_timestamp"], as_index=False
).count()

In [None]:
# The counted "accepted" column now holds the number of accepted orders.
accepted_daily_stats = accepted_daily_stats.rename(
    columns={"accepted": "accepted_trip_orders"}
)
accepted_daily_stats.head(5)

In [None]:
# Per driver and day: number of all trip orders, accepted or not.
order_rows = trip_records[["driver_id", "event_timestamp", "accepted"]]
total_daily_stats = order_rows.groupby(
    by=["driver_id", "event_timestamp"], as_index=False
).count()

In [None]:
# The counted "accepted" column now holds the number of all orders.
total_daily_stats = total_daily_stats.rename(
    columns={"accepted": "total_trip_orders"}
)
total_daily_stats.head(5)

Finally, we will join the features into one dataframe and compute the acceptance and completion rate

In [None]:
# Attach the accepted-order counts to the total-order counts per (driver, day).
join_keys = ["driver_id", "event_timestamp"]
daily_driver_stats = total_daily_stats.join(
    accepted_daily_stats.set_index(join_keys),
    how="outer",
    on=join_keys,
)

In [None]:
# The outer join leaves NaN for (driver, day) pairs that have trip orders but no
# accepted order at all; such a pair has 0 accepted orders.
accepted_filled = daily_driver_stats["accepted_trip_orders"].fillna(0)
daily_driver_stats["accepted_trip_orders"] = accepted_filled
daily_driver_stats.head(5)

In [None]:
# Attach the completed-trip count and the profit per (driver, day).
join_keys = ["driver_id", "event_timestamp"]
daily_driver_stats = daily_driver_stats.join(
    daily_stats.set_index(join_keys),
    how="outer",
    on=join_keys,
)

In [None]:
# The outer join leaves NaN in both columns for (driver, day) pairs that have trip
# orders but no completed trip; such a pair has 0 completed trips and 0 profit.
for column in ("completed_trip_orders", "profit"):
    daily_driver_stats[column] = daily_driver_stats[column].fillna(0)
daily_driver_stats.head(5)

In [None]:
# Compute both rates with column arithmetic instead of a row-wise apply.
accepted_per_day = daily_driver_stats["accepted_trip_orders"]

# acceptance rate = accepted orders / all orders
daily_driver_stats["acc_rate"] = accepted_per_day / daily_driver_stats["total_trip_orders"]

# completion rate = completed trips / accepted orders, defined as 0 when the
# driver accepted nothing that day (avoids the inf/NaN of a division by zero).
daily_driver_stats["comp_rate"] = (
    daily_driver_stats["completed_trip_orders"] / accepted_per_day
).where(accepted_per_day != 0, 0)

In [None]:
# Keep only the feature columns: the raw counts were needed just for the rates,
# and the completed-trip count is published as "trips".
daily_driver_stats = (
    daily_driver_stats
    .drop(labels=["total_trip_orders", "accepted_trip_orders"], axis=1)
    .rename(columns={"completed_trip_orders": "trips"})
)
daily_driver_stats.head(5)

It's now time to store the new features in the database or data warehouse

In [None]:
# Engine used to write the engineered features to the offline store.
con = get_sqlalchemy_engine(config=db_config)

In [None]:
# Store the engineered features in the "daily_driver_stats" table, replacing any previous copy.
daily_driver_stats.to_sql(
    name="daily_driver_stats",
    con=con,
    schema=db_config["db_schema"],
    if_exists="replace",
    dtype={
        "driver_id": sqlalchemy.INT,
        "event_timestamp": sqlalchemy.TIMESTAMP,
        "profit": sqlalchemy.FLOAT,
        # The count becomes a float after the outer join and fillna; store it as an
        # integer so the column agrees with the Int64 "trips" field of the feature view.
        "trips": sqlalchemy.INT,
        "acc_rate": sqlalchemy.FLOAT,
        "comp_rate": sqlalchemy.FLOAT,
    },
)

## Register Feature Definitions

In this last step of the process we create the required Feast definitions

In [None]:
# Offline store settings for Feast. They are taken from db_config so that Feast
# reads from exactly the database the feature table was written to, instead of
# repeating the connection values in a second place.
offline_store_config = PostgreSQLOfflineStoreConfig(
    host=db_config["host"],
    port=db_config["port"],
    database=db_config["database"],
    db_schema=db_config["db_schema"],
    user=db_config["user"],
    password=db_config["password"],
)

In [None]:
# Online store settings for Feast: a Redis instance addressed by a single connection string.
# NOTE(review): the connection string embeds a plaintext password — consider
# reading it from the environment or a secret store.
online_store_config = RedisOnlineStoreConfig(
    connection_string="redis-online-store.default.svc.cluster.local:6379,username=default,password=redis,db=0"
)

In [None]:
# Registry settings for Feast.
# NOTE(review): "KubeflowRegistryStore" and the empty path presumably refer to a
# registry store supplied by the deployment's Feast build — confirm.
registry_config = RegistryConfig(
    registry_store_type="KubeflowRegistryStore",
    path="",
    project="kubeflow-user"
)

In [None]:
# Assemble the repository configuration from the registry, offline store and
# online store settings defined above. The project name is the same one that is
# passed to the registry settings.
repo_config = RepoConfig(
    project="kubeflow-user",
    registry=registry_config,
    provider="local",
    offline_store=offline_store_config,
    online_store=online_store_config
)

In [None]:
# Create the feature store from the in-memory configuration (no repository path is given).
fs = FeatureStore(config=repo_config, repo_path=None)

We create a data source definition that contains information on where the data lives, the name of the SQL table, etc.

In [None]:
# Data source that selects every column of the daily_driver_stats table;
# event_timestamp is declared as the source's timestamp field.
daily_driver_stats_source = PostgreSQLSource(
    name="daily_driver_stats_source",
    query="SELECT * FROM daily_driver_stats",
    timestamp_field="event_timestamp"
)

We define an entity that keeps information about its type, the join key, etc.

In [None]:
# The "driver" entity, joined to feature rows through the driver_id column.
# NOTE(review): value_type is given feast.types.Int64 here; confirm that the Feast
# build in use accepts this type for Entity.value_type.
driver = Entity(
    name="driver",
    value_type=Int64,
    description="",
    join_keys=["driver_id"],
    tags={},
    owner="user"
)

We create a feature view definition that contains the features we created previously

In [None]:
# Feature view over the daily_driver_stats source. It refers to the "driver" entity
# by name, has a ttl of 7 days, is enabled for the online store, and exposes the
# four feature columns listed in `schema`.
driver_daily_stats_fv = FeatureView(
    name="daily_driver_stats_fv",
    entities=["driver"],
    description="",
    tags={},
    owner="user",
    ttl=timedelta(days=7),
    source=daily_driver_stats_source,
    online=True,
    schema=[
        Field(name="acc_rate", dtype=Float32, tags={}),
        Field(name="comp_rate", dtype=Float32, tags={}),
        Field(name="profit", dtype=Float32, tags={}),
        Field(name="trips", dtype=Int64, tags={})
    ]
)

By running the apply method, we push the definitions to the registry and update the infrastructure (if needed).

In [None]:
# Apply the data source, the entity and the feature view in a single call.
fs.apply([daily_driver_stats_source, driver, driver_daily_stats_fv])