![](feature.png)

# Feature Store with Kubeflow Orchestration
## Contents
- <a href='#1'>1. Load Python libraries and import the data</a>
- <a href='#2'>2. Feature Store Client initialization</a>
    - <a href='#2.1'>2.1. Declare Environment Variables for Feast</a>
    - <a href='#2.2'>2.2. Create GCS Bucket</a>
    - <a href='#2.3'>2.3. Feature Store Client: Used for creating, managing, and retrieving features</a>
    - <a href='#2.4'>2.4. Load Data</a>
- <a href='#3'>3. Feature Registry</a>
    - <a href='#3.1'>3.1. Declare Features and Entities</a>
    - <a href='#3.2'>3.2. Declare Feature Table Schema</a>
    - <a href='#3.3'>3.3. Registering entities and feature tables in Feature Store</a>
    - <a href='#3.4'>3.4. Populating batch source</a>
    - <a href='#3.5'>3.5. Check the Feature Table stats</a>
- <a href='#4'>4. Batch Source Online Ingestion</a>
- <a href='#5'>5. Real-Time Streaming Ingestion with Kafka</a>
    - <a href='#5.1'>5.1. Kafka Utility Functions</a>
    - <a href='#5.2'>5.2. Kafka Initializer</a>
    - <a href='#5.3'>5.3. Trip Data Kafka Ingestion</a>
    - <a href='#5.4'>5.4. Fare Data Kafka Ingestion</a>
- <a href='#6'>6. Kubeflow Pipeline Artifacts for Model Training</a>

## 1 Load Python libraries and import the data

In [1]:
# Install the dependencies before importing them
!pip install --upgrade pip
!pip install feast==0.9
!pip install protobuf gcsfs -U -q
!pip install google-cloud-dataproc==2.3.1
!pip install confluent_kafka
!pip install kafka-python
!pip install avro

import os
import time
import json
import random
import warnings
import numpy as np
import pandas as pd
from datetime import datetime
from feast.config import Config
from feast.data_source import FileSource, KafkaSource
from feast.data_format import ParquetFormat, AvroFormat
from feast import Client, Feature, Entity, ValueType, FeatureTable
from feast.pyspark.abc import RetrievalJobParameters, SparkJobStatus, SparkJob
import feast.staging.entities as entities
warnings.filterwarnings('ignore')

## 2 Feature Store Client initialization

Feast contains the following core concepts:

* **Projects:** Serve as a top level namespace for all Feast resources. Each project is a completely independent environment in Feast. Users can only work in a single project at a time.
* **Entities:** Entities are the objects in an organization on which features occur. They map to your business domain (users, products, transactions, locations).
* **Feature Tables:** Defines a group of features that occur on a specific entity.
* **Features:** Individual feature within a feature table.
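To make the nesting concrete, here is a toy, plain-Python model of how these concepts relate. The class names `ConceptFeature`, `ConceptTable` and `ConceptProject` are hypothetical and are not part of the Feast API; the only behavior borrowed from this notebook is the `<feature_table>:<feature>` reference format used later for retrieval.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ConceptFeature:
    """One feature: a name plus a value type such as "DOUBLE" or "INT64"."""
    name: str
    value_type: str


@dataclass
class ConceptTable:
    """A feature table groups features that share the same entity key(s)."""
    name: str
    entities: List[str]
    features: List[ConceptFeature]

    def feature_refs(self) -> List[str]:
        # Features are referenced as "<feature_table>:<feature>" when retrieving them
        return [f"{self.name}:{f.name}" for f in self.features]


@dataclass
class ConceptProject:
    """A project is an independent namespace holding feature tables by name."""
    name: str
    tables: Dict[str, ConceptTable] = field(default_factory=dict)

    def apply(self, table: ConceptTable) -> None:
        # In this toy model, re-applying a table with the same name replaces it
        self.tables[table.name] = table


project = ConceptProject("default")
project.apply(ConceptTable(
    "fare_statistics",
    ["driver_id"],
    [ConceptFeature("passenger_count", "INT64"), ConceptFeature("fare_amount", "DOUBLE")],
))
print(project.tables["fare_statistics"].feature_refs())
```

The names are prefixed with `Concept` so that running the cell does not shadow the `Feature` and `FeatureTable` classes imported from Feast.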

### 2.1 Declare Environment Variables for Feast

> Run the following command against the Kubernetes cluster to list its services and their external IPs:

```
%%bash
kubectl get svc
```
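- Copy the external IPs of the Redis and Kafka services into `FEAST_REDIS_HOST` and `DEMO_KAFKA_BROKERS` below
- Copy your GCP project ID into `FEAST_DATAPROC_PROJECT`
- Copy the Dataproc cluster name and its GCS staging bucket
- The GCS Feast staging bucket is created in section 2.2 and passed in as `bucket`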

In [2]:

class feature_store_client:
    """Exports the Feast settings for the chosen Spark launcher as environment variables."""

    def __init__(self, env, bucket):
        self.env = env
        # GCS staging bucket URI, e.g. 'gs://feast-staging-bucket-1234567/' (trailing slash expected)
        self.staging_bucket = bucket

    def feature_store_settings(self):
        if self.env.lower() != "dataproc":
            raise ValueError(f"Unsupported environment {self.env!r}: only 'dataproc' is configured")

        # Values in <...> are placeholders: fill them in from `kubectl get svc` and your GCP project
        environment = {'FEAST_CORE_URL': 'feast-release-feast-core.default:6565',
                       'FEAST_DATAPROC_CLUSTER_NAME': 'dataprocfeast',
                       'FEAST_DATAPROC_PROJECT': '<PROJECT_ID>',
                       'FEAST_DATAPROC_REGION': 'us-east1',
                       'FEAST_STAGING_LOCATION': self.staging_bucket,
                       'FEAST_HISTORICAL_FEATURE_OUTPUT_FORMAT': 'parquet',
                       'FEAST_HISTORICAL_FEATURE_OUTPUT_LOCATION': f"{self.staging_bucket}historical",
                       'FEAST_HISTORICAL_SERVING_URL': 'feast-release-feast-online-serving.default:6566',
                       'FEAST_REDIS_HOST': '<REDIS_IP>',
                       'FEAST_REDIS_PORT': '6379',
                       'FEAST_SERVING_URL': 'feast-release-feast-online-serving.default:6566',
                       'FEAST_SPARK_HOME': '/usr/local/spark',
                       'FEAST_SPARK_LAUNCHER': 'dataproc',
                       'FEAST_SPARK_STAGING_LOCATION': 'gs://dataproc-staging-us-east1-996861042416-4w01soni/artifacts/',
                       'FEAST_SPARK_STANDALONE_MASTER': 'local[*]',
                       'STAGING_BUCKET': self.staging_bucket,
                       'DEMO_KAFKA_BROKERS': '<KAFKA_IP>'}

        # Export every setting to the current process environment
        os.environ.update(environment)

### 2.2 Create GCS Bucket

In [3]:
def create_staging_bucket():
    staging_bucket = f'gs://feast-staging-bucket-{random.randint(1000000, 10000000)}/'
    !gsutil mb {staging_bucket}
    print(f'Staging bucket is {staging_bucket}')
    return staging_bucket


In [4]:
#Create Bucket
staging_bucket=create_staging_bucket()
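The bucket name is generated rather than chosen by hand, so it can help to check it locally before calling `gsutil mb`. The sketch below is a hypothetical helper (`make_staging_uri` and `BUCKET_NAME` are not from the notebook) and encodes only a simplified subset of the GCS naming rules; it also keeps the trailing slash that `feature_store_settings` relies on when it appends `historical`.

```python
import random
import re

# Simplified subset of the GCS bucket-naming rules: 3-63 characters drawn from
# lowercase letters, digits, "-", "_" and ".", starting and ending with a letter
# or digit (longer dotted names are not covered here).
BUCKET_NAME = re.compile(r"^[a-z0-9][a-z0-9._-]{1,61}[a-z0-9]$")


def make_staging_uri(prefix="feast-staging-bucket", rng=random):
    """Build a gs:// URI with a random numeric suffix, validating the bucket name first."""
    # randint includes both bounds, so the suffix has 7 or 8 digits
    name = f"{prefix}-{rng.randint(1000000, 10000000)}"
    if not BUCKET_NAME.match(name):
        raise ValueError(f"Invalid GCS bucket name: {name}")
    # Keep the trailing slash: later settings append sub-paths directly to this string
    return f"gs://{name}/"


staging_uri = make_staging_uri(rng=random.Random(42))
print(staging_uri)
```

Passing a seeded `random.Random` makes the generated name reproducible, which is convenient when re-running the notebook against an existing bucket.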

In [5]:
set_env=feature_store_client('Dataproc',staging_bucket)
set_env.feature_store_settings()
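Because several settings are template values, a quick check after exporting them can catch a forgotten replacement before the Feast client tries to connect. This is a minimal sketch with a hypothetical helper name; it only looks at variables starting with `FEAST_` or `DEMO_` and treats any value of the form `<UPPER_CASE>` as an unfilled placeholder.

```python
import os
import re

# Unreplaced template values look like "<REDIS_IP>" or "<KAFKA_IP>"
PLACEHOLDER = re.compile(r"^<[A-Z_]+>$")


def find_unset_placeholders(env=None, prefixes=("FEAST_", "DEMO_")):
    """Return the sorted names of prefixed variables whose value is still a placeholder."""
    env = os.environ if env is None else env
    return sorted(
        key for key, value in env.items()
        if key.startswith(prefixes) and PLACEHOLDER.match(value)
    )


missing = find_unset_placeholders()
if missing:
    print("Replace these values before creating the Feast client:", missing)
```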

### 2.3 Feature Store Client: Used for creating, managing, and retrieving features

In [6]:
import os
from pprint import pprint
pprint({key: value for key, value in os.environ.items() if key.startswith("FEAST_")})

{'FEAST_CORE_URL': 'feast-release-feast-core.default:6565',
 'FEAST_DATAPROC_CLUSTER_NAME': 'dataprocfeast',
 'FEAST_DATAPROC_PROJECT': '<PROJECT_ID>',
 'FEAST_DATAPROC_REGION': 'us-east1',
 'FEAST_HISTORICAL_FEATURE_OUTPUT_FORMAT': 'parquet',
 'FEAST_HISTORICAL_FEATURE_OUTPUT_LOCATION': 'gs://feast-staging-bucket-9919526/historical',
 'FEAST_HISTORICAL_SERVING_URL': 'feast-release-feast-online-serving.default:6566',
 'FEAST_REDIS_HOST': '<REDIS_IP>',
 'FEAST_REDIS_PORT': '6379',
 'FEAST_SERVING_URL': 'feast-release-feast-online-serving.default:6566',
 'FEAST_SPARK_HOME': '/usr/local/spark',
 'FEAST_SPARK_LAUNCHER': 'dataproc',
 'FEAST_SPARK_STAGING_LOCATION': 'gs://dataproc-staging-us-east1-996861042416-4w01soni/artifacts/',
 'FEAST_SPARK_STANDALONE_MASTER': 'local[*]',
 'FEAST_STAGING_LOCATION': 'gs://feast-staging-bucket-9919526/'}


In [14]:

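# Client() picks up FEAST_CORE_URL, FEAST_SERVING_URL and the other FEAST_* settings exported above
client = Client()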

### 2.4 Load Data

In [15]:
fare_details=pd.read_csv("faredetails.csv")
fare_details.head()

| | driver_id | pickup_datetime | passenger_count | fare_amount | target | created |
|---|---|---|---|---|---|---|
| 0 | 610685 | 2009-06-15 17:26:21+00:00 | 1 | -1.354113 | 0 | 2021-06-12 16:09:45.131085 |
| 1 | 825735 | 2010-01-05 16:52:16+00:00 | 1 | 1.088648 | 1 | 2021-06-12 16:09:45.131085 |
| 2 | 428317 | 2011-08-18 00:35:00+00:00 | 2 | -0.813646 | 0 | 2021-06-12 16:09:45.131085 |
| 3 | 356886 | 2012-04-21 04:30:42+00:00 | 1 | -0.191734 | 0 | 2021-06-12 16:09:45.131085 |
| 4 | 603801 | 2010-03-09 07:51:00+00:00 | 1 | -0.975267 | 0 | 2021-06-12 16:09:45.131085 |


In [16]:
trip_details=pd.read_csv("tripdetails.csv")
trip_details.head()

| | driver_id | key | pickup_datetime | pickup_longitude | pickup_latitude | dropoff_longitude | dropoff_latitude | longitude_distance | latitude_distance | distance_travelled | created |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 610685 | 2009-06-15 17:26:21.000000100 | 2009-06-15 17:26:21+00:00 | -73.844311 | 40.721319 | -73.84161 | 40.712278 | 0.002701 | 0.009041 | 0.009436 | 2021-06-12 16:09:45.131085 |
| 1 | 825735 | 2010-01-05 16:52:16.000000200 | 2010-01-05 16:52:16+00:00 | -74.016048 | 40.711303 | -73.979268 | 40.782004 | 0.03678 | 0.070701 | 0.079696 | 2021-06-12 16:09:45.131085 |
| 2 | 428317 | 2011-08-18 00:35:00.000000490 | 2011-08-18 00:35:00+00:00 | -73.982738 | 40.76127 | -73.991242 | 40.750562 | 0.008504 | 0.010708 | 0.013674 | 2021-06-12 16:09:45.131085 |
| 3 | 356886 | 2012-04-21 04:30:42.000000100 | 2012-04-21 04:30:42+00:00 | -73.98713 | 40.733143 | -73.991567 | 40.758092 | 0.004437 | 0.024949 | 0.02534 | 2021-06-12 16:09:45.131085 |
| 4 | 603801 | 2010-03-09 07:51:00.000000135 | 2010-03-09 07:51:00+00:00 | -73.968095 | 40.768008 | -73.956655 | 40.783762 | 0.01144 | 0.015754 | 0.01947 | 2021-06-12 16:09:45.131085 |


In [17]:
def change_datetime(df,col):
    df[col]=pd.to_datetime(df[col])
    return df

In [18]:
fare_details=change_datetime(fare_details,'pickup_datetime')
fare_details=change_datetime(fare_details,'created')
trip_details=change_datetime(trip_details,'created')
trip_details=change_datetime(trip_details,'pickup_datetime')

In [19]:
trip_details.drop(columns='key',inplace=True)

In [20]:
trip_details.dtypes,fare_details.dtypes

(driver_id                           int64
 pickup_datetime       datetime64[ns, UTC]
 pickup_longitude                  float64
 pickup_latitude                   float64
 dropoff_longitude                 float64
 dropoff_latitude                  float64
 longitude_distance                float64
 latitude_distance                 float64
 distance_travelled                float64
 created                    datetime64[ns]
 dtype: object, driver_id                        int64
 pickup_datetime    datetime64[ns, UTC]
 passenger_count                  int64
 fare_amount                    float64
 target                           int64
 created                 datetime64[ns]
 dtype: object)

## 3 Feature Registry
### 3.1 Declare Features and Entities
An entity defines the primary key(s) associated with one or more feature tables. The entity must be registered before the feature tables that reference it are declared.

In [21]:
class features_entities:
    """Declares the Feast entity and the features of one DataFrame."""

    # pandas dtype name -> Feast value type (pandas normally stores bytes columns as 'object')
    DTYPE_TO_VALUE_TYPE = {
        'int64': ValueType.INT64,
        'int32': ValueType.INT32,
        'float64': ValueType.DOUBLE,
        'object': ValueType.STRING,
        'bytes': ValueType.BYTES,
        'bool': ValueType.BOOL,
    }

    def __init__(self, df, entity_col, columns_list, name):
        self.df = df
        self.entity_col = entity_col
        self.columns_list = columns_list
        self.feature_register = name  # feature table name used in "table:feature" references

    def entity(self):
        dtype = self.df[self.entity_col].dtype
        if dtype != 'int64':
            raise TypeError(f"Entity column {self.entity_col!r} must be int64, got {dtype}")
        return Entity(
            name=self.entity_col,
            description="Driver identifier",
            value_type=ValueType.INT64,
            labels={"release": "prod-env",
                    "description": f"This prod feature {self.entity_col} means the unique entity of driver",
                    "production": "model"},
        )

    @staticmethod
    def trip_labelss(trip_columns, i):
        # Labels for the trip features, in the same order as trip_columns
        labels = {
            "tripdata": [
                {"release": "prod-env", "description": f"This prod feature {trip_columns[i]} means the longitude coordinate of where the taxi ride started", "production": "model"},
                {"release": "dev-env", "description": f"This dev feature {trip_columns[i]} means the latitude coordinate of where the taxi ride started", "production": "model"},
                {"release": "prod-env", "description": f"This prod feature {trip_columns[i]} means longitude coordinate of where the taxi ride ended", "production": "train"},
                {"release": "prod-env", "description": f"This prod feature {trip_columns[i]} means latitude coordinate of where the taxi ride ended.", "production": "model"},
                {"release": "dev-env", "description": f"This dev feature {trip_columns[i]} means the longitude  distance of the trips in City", "production": "train"},
                {"release": "dev-env", "description": f"This dev feature {trip_columns[i]} means the latitude  distance of the trips in City", "production": "train"},
                {"release": "prod-env", "description": f"This prod feature {trip_columns[i]} means the total distance covered by a trip", "production": "test"},
            ]}
        return labels['tripdata'][i]

    @staticmethod
    def fare_labelss(fare_columns, i):
        # Labels for the fare features, in the same order as fare_columns
        labels = {
            "faredata": [
                {"release": "prod-env", "description": f"This prod feature {fare_columns[i]} means the total passenger count", "production": "test"},
                {"release": "dev-env", "description": f"This dev feature {fare_columns[i]} means the total fare amount for the trip", "production": "entity"},
                {"release": "dev-env", "description": f"This prod feature {fare_columns[i]} means the target profit and loss for the trip", "production": "target"},
            ]}
        return labels['faredata'][i]

    def _build_features(self, label_fn):
        # Returns the Feature objects and their "table:feature" reference strings
        features, retrieve_features = [], []
        for i, col in enumerate(self.columns_list):
            dtype_name = str(self.df.dtypes[col])
            if dtype_name not in self.DTYPE_TO_VALUE_TYPE:
                raise TypeError(f"Unsupported dtype {dtype_name} for column {col!r}")
            features.append(Feature(col, self.DTYPE_TO_VALUE_TYPE[dtype_name],
                                    labels=label_fn(self.columns_list, i)))
            retrieve_features.append(f"{self.feature_register}:{col}")
        return features, retrieve_features

    def trip_features(self):
        return self._build_features(features_entities.trip_labelss)

    def fare_features(self):
        return self._build_features(features_entities.fare_labelss)

    def __str__(self):
        return 'Completed the Feature and Entity Declaration'


>  ### Trip Features

In [22]:
trip_columns=list(trip_details.columns[2:-1])
trip_columns

['pickup_longitude',
 'pickup_latitude',
 'dropoff_longitude',
 'dropoff_latitude',
 'longitude_distance',
 'latitude_distance',
 'distance_travelled']

In [23]:
feature_declaration=features_entities(trip_details,'driver_id',trip_columns,'trip_statistics')

An entity is any domain object that can be modeled and about which information can be stored. Entities are usually recognizable concepts, either concrete or abstract, such as persons, places, things, or events.

Examples of entities in the context of ride-hailing and food delivery: `customer`, `order`, `driver`, `restaurant`, `dish`, `area`.

Feast uses entities in the following way:

* Entities serve as the keys used to look up features for producing training datasets and online feature values.
* Entities serve as a natural grouping of features in a feature table. A feature table must belong to an entity (which could be a composite entity).
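A toy sketch (not Feast's storage code) of entities acting as lookup keys: each row's entity values form the key, and the remaining columns are the features stored under it. The helper `index_by_entity` is hypothetical, and the `customer_id` and `rating` columns in the composite example are made up for illustration.

```python
def index_by_entity(rows, entity_cols):
    """Map each entity key to that row's feature values (later rows overwrite earlier ones)."""
    index = {}
    for row in rows:
        # A single entity still yields a 1-tuple, so simple and composite keys look alike
        key = tuple(row[c] for c in entity_cols)
        index[key] = {k: v for k, v in row.items() if k not in entity_cols}
    return index


# Two rows taken from trip_details above
trip_rows = [
    {"driver_id": 610685, "pickup_longitude": -73.844311},
    {"driver_id": 825735, "pickup_longitude": -74.016048},
]
by_driver = index_by_entity(trip_rows, ["driver_id"])
print(by_driver[(610685,)])

# Composite key: "customer_id" and "rating" are hypothetical columns
ratings = [{"driver_id": 610685, "customer_id": 7, "rating": 5}]
print(index_by_entity(ratings, ["driver_id", "customer_id"])[(610685, 7)])
```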

In [24]:
driver_entity=feature_declaration.entity()

In [25]:
trip_entity,driver_features=feature_declaration.trip_features()

In [26]:
driver_features

['trip_statistics:pickup_longitude',
 'trip_statistics:pickup_latitude',
 'trip_statistics:dropoff_longitude',
 'trip_statistics:dropoff_latitude',
 'trip_statistics:longitude_distance',
 'trip_statistics:latitude_distance',
 'trip_statistics:distance_travelled']

>  ### Fare Features

In [27]:
fare_columns=list(fare_details.columns[2:-1])
fare_columns

['passenger_count', 'fare_amount', 'target']

In [28]:
feature_declaration=features_entities(fare_details,'driver_id',fare_columns,'fare_statistics')

In [29]:
fare_entity,fare_features=feature_declaration.fare_features()
print(feature_declaration)
fare_entity

Completed the Feature and Entity Declaration


[<feast.feature.Feature at 0x7fb8580e9d30>,
 <feast.feature.Feature at 0x7fb8580e9f28>,
 <feast.feature.Feature at 0x7fb8580e9f98>]

In [30]:
fare_features

['fare_statistics:passenger_count',
 'fare_statistics:fare_amount',
 'fare_statistics:target']

### 3.2 Declare Feature Table Schema

Feature tables are both a schema and a logical means of grouping features, data sources, and other related metadata.

Feature tables serve the following purposes:

- Feature tables are a means for defining the location and properties of data sources.
- Feature tables are used to create within Feast a database-level structure for the storage of feature values.
- The data sources described within feature tables allow Feast to find and ingest feature data into stores within Feast.
- Feature tables ensure data is efficiently stored during ingestion by providing a grouping mechanism of features values that occur on the same event timestamp.

In [31]:
# This is the location we're using for the offline feature store.
demo_data_location = os.path.join(os.getenv('FEAST_STAGING_LOCATION'), "test_data")
demo_data_location

'gs://feast-staging-bucket-9919526/test_data'

In [32]:
trip_source_uri = os.path.join(demo_data_location, "driver_statistics")

trip_statistics = FeatureTable(
    name = "trip_statistics",
    entities = ["driver_id"],
    features = trip_entity,
    batch_source=FileSource(
        event_timestamp_column="pickup_datetime",
        created_timestamp_column="created",
        file_format=ParquetFormat(),
        file_url=trip_source_uri,
        date_partition_column="date"
    )
)

In [33]:
fare_source_uri = os.path.join(demo_data_location, "fare_statistics")


fare_statistics = FeatureTable(
    name = "fare_statistics",
    entities = ["driver_id"],
    features = fare_entity,
    batch_source=FileSource(
        event_timestamp_column="pickup_datetime",
        created_timestamp_column="created",
        file_format=ParquetFormat(),
        file_url=fare_source_uri,
        date_partition_column="date"
    )
)
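Both tables declare `date_partition_column="date"`. The sketch below is an assumption about the resulting layout, not Feast's implementation: it derives a `date` value from each row's event timestamp and groups rows under Hive-style `date=YYYY-MM-DD` directories, the layout that partitioned Parquet writers such as pyarrow's produce. The helper `partition_paths` is hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timezone


def partition_paths(base_uri, rows, ts_col="pickup_datetime", part_col="date"):
    """Group rows by the calendar date of their event timestamp, keyed by Hive-style path."""
    parts = defaultdict(list)
    for row in rows:
        day = row[ts_col].date().isoformat()  # date in the timestamp's own timezone (UTC here)
        parts[f"{base_uri.rstrip('/')}/{part_col}={day}"].append(row)
    return dict(parts)


rows = [
    {"driver_id": 610685, "pickup_datetime": datetime(2009, 6, 15, 17, 26, 21, tzinfo=timezone.utc)},
    {"driver_id": 825735, "pickup_datetime": datetime(2010, 1, 5, 16, 52, 16, tzinfo=timezone.utc)},
]
for path in partition_paths("gs://feast-staging-bucket-9919526/test_data/fare_statistics", rows):
    print(path)
```

Partitioning by event date lets a time-bounded job read only the directories inside its window instead of scanning the whole source.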

### 3.3 Registering entities and feature tables in Feature Store 

In [34]:
class register_tables:
    
    def __init__(self,client):
        self.feast_client=client
    
    def register(self):
        self.feast_client.apply(driver_entity)
        self.feast_client.apply(trip_statistics)
        self.feast_client.apply(fare_statistics)
    
    def __str__(self):
        for i in ['trip_statistics','fare_statistics']:
            print(self.feast_client.get_feature_table(i).to_yaml())
        return "Done Registration"
    

In [35]:
table_register=register_tables(client)

In [36]:
table_register.register()

In [37]:
print(table_register)

spec:
  name: trip_statistics
  entities:
  - driver_id
  features:
  - name: latitude_distance
    valueType: DOUBLE
    labels:
      release: dev-env
      description: This dev feature latitude_distance means the latitude  distance
        of the trips in City
      production: train
  - name: longitude_distance
    valueType: DOUBLE
    labels:
      production: train
      description: This dev feature longitude_distance means the longitude  distance
        of the trips in City
      release: dev-env
  - name: dropoff_latitude
    valueType: DOUBLE
    labels:
      description: This prod feature dropoff_latitude means latitude coordinate of
        where the taxi ride ended.
      production: model
      release: prod-env
  - name: pickup_longitude
    valueType: DOUBLE
    labels:
      release: prod-env
      production: model
      description: This prod feature pickup_longitude means the longitude coordinate
        of where the taxi ride started
  - name: dropoff_longitude

### 3.4 Populating batch source

In [38]:
trip_details.head(2)

| | driver_id | pickup_datetime | pickup_longitude | pickup_latitude | dropoff_longitude | dropoff_latitude | longitude_distance | latitude_distance | distance_travelled | created |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 610685 | 2009-06-15 17:26:21+00:00 | -73.844311 | 40.721319 | -73.84161 | 40.712278 | 0.002701 | 0.009041 | 0.009436 | 2021-06-12 16:09:45.131085 |
| 1 | 825735 | 2010-01-05 16:52:16+00:00 | -74.016048 | 40.711303 | -73.979268 | 40.782004 | 0.03678 | 0.070701 | 0.079696 | 2021-06-12 16:09:45.131085 |


In [39]:
#trip_details=trip_details.iloc[:30,:]

In [40]:
fare_details.head(2)

| | driver_id | pickup_datetime | passenger_count | fare_amount | target | created |
|---|---|---|---|---|---|---|
| 0 | 610685 | 2009-06-15 17:26:21+00:00 | 1 | -1.354113 | 0 | 2021-06-12 16:09:45.131085 |
| 1 | 825735 | 2010-01-05 16:52:16+00:00 | 1 | 1.088648 | 1 | 2021-06-12 16:09:45.131085 |


In [41]:
#fare_details=fare_details.iloc[:30,:]

In [42]:
client.ingest(trip_statistics, trip_details)

Removing temporary file(s)...
Data has been successfully ingested into FeatureTable batch source.


In [43]:
client.ingest(fare_statistics, fare_details)

Removing temporary file(s)...
Data has been successfully ingested into FeatureTable batch source.


### 3.5 Check the Feature Table stats

In [44]:
client.get_feature_table(name='trip_statistics').to_dict()

{'spec': {'name': 'trip_statistics',
  'entities': ['driver_id'],
  'features': [{'name': 'latitude_distance',
    'valueType': 'DOUBLE',
    'labels': {'release': 'dev-env',
     'description': 'This dev feature latitude_distance means the latitude  distance of the trips in City',
     'production': 'train'}},
   {'name': 'longitude_distance',
    'valueType': 'DOUBLE',
    'labels': {'release': 'dev-env',
     'production': 'train',
     'description': 'This dev feature longitude_distance means the longitude  distance of the trips in City'}},
   {'name': 'dropoff_latitude',
    'valueType': 'DOUBLE',
    'labels': {'production': 'model',
     'release': 'prod-env',
     'description': 'This prod feature dropoff_latitude means latitude coordinate of where the taxi ride ended.'}},
   {'name': 'pickup_longitude',
    'valueType': 'DOUBLE',
    'labels': {'description': 'This prod feature pickup_longitude means the longitude coordinate of where the taxi ride started',
     'release': 'pr

In [45]:
client.get_feature_table(name='fare_statistics').to_dict()

{'spec': {'name': 'fare_statistics',
  'entities': ['driver_id'],
  'features': [{'name': 'passenger_count',
    'valueType': 'INT64',
    'labels': {'production': 'test',
     'description': 'This prod feature passenger_count means the total passenger count',
     'release': 'prod-env'}},
   {'name': 'target',
    'valueType': 'INT64',
    'labels': {'production': 'target',
     'release': 'dev-env',
     'description': 'This prod feature target means the target profit and loss for the trip'}},
   {'name': 'fare_amount',
    'valueType': 'DOUBLE',
    'labels': {'description': 'This dev feature fare_amount means the total fare amount for the trip',
     'release': 'dev-env',
     'production': 'entity'}}],
  'batchSource': {'type': 'BATCH_FILE',
   'eventTimestampColumn': 'pickup_datetime',
   'datePartitionColumn': 'date',
   'createdTimestampColumn': 'created',
   'fileOptions': {'fileFormat': {'parquetFormat': {}},
    'fileUrl': 'gs://feast-staging-bucket-9919526/test_data/fare_st

## 4 Batch Source Online Ingestion

To populate the online store, use the Feast SDK to start a Spark batch job that extracts the features from the batch source and loads them into the online store.

The online store maintains only the latest values for a specific feature.

- Feature values are stored based on their entity keys
- Feast currently supports Redis as an online store.
- Online stores are meant for very high throughput writes from ingestion jobs and very low latency access to features during online serving.
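The "latest value only" behavior can be illustrated with a small in-memory sketch. This is not Feast's ingestion code: `latest_values` is a hypothetical helper, the driver IDs and values are made up, and two details are assumptions of the sketch: the ingestion window is treated as half-open `[start, end)`, and rows sharing an event timestamp are resolved by the later created timestamp.

```python
from datetime import datetime


def latest_values(rows, entity_col, event_col, created_col, start=None, end=None):
    """Keep one row per entity: the newest event timestamp, ties broken by created timestamp."""
    latest = {}
    for row in rows:
        ts = row[event_col]
        # Skip rows outside the ingestion window (treated here as half-open: [start, end))
        if (start is not None and ts < start) or (end is not None and ts >= end):
            continue
        current = latest.get(row[entity_col])
        if current is None or (ts, row[created_col]) > (current[event_col], current[created_col]):
            latest[row[entity_col]] = row
    return latest


# Made-up rows: driver 1 has an older event, a newer event, and a re-created copy of the newer event
rows = [
    {"driver_id": 1, "event": datetime(2009, 6, 15), "created": datetime(2021, 6, 12), "value": 1},
    {"driver_id": 1, "event": datetime(2010, 1, 5), "created": datetime(2021, 6, 12), "value": 2},
    {"driver_id": 1, "event": datetime(2010, 1, 5), "created": datetime(2021, 6, 13), "value": 3},
    {"driver_id": 2, "event": datetime(2012, 4, 21), "created": datetime(2021, 6, 12), "value": 4},
]
store = latest_values(rows, "driver_id", "event", "created",
                      start=datetime(2009, 1, 18), end=datetime(2011, 10, 20))
print({k: v["value"] for k, v in store.items()})
```

The window mirrors the dates passed to `start_offline_to_online_ingestion` below; under the same assumption, rows with 2012 pickups (such as driver 356886 in this dataset) would fall outside that window.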

In [160]:
# Each call launches a separate Spark batch job; keep both handles so either can be monitored
fare_job = client.start_offline_to_online_ingestion(
    fare_statistics,
    datetime(2009, 1, 18),
    datetime(2011, 10, 20)
)
trip_job = client.start_offline_to_online_ingestion(
    trip_statistics,
    datetime(2009, 1, 18),
    datetime(2011, 10, 20)
)

## 5 Real-Time Streaming Ingestion with Kafka
With a streaming source, we can use the Feast SDK to launch a Spark streaming job that continuously updates the online store. First, we update the `trip_statistics` feature table so that it has a Kafka streaming source.

In [46]:
import io
import json
import pytz
import kafka
import avro.io
import avro.schema
from avro.io import BinaryEncoder, DatumWriter
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient, NewTopic
from kafka import KafkaConsumer, KafkaProducer

### 5.1 Kafka Utility Functions

In [47]:
class kafkatopic:
    """Small helpers for creating, listing and reading a Kafka topic."""

    def __init__(self, kafka_broker, topic):
        self.KAFKA_BROKER = kafka_broker
        self.topic = topic

    def create_topic(self, num_partitions=3, replication_factor=1):
        admin = AdminClient({'bootstrap.servers': self.KAFKA_BROKER})
        # replication_factor=1 suits a single-broker demo; production clusters with
        # three or more brokers typically use 3 for durability.
        new_topics = [NewTopic(self.topic, num_partitions=num_partitions,
                               replication_factor=replication_factor)]

        # create_topics() is asynchronous and returns a dict of {topic: future}
        fs = admin.create_topics(new_topics)

        # Wait for each operation to finish
        for topic, f in fs.items():
            try:
                f.result()  # The result itself is None
                print("Topic {} created".format(topic))
            except Exception as e:
                print("Failed to create topic {}: {}".format(topic, e))

    def list_kafka_topic(self):
        consumer = kafka.KafkaConsumer(bootstrap_servers=[self.KAFKA_BROKER])
        try:
            return consumer.topics()
        finally:
            consumer.close()

    def kafka_consumer(self, avro_schema_json, timeout_ms=10000):
        # Read the topic from the beginning; stop after timeout_ms without new messages
        consumer = KafkaConsumer(bootstrap_servers=[self.KAFKA_BROKER],
                                 auto_offset_reset='earliest',
                                 consumer_timeout_ms=timeout_ms)
        consumer.subscribe([self.topic])
        reader = avro.io.DatumReader(avro.schema.parse(avro_schema_json))
        try:
            for msg in consumer:
                decoder = avro.io.BinaryDecoder(io.BytesIO(msg.value))
                print(reader.read(decoder))
        finally:
            consumer.close()

        
        
    

In [80]:
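class send_avro:
    _producer = None  # one KafkaProducer reused across calls instead of one per record

    @staticmethod
    def send_avro_record_to_kafka(topic, record, avro_schema_json):
        # Serialize the record with Avro binary encoding
        value_schema = avro.schema.parse(avro_schema_json)
        writer = DatumWriter(value_schema)
        bytes_writer = io.BytesIO()
        encoder = BinaryEncoder(bytes_writer)
        writer.write(record, encoder)

        # KAFKA_BROKER is the global defined in the Kafka initializer section
        if send_avro._producer is None:
            send_avro._producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
        send_avro._producer.send(topic, value=bytes_writer.getvalue())
        # send() only buffers the message; flush() blocks until it has been delivered
        send_avro._producer.flush()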
        

### 5.2 Kafka Initializer

In [51]:
KAFKA_BROKER=os.getenv('DEMO_KAFKA_BROKERS')
KAFKA_BROKER=KAFKA_BROKER + ":9092"
KAFKA_BROKER

'35.196.152.2:9092'

In [52]:
kafka_intializer=kafkatopic(KAFKA_BROKER,'Trip_Kafka')
kafka_intializer.list_kafka_topic()

set()

### 5.3 Trip Data Kafka Ingestion

In [53]:
kafka_intializer=kafkatopic(KAFKA_BROKER,'Trip_Kafka')
kafka_intializer.create_topic()

Topic Trip_Kafka created


In [54]:
admin = AdminClient({'bootstrap.servers': KAFKA_BROKER})
fs = admin.list_topics()
fs.topics

{'Trip_Kafka': TopicMetadata(Trip_Kafka, 3 partitions)}

In [126]:
avro_schema_json_trip = json.dumps({
    "type": "record",
    "name": "TripStatistics",
    "fields": [
        {"name": "driver_id", "type": "long"},
        {"name": "latitude_distance", "type": "double"},
        {"name": "longitude_distance", "type": "double"},
        {"name": "pickup_latitude", "type": "double"},
        {"name": "dropoff_longitude", "type": "double"},
        {"name": "pickup_longitude", "type": "double"},
        {"name": "distance_travelled", "type": "double"},
        {"name": "dropoff_latitude", "type": "double"},
        {
            "name": "pickup_datetime",
            "type": {"type": "long", "logicalType": "timestamp-micros"},
        },
    ],
})
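The `pickup_datetime` field uses Avro's `timestamp-micros` logical type, which stores a `long` counting microseconds since the Unix epoch in UTC. The hypothetical helpers below show that encoding by hand with the standard library; the `avro` package is expected to do the equivalent conversion when it writes a timezone-aware `datetime`, which is why the ingestion loop attaches UTC to each timestamp first.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_timestamp_micros(dt):
    """Encode an aware datetime as Avro timestamp-micros (microseconds since the epoch, UTC)."""
    if dt.tzinfo is None:
        raise ValueError("naive datetime: attach a timezone (e.g. UTC) first")
    delta = dt - EPOCH
    return (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds


def from_timestamp_micros(micros):
    """Decode a timestamp-micros value back into an aware UTC datetime."""
    return EPOCH + timedelta(microseconds=micros)


pickup = datetime(2009, 6, 15, 17, 26, 21, tzinfo=timezone.utc)
micros = to_timestamp_micros(pickup)
print(micros, from_timestamp_micros(micros))
```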

In [127]:
trip_statistics.stream_source = KafkaSource(
    event_timestamp_column="pickup_datetime",
    created_timestamp_column="datetime",
    bootstrap_servers=KAFKA_BROKER,
    topic="Trip_Kafka",
    message_format=AvroFormat(avro_schema_json_trip)
)
client.apply(trip_statistics)

In [128]:
job = client.start_stream_to_online_ingestion(
    trip_statistics
)

In [129]:
job.get_status()

<SparkJobStatus.IN_PROGRESS: 1>

In [95]:
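# Alternative sender built on confluent_kafka's Producer; the ingestion loop below uses send_avro instead
def send_avro_record_to_kafka(topic, record, avro_schema_json):
    # Serialize the record with Avro binary encoding
    value_schema = avro.schema.parse(avro_schema_json)
    writer = DatumWriter(value_schema)
    bytes_writer = io.BytesIO()
    encoder = BinaryEncoder(bytes_writer)
    writer.write(record, encoder)

    # Produce the message and block until it has been delivered
    producer = Producer({
        "bootstrap.servers": KAFKA_BROKER,
    })
    producer.produce(topic=topic, value=bytes_writer.getvalue())
    producer.flush()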

In [131]:
# Publish the first 10,000 trip rows; timestamps are tagged as UTC before Avro encodes them
for record in trip_ingest.iloc[:10000, :].drop(columns=['created']).to_dict('records'):
    record["pickup_datetime"] = (
        record["pickup_datetime"].to_pydatetime().replace(tzinfo=pytz.utc)
    )
    send_avro.send_avro_record_to_kafka("Trip_Kafka", record, avro_schema_json_trip)
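Because the Avro schema is written by hand, a mismatch between the DataFrame columns and the schema fields (for example the `created` column, which the loop drops) only surfaces when encoding fails. A small standard-library check, sketched here with the hypothetical helper `schema_field_diff` and a trimmed copy of the trip schema, compares a record's keys with the schema's field names before anything is sent.

```python
import json


def schema_field_diff(record, schema_json):
    """Return (missing, extra): schema fields absent from the record, and record keys not in the schema."""
    expected = {f["name"] for f in json.loads(schema_json)["fields"]}
    present = set(record)
    return sorted(expected - present), sorted(present - expected)


schema = json.dumps({
    "type": "record",
    "name": "TripStatistics",
    "fields": [
        {"name": "driver_id", "type": "long"},
        {"name": "pickup_longitude", "type": "double"},
        {"name": "pickup_datetime", "type": {"type": "long", "logicalType": "timestamp-micros"}},
    ],
})
record = {"driver_id": 610685, "pickup_longitude": -73.844311, "created": "2021-06-12 16:09:45.131085"}
print(schema_field_diff(record, schema))
```

In the notebook, the same check could be run on the first record with `avro_schema_json_trip` before starting the loop.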

In [144]:
# The first 20 driver IDs as entity rows for the online lookup
entities_sample = [{"driver_id": e} for e in trip_ingest['driver_id'].head(20).tolist()]
entities_sample

[{'driver_id': 610685},
 {'driver_id': 825735},
 {'driver_id': 428317},
 {'driver_id': 356886},
 {'driver_id': 603801},
 {'driver_id': 183971},
 {'driver_id': 600461},
 {'driver_id': 596197},
 {'driver_id': 382017},
 {'driver_id': 599864},
 {'driver_id': 486440},
 {'driver_id': 60412},
 {'driver_id': 318925},
 {'driver_id': 942474},
 {'driver_id': 111143},
 {'driver_id': 991464},
 {'driver_id': 239703},
 {'driver_id': 580070},
 {'driver_id': 640316},
 {'driver_id': 365485}]

In [145]:
driver_features

['trip_statistics:pickup_longitude',
 'trip_statistics:pickup_latitude',
 'trip_statistics:dropoff_longitude',
 'trip_statistics:dropoff_latitude',
 'trip_statistics:longitude_distance',
 'trip_statistics:latitude_distance',
 'trip_statistics:distance_travelled']

In [148]:
features = client.get_online_features(
    feature_refs=driver_features,
    entity_rows=entities_sample).to_dict()
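
Conceptually, the online store answers these lookups with the most recent value written for each entity key, which is why the rows streamed through Kafka become readable once the streaming job has processed them. The class below is a hypothetical, in-memory simplification (not Feast's Redis implementation) that assumes last-write-wins by event timestamp and returns `None` for entities it has never seen. Its `read` method returns a dict of columns, the same shape the notebook passes to `pd.DataFrame`.

```python
from datetime import datetime
from typing import Dict, List, Optional, Tuple


class ToyOnlineStore:
    """In-memory stand-in for an online store: one latest row per (feature table, entity id)."""

    def __init__(self) -> None:
        # (table, entity_id) -> (event_timestamp, {feature_name: value})
        self._rows: Dict[Tuple[str, int], Tuple[datetime, dict]] = {}

    def write(self, table: str, entity_id: int, event_ts: datetime, values: dict) -> None:
        key = (table, entity_id)
        current = self._rows.get(key)
        # Ignore events older than the stored one so late data cannot overwrite fresher values.
        if current is None or event_ts >= current[0]:
            self._rows[key] = (event_ts, values)

    def read(self, feature_refs: List[str], entity_ids: List[int],
             entity_name: str = "driver_id") -> Dict[str, List[Optional[object]]]:
        """Return one column per 'table:feature' reference, with None where no value exists."""
        result: Dict[str, List[Optional[object]]] = {entity_name: list(entity_ids)}
        for ref in feature_refs:
            table, feature = ref.split(":", 1)
            column: List[Optional[object]] = []
            for entity_id in entity_ids:
                row = self._rows.get((table, entity_id))
                column.append(row[1].get(feature) if row else None)
            result[ref] = column
        return result
```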

In [149]:
trip_kafka_data=pd.DataFrame(features)
trip_kafka_data.head()

|   | trip_statistics:dropoff_longitude | trip_statistics:dropoff_latitude | trip_statistics:distance_travelled | driver_id | trip_statistics:pickup_latitude | trip_statistics:latitude_distance | trip_statistics:longitude_distance | trip_statistics:pickup_longitude |
|---|---|---|---|---|---|---|---|---|
| 0 | -73.84161 | 40.712278 | 0.009436 | 610685 | 40.721319 | 0.009041 | 0.002701 | -73.844311 |
| 1 | -73.979268 | 40.782004 | 0.079696 | 825735 | 40.711303 | 0.070701 | 0.03678 | -74.016048 |
| 2 | -73.991242 | 40.750562 | 0.013674 | 428317 | 40.76127 | 0.010708 | 0.008504 | -73.982738 |
| 3 | -73.991567 | 40.758092 | 0.02534 | 356886 | 40.733143 | 0.024949 | 0.004437 | -73.98713 |
| 4 | -73.956655 | 40.783762 | 0.01947 | 603801 | 40.768008 | 0.015754 | 0.01144 | -73.968095 |


In [163]:
# This will stop the streaming job
job.cancel()


### 5.4 Fare Data Kafka Ingestion


In [106]:
kafka_intializer=kafkatopic(KAFKA_BROKER,'Fare_Kafka')
kafka_intializer.create_topic()

Failed to create topic Fare_Kafka: KafkaError{code=TOPIC_ALREADY_EXISTS,val=36,str="Topic 'Fare_Kafka' already exists."}


In [107]:
admin = AdminClient({'bootstrap.servers': KAFKA_BROKER})
fs = admin.list_topics()
fs.topics


{'Trip_Kafka': TopicMetadata(Trip_Kafka, 3 partitions),
 'Trip_Stats': TopicMetadata(Trip_Stats, 1 partitions),
 'Fare_Kafka': TopicMetadata(Fare_Kafka, 3 partitions),
 '__consumer_offsets': TopicMetadata(__consumer_offsets, 50 partitions)}

In [108]:
avro_schema_json_fare = json.dumps({
    "type": "record",
    "name": "FareStatistics",
    "fields": [
        {"name": "driver_id", "type": "long"},
        {"name": "passenger_count", "type": "long"},
        {"name": "target", "type": "long"},
        {"name": "fare_amount", "type": "double"},
        {
            "name": "pickup_datetime",
            "type": {"type": "long", "logicalType": "timestamp-micros"},
        },
    ],
})
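
The ingestion loops drop the `created` column and convert `pickup_datetime` before sending, presumably so that every record lines up with the Avro schema. A cheap pre-flight check can catch mismatches before anything reaches Kafka. The function below is a hypothetical sketch, not the `avro` library's own validation: it only understands flat schemas with `long`, `double` and `timestamp-micros` fields, and requiring a timezone-aware datetime is an assumption that matches the notebook's UTC conversion.

```python
import json
from datetime import datetime


def check_record(record: dict, avro_schema_json: str) -> list:
    """Return human-readable problems found when matching `record` to a flat Avro record schema."""
    fields = json.loads(avro_schema_json)["fields"]
    names = {f["name"] for f in fields}
    # Fields present in the record but absent from the schema.
    problems = [f"unexpected field: {k}" for k in sorted(set(record) - names)]
    for field in fields:
        name, ftype = field["name"], field["type"]
        if name not in record:
            problems.append(f"missing field: {name}")
            continue
        value = record[name]
        if isinstance(ftype, dict) and ftype.get("logicalType") == "timestamp-micros":
            if not (isinstance(value, datetime) and value.tzinfo is not None):
                problems.append(f"{name}: expected a timezone-aware datetime")
        elif ftype == "long":
            if isinstance(value, bool) or not isinstance(value, int):
                problems.append(f"{name}: expected long, got {type(value).__name__}")
        elif ftype == "double":
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                problems.append(f"{name}: expected double, got {type(value).__name__}")
    return problems
```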



In [109]:
fare_statistics.stream_source = KafkaSource(
    event_timestamp_column="pickup_datetime",
    created_timestamp_column="datetime",
    bootstrap_servers=KAFKA_BROKER,
    topic="Fare_Kafka",
    message_format=AvroFormat(avro_schema_json_fare)
)
client.apply(fare_statistics)


In [110]:
job = client.start_stream_to_online_ingestion(
    fare_statistics
)

In [111]:
job.get_status()

<SparkJobStatus.IN_PROGRESS: 1>

In [116]:
import pytz

for record in fare_details.iloc[:50, :].drop(columns=['created']).to_dict('records'):
    # Make the timestamp explicitly UTC before Avro serializes it as timestamp-micros.
    record["pickup_datetime"] = (
        record["pickup_datetime"].to_pydatetime().replace(tzinfo=pytz.utc)
    )
    send_avro_record_to_kafka("Fare_Kafka", record, avro_schema_json_fare)


In [117]:
entities_sample = [{"driver_id": e} for e in fare_details.iloc[:10, :]['driver_id'].values.tolist()]
entities_sample



[{'driver_id': 610685},
 {'driver_id': 825735},
 {'driver_id': 428317},
 {'driver_id': 356886},
 {'driver_id': 603801},
 {'driver_id': 183971},
 {'driver_id': 600461},
 {'driver_id': 596197},
 {'driver_id': 382017},
 {'driver_id': 599864}]

In [150]:
features = client.get_online_features(
    feature_refs=fare_features,
    entity_rows=entities_sample).to_dict()

In [151]:
fare_kafka_data=pd.DataFrame(features)
fare_kafka_data.head()

|   | fare_statistics:target | fare_statistics:passenger_count | fare_statistics:fare_amount | driver_id |
|---|---|---|---|---|
| 0 | 0 | 1 | -1.354113 | 610685 |
| 1 | 1 | 1 | 1.088648 | 825735 |
| 2 | 0 | 2 | -0.813646 | 428317 |
| 3 | 0 | 1 | -0.191734 | 356886 |
| 4 | 0 | 1 | -0.975267 | 603801 |


In [162]:
# This will stop the streaming job
job.cancel()


## 6 Kubeflow Pipeline Artifacts for Model Training 

**Entity key**

The combination of entities that uniquely identifies a row. For example, a feature table with the composite entity (customer, country) might have an entity key of (1001, 5). The key is used when looking up feature values and when deduplicating historical rows.
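
Deduplication by entity key can be sketched in plain Python. Feast does this inside its own retrieval jobs; the function below is only a model of the idea, assuming that rows sharing an entity key and event timestamp are resolved by keeping the one with the latest created timestamp. The function name and the `event_timestamp` / `created_timestamp` column names are assumptions for illustration.

```python
from typing import Dict, Iterable, List, Sequence


def dedup_rows(rows: Iterable[dict], entity_columns: Sequence[str]) -> List[dict]:
    """Keep one row per (entity key, event_timestamp), preferring the latest created_timestamp.

    The entity key is the tuple of entity column values, e.g. (customer, country) -> (1001, 5).
    """
    kept: Dict[tuple, dict] = {}
    for row in rows:
        entity_key = tuple(row[c] for c in entity_columns)
        slot = entity_key + (row["event_timestamp"],)
        current = kept.get(slot)
        if current is None or row["created_timestamp"] > current["created_timestamp"]:
            kept[slot] = row
    return list(kept.values())
```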

**Entity timestamp**

The timestamp at which an event occurred. The entity timestamp can describe the event time at which features were calculated, or the event time at which outcomes were observed.

Entity timestamps are commonly found on the entity dataframe, associated with the target variable (outcome) to be predicted. Point-in-time joins are made against these timestamps: for each entity row, only feature values whose event timestamps are at or before the entity timestamp are joined, so no information from the future leaks into training data.
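
A point-in-time join can be sketched with a sorted history per entity and a binary search. This is a plain-Python illustration with hypothetical names, not Feast's implementation (Feast runs historical retrieval as a Spark job). The optional `max_age` cut-off is an assumption modelled loosely on a feature table's staleness limit: values older than that relative to the entity timestamp are treated as missing.

```python
from bisect import bisect_right
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple


def point_in_time_join(
    entity_rows: List[Tuple[int, datetime]],
    feature_rows: List[Tuple[int, datetime, object]],
    max_age: Optional[timedelta] = None,
) -> List[Tuple[int, datetime, Optional[object]]]:
    """Attach to each (driver_id, event_timestamp) the latest feature value observed at or before it."""
    # Build a time-sorted history (and a parallel list of timestamps) per entity.
    history: Dict[int, List[Tuple[datetime, object]]] = {}
    for driver_id, ts, value in feature_rows:
        history.setdefault(driver_id, []).append((ts, value))
    times: Dict[int, List[datetime]] = {}
    for driver_id, series in history.items():
        series.sort(key=lambda item: item[0])
        times[driver_id] = [ts for ts, _ in series]

    joined = []
    for driver_id, event_ts in entity_rows:
        value = None
        # First index strictly after event_ts; the row just before it is the latest eligible one.
        i = bisect_right(times.get(driver_id, []), event_ts)
        if i > 0:
            ts, candidate = history[driver_id][i - 1]
            if max_age is None or event_ts - ts <= max_age:
                value = candidate
        joined.append((driver_id, event_ts, value))
    return joined
```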

In [83]:
import gcsfs
from pyarrow.parquet import ParquetDataset
from urllib.parse import urlparse

In [84]:
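def read_parquet(uri):
    """Read Spark-written Parquet output at `uri` (file://, gs://, s3:// or wasbs://) into a pandas DataFrame."""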
    parsed_uri = urlparse(uri)
    if parsed_uri.scheme == "file":
        return pd.read_parquet(parsed_uri.path)
    elif parsed_uri.scheme == "gs":
        fs = gcsfs.GCSFileSystem()
        files = ["gs://" + path for path in fs.glob(uri + '/part-*')]
        ds = ParquetDataset(files, filesystem=fs)
        return ds.read().to_pandas()
    elif parsed_uri.scheme == 's3':
        import s3fs
        fs = s3fs.S3FileSystem()
        files = ["s3://" + path for path in fs.glob(uri + '/part-*')]
        ds = ParquetDataset(files, filesystem=fs)
        return ds.read().to_pandas()
    elif parsed_uri.scheme == 'wasbs':
        import adlfs
        fs = adlfs.AzureBlobFileSystem(
            account_name=os.getenv('FEAST_AZURE_BLOB_ACCOUNT_NAME'), account_key=os.getenv('FEAST_AZURE_BLOB_ACCOUNT_ACCESS_KEY')
        )
        uripath = parsed_uri.username + parsed_uri.path
        files = fs.glob(uripath + '/part-*')
        ds = ParquetDataset(files, filesystem=fs)
        return ds.read().to_pandas()
    else:
        raise ValueError(f"Unsupported URL scheme {uri}")
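
The branches of `read_parquet` differ mainly in how they turn the URI into something to read. The `wasbs` branch concatenates `parsed_uri.username` and `parsed_uri.path` because, in a URI of the form `wasbs://<container>@<account>.blob.core.windows.net/<path>`, `urlparse` places the container name in the userinfo part of the network location. The helper below is a hypothetical, I/O-free sketch that only reproduces that path resolution so it can be checked without cloud credentials.

```python
from urllib.parse import urlparse


def resolve_read_target(uri: str) -> str:
    """Return the path or glob pattern that read_parquet above would read for `uri`."""
    parsed = urlparse(uri)
    if parsed.scheme == "file":
        # Local output is read directly from the path component.
        return parsed.path
    if parsed.scheme in ("gs", "s3"):
        # Spark writes part-* files under the output prefix.
        return uri + "/part-*"
    if parsed.scheme == "wasbs":
        # The container name is the URL's userinfo, so it comes from parsed.username.
        return parsed.username + parsed.path + "/part-*"
    raise ValueError(f"Unsupported URL scheme {uri}")
```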

In [152]:
entities_with_timestamp = pd.DataFrame(columns=['driver_id', 'event_timestamp'])
entities_with_timestamp['driver_id'] = trip_details['driver_id']
entities_with_timestamp['event_timestamp'] = pd.to_datetime(np.random.randint(
    datetime(2020, 10, 18).timestamp(),
    datetime(2020, 10, 20).timestamp(),
    size=trip_details.shape[0]), unit='s')
entities_with_timestamp.head()

|   | driver_id | event_timestamp |
|---|---|---|
| 0 | 610685 | 2020-10-18 08:24:32 |
| 1 | 825735 | 2020-10-18 00:14:50 |
| 2 | 428317 | 2020-10-19 19:25:12 |
| 3 | 356886 | 2020-10-19 21:55:33 |
| 4 | 603801 | 2020-10-19 13:31:24 |
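
Two details of the cell above are worth noting. `datetime(2020, 10, 18).timestamp()` interprets the naive datetime in the machine's local time zone, while `pd.to_datetime(..., unit='s')` returns naive values in UTC, so the sampled window can shift by the local UTC offset. The draw is also unseeded, so the entity dataframe changes on every run. The standard-library sketch below samples whole-second UTC timestamps from `[start, end)` with a fixed seed; the function name, the default seed, and the choice to treat naive bounds as UTC are all assumptions.

```python
import random
from datetime import datetime, timedelta, timezone


def random_event_timestamps(n: int, start: datetime, end: datetime, seed: int = 42) -> list:
    """Draw n whole-second UTC timestamps uniformly from [start, end) with a reproducible seed."""
    # Treat naive bounds as UTC rather than local time.
    if start.tzinfo is None:
        start = start.replace(tzinfo=timezone.utc)
    if end.tzinfo is None:
        end = end.replace(tzinfo=timezone.utc)
    span = int((end - start).total_seconds())
    rng = random.Random(seed)
    return [start + timedelta(seconds=rng.randrange(span)) for _ in range(n)]
```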


In [154]:
entities_with_timestamp.to_csv('entity.csv',index=False)
entities_with_timestamp.to_csv("gs://feastproject/driver_id.csv",index=False)

In [155]:
Master_Features= fare_features + driver_features
Master_Features

['fare_statistics:passenger_count',
 'fare_statistics:fare_amount',
 'fare_statistics:target',
 'trip_statistics:pickup_longitude',
 'trip_statistics:pickup_latitude',
 'trip_statistics:dropoff_longitude',
 'trip_statistics:dropoff_latitude',
 'trip_statistics:longitude_distance',
 'trip_statistics:latitude_distance',
 'trip_statistics:distance_travelled']

In [156]:
with open("features.json", "w") as output:
    json.dump(Master_Features, output)
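
A downstream Kubeflow pipeline step would need to read `features.json` back. How the notebook's pipeline consumes the file is not shown here, so the loader below is a hypothetical sketch: it reads the list of `<feature_table>:<feature>` references and groups them by feature table, preserving file order.

```python
import json


def load_feature_refs(path: str) -> dict:
    """Read a JSON list of 'table:feature' references and group feature names by table."""
    with open(path) as f:
        refs = json.load(f)
    grouped: dict = {}
    for ref in refs:
        table, feature = ref.split(":", 1)
        grouped.setdefault(table, []).append(feature)
    return grouped
```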