In [1]:
# Importing required libraries 

import json
from time import sleep
import os
import pandas as pd
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer
from kafka.admin import NewTopic
import argparse
import feast
import subprocess
from datetime import datetime, timedelta
from feast import FeatureStore
from feast.data_source import PushMode
from pandas.tseries.holiday import USFederalHolidayCalendar

In [2]:
os.chdir('./feature_repo')  # Change to the directory where the feature repo is located
print(f"Current directory: {os.getcwd()}")

Current directory: /home/jovyan/work/workspace/feature_repo


# What is Feast?

Feast (Feature Store) is an open-source feature store designed for machine learning operations. It serves as a centralized repository for managing, storing, and serving machine learning features in both offline (batch) and online (real-time) environments.

## Why Feature Stores Matter

The feature store acts as a bridge between data engineering and machine learning workflows, addressing key challenges in the ML lifecycle:

- **Consistency between training and serving**: Ensures the same feature values are used for both model training and inference.
- **Feature reuse**: Allows teams to share and discover features across projects.

## Key Components

### 1. Data Sources
- **Batch Source**: Historical data used for training models, typically stored in files (like Parquet) or databases.
- **Stream Source**: Real-time data, often coming from event streams or message queues.

### 2. Data Storage
- **Offline Data Store**: An interface for working with historical time-series feature values stored in data sources. The `OfflineStore` interface has several implementations, such as `BigQueryOfflineStore`, each backed by a different storage and compute engine.
- **Online Data Store**: Stores the latest feature values for real-time serving, typically in low-latency databases like Redis.

### 3. Registry
The Registry is a database that:
- Stores all feature definitions, data sources, and entity relationships (More about this next)
- Acts as the source of truth for the entire feature store infrastructure
- Tracks feature metadata including descriptions, owners, and data types

In its simplest form, it is stored as a file.

### 4. Consumers (Data Scientists / ML Engineers)
- **Model Training**: Uses historical feature values from the offline store.
- **Inference**: Uses real-time feature values from the online store.

## Before we proceed, let's look at the data we will use to demonstrate Feast
The dataset contains flight information for September 2024, with each row representing a single flight. Key features include:

- **Flight identifiers**: flight_ID (airline code and flight number), origin/destination airports
- **Schedule information**: FlightDate, Distance, CRSElapsedTime (scheduled duration)
- **Temporal features**: DayOfWeek, Month, Quarter
- **Delay metrics**: DepDelay (departure delay), ArrDelay (arrival delay), and categorized delays like WeatherDelay and NASDelay
- **Computed features**: is_holiday, days_to_nearest_holiday, Route (combined origin-destination), and route-based delay statistics

The computed features are important in the context of stream transformations. We will take a look at these later on.
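The notebook does not show how `is_holiday` and `days_to_nearest_holiday` were computed. The sketch below shows one plausible way to derive them. It assumes both come from `FlightDate` and the `USFederalHolidayCalendar` imported in the first cell. `add_holiday_features` is a hypothetical helper, not part of the project code.

```python
import numpy as np
import pandas as pd
from pandas.tseries.holiday import USFederalHolidayCalendar


def add_holiday_features(df: pd.DataFrame, date_col: str = "FlightDate") -> pd.DataFrame:
    """Hypothetical helper: add is_holiday and days_to_nearest_holiday columns."""
    dates = pd.to_datetime(df[date_col]).dt.normalize()
    unique_dates = pd.DatetimeIndex(dates.unique())

    # Pad the window by a year on each side so the nearest holiday is always included
    holidays = USFederalHolidayCalendar().holidays(
        start=unique_dates.min() - pd.Timedelta(days=366),
        end=unique_dates.max() + pd.Timedelta(days=366),
    )

    # |date - holiday| in days for every (unique date, holiday) pair
    gaps = np.abs(
        (unique_dates.values[:, None] - holidays.values[None, :]) / np.timedelta64(1, "D")
    )
    nearest = pd.Series(gaps.min(axis=1).astype(int), index=unique_dates)

    out = df.copy()
    out["days_to_nearest_holiday"] = dates.map(nearest).to_numpy()
    out["is_holiday"] = (out["days_to_nearest_holiday"] == 0).astype(int)
    return out
```

The distances are computed once per unique date and then mapped back to each row. This keeps the intermediate matrix small even for several hundred thousand flights.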

This data is ideal for our flight delay prediction service as it contains both historical performance metrics and contextual features that can help predict future delays.

In [3]:
df = pd.read_parquet('./data/flights_v1.parquet')
print(df)  # call df.head() instead to show only the first five rows

       flight_ID FlightDate Origin Dest  Distance  CRSElapsedTime  DayOfWeek  \
0        WN_3609 2024-09-01    ABQ  AUS     619.0           105.0          7   
402849    AS_500 2024-09-01    SEA  IND    1866.0           247.0          7   
402850    AS_502 2024-09-01    SEA  AUS    1770.0           252.0          7   
402851    AS_505 2024-09-01    AUS  SEA    1770.0           269.0          7   
402852    AS_508 2024-09-01    SEA  CVG    1965.0           265.0          7   
...          ...        ...    ...  ...       ...             ...        ...   
470381   DL_2851 2024-09-30    SEA  DEN    1024.0           154.0          1   
228957   UA_1521 2024-09-30    IAD  MSY     955.0           163.0          1   
228958   UA_1520 2024-09-30    MCO  DEN    1546.0           237.0          1   
228954   UA_1523 2024-09-30    IAH  PDX    1825.0           269.0          1   
229208   UA_1276 2024-09-30    IAH  EWR    1400.0           215.0          1   

        M

# Data Sources in Feast

Data sources are foundational components in Feast that define where and how feature data is obtained. They serve as the connection points between your raw data and the feature store infrastructure.


## Types of Data Sources

### 1. FileSource
FileSource is used for batch processing scenarios and reads data from files such as Parquet. This is particularly convenient for local development or when working with historical data stored in files.

### 2. PushSource
PushSource allows feature values to be pushed to the online store and offline store in real-time, making fresh feature values immediately available to applications. This approach enables you to update your feature values on-demand without needing to run a complete batch ingestion cycle.

### 3. KafkaSource / Streaming Sources [Still in Alpha]
KafkaSource connects to Kafka topics for real-time data streaming. It requires a batch source, which is used for retrieving historical features. This type of source is intended for keeping online features fresh with real-time updates.



# Our Data Source Setup

In our project, we use two complementary data source types in our feature store implementation. You can examine the detailed setup in the `data_sources.py` file located in the `workspace/feature_repo` folder.

## File Source Configuration

Our primary data source is configured to read flight data from a Parquet file:

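```python
from feast import FileSource
from feast.data_format import ParquetFormat

flight_stats_source = FileSource(
    path="data/flights_v1.parquet",
    timestamp_field="FlightDate",  # event timestamp column used for point-in-time joins
    file_format=ParquetFormat(),
)
```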

This configuration uses a local file path for development. The `timestamp_field` parameter indicates that the "FlightDate" column contains the event timestamps, which is crucial for Feast to maintain point-in-time correctness when retrieving features.

## Alternative S3 Configuration (Commented Out) 

We also have a commented-out S3 configuration that demonstrates how to connect to data stored in MinIO (an S3-compatible object storage):

```python
from feast import FileSource
from feast.data_format import ParquetFormat

bucket_name = "feast-bucket"
file_name = "flights_v1.parquet"
s3_endpoint = "http://localhost:9000"  # MinIO endpoint (plain HTTP in this local setup)

flight_stats_source = FileSource(
    path=f"s3://{bucket_name}/{file_name}",
    timestamp_field="FlightDate",
    file_format=ParquetFormat(),
    s3_endpoint_override=s3_endpoint,
)
```

This configuration is currently commented out because of a known bug in Feast's S3 endpoint handling. It should work once the fix (https://github.com/feast-dev/feast/pull/5208) is released.

## Push Source for Real-time Updates

To enable real-time updates to our feature store, we've configured a push source:

```python
from feast import PushSource

flight_stats_push_source = PushSource(
    name="flight_stats_push_source",
    batch_source=flight_stats_source,  # the FileSource defined above
)
```

This push source:
- References our batch source (flight_stats_source) to maintain schema consistency
- Allows us to programmatically push new feature values using `store.push()` API calls
- Enables real-time feature updates without waiting for batch processing cycles

# Entities in Feast

## What are Entities?

Entities in Feast are the objects around which features are organized. They serve as primary keys that uniquely identify records in your feature data.
You can take a look at the `entities.py` file, which is in the same `workspace/feature_repo` folder as the other definitions.

## Flight Entity

```python
from feast import Entity

# Define an entity for the flight
flight = Entity(
    name="flight",
    join_keys=["flight_ID"],
    description="Flight identifier"
)
```

In our flight delay prediction service, this entity definition:
- Names "flight" as our primary object
- Uses "flight_ID" as the join key for connecting data across sources
- Provides a clear description of its purpose

This entity allows Feast to organize flight-related features (such as schedule attributes, delay metrics, and route statistics) and to retrieve them accurately for both model training and real-time prediction.

# Feature Views in Feast

## What are Feature Views?

Feature views are a core component in Feast that define a collection of features and their data source. They specify the schema, time-to-live (TTL), and storage locations for features, connecting entity definitions with data sources. You can examine the detailed setup in the `features.py` file located in the `workspace/feature_repo` folder.

## Our Flight Delay Prediction Feature Views

Our setup includes two feature views for flight delay prediction:

### 1. Historical Flight Statistics

```python
flight_stats_fv = FeatureView(
    name="flight_stats",
    entities=[flight],
    ttl=timedelta(days=30),
    schema=[
        # Schema fields...
    ],
    online=True,
    source=flight_stats_source,
    tags={"team": "flight_ops"},
)
```

This feature view:
- Links to our `flight` entity
- Sets a 30-day TTL, which limits how far back from each entity timestamp Feast looks for a feature value during point-in-time joins
- Defines a schema with flight attributes, delay metrics, and route information
- Enables online serving (`online=True`)
- Uses our batch file source for historical data
- Includes team attribution tags

### 2. Real-time Flight Statistics

```python
flight_stats_fresh_fv = FeatureView(
    name="flight_stats_fresh",
    entities=[flight],
    ttl=timedelta(days=30),
    schema=[
        # Real-time relevant fields...
    ],
    online=True,
    source=flight_stats_push_source,
    tags={"team": "flight_ops"},
)
```

This feature view:
- Connects to the same `flight` entity
- Focuses on real-time features relevant for prediction
- Uses our push source to enable real-time updates
- Maintains the same TTL and online serving capabilities

These feature views provide both historical data for model training and real-time data for online prediction, addressing the core requirements of our flight delay prediction service.

# Feature Services in Feast

## What are Feature Services?

Feature Services are Feast components that allow you to group related features from one or more feature views into a logical unit. They provide a way to version your feature sets and create specific combinations of features for different model versions or use cases. You can examine the detailed setup in the `feature_services.py` file located in the `workspace/feature_repo` folder.

## Our Flight Delay Prediction Feature Services

Our project defines three feature services for different prediction scenarios:

### 1. Basic Prediction (v1)

```python
flight_prediction_v1 = FeatureService(
    name="flight_prediction_v1",
    features=[
        flight_stats_fv[["Distance", "CRSElapsedTime", "DayOfWeek", "Month"]],
    ],
)
```

This service:
- Provides a minimal set of basic features for flight delay prediction
- Selects only essential features from the main feature view
- Suitable for baseline models or low-latency prediction requirements

### 2. Advanced Prediction (v2)

```python
flight_prediction_v2 = FeatureService(
    name="flight_prediction_v2",
    features=[
        flight_stats_fv,
    ],
)
```

This service:
- Includes all features from the historical flight statistics feature view
- Provides the full feature set for comprehensive model training
- Enables more sophisticated prediction models

### 3. Real-time Prediction (v3)

```python
flight_prediction_v3 = FeatureService(
    name="flight_prediction_v3",
    features=[flight_stats_fresh_fv],
)
```

This service:
- Uses only the real-time feature view with push source
- Optimized for online serving with the most current data
- Includes windowed features for real-time prediction scenarios

By organizing our features into these services, we can easily manage different model versions, control which features are used in different scenarios, and maintain consistency across training and serving environments.

# Setting Up the Infrastructure for Feast

Before we can apply our feature definitions and start using Feast, we need to set up the infrastructure for the offline store, the online store, and the registry. In our setup, we use Redis for the online store (fast feature serving), MinIO (S3-compatible object storage) for the offline store, and PostgreSQL for the feature registry.

You can start these services using Docker Compose with the following command in the SSH terminal:

```bash
docker-compose -f /home/cc/feast-artifact/docker/docker-compose-feast.yaml up redis registry minio minio-init
```

# Feast Apply

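`feast apply` deploys your feature definitions to a Feast feature store by:

1. Scanning your repository for entity, project, feature view, and feature service definitions (it searches the `.py` files in the feature repository, here the current working directory)
2. Validating the configurations
3. Updating the registry metadata
4. Deploying the infrastructure the feature views need in the online store (the "Deploying infrastructure for ..." lines in the output below)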

This command bridges your code definitions with the actual infrastructure, making your features available for both historical retrieval and online serving. For our flight delay service, running `feast apply` registers all components (entity, feature views, services) in the system.



# Understanding feature_store.yaml in Feast

The `feature_store.yaml` file is the central configuration file for your Feast deployment. It defines how your feature store connects to various infrastructure components and how it should behave. Let's break down each component in your configuration:

```yaml
project: flight_delay_project
provider: local
registry:
  registry_type: sql
  path: postgresql://postgres:mysecretpassword@registry:5432/feast
online_store:
  type: redis
  connection_string: redis:6379
offline_store:
  type: file
entity_key_serialization_version: 2
```

## Key Components

- **project**: Defines the namespace for your feature store, serving as a logical grouping for all your feature definitions
  
- **provider**: Specifies the infrastructure provider (in this case "local" for development environments)
  
- **registry**: Configures how and where feature definitions are stored
  - `registry_type: sql` - Uses a SQL database to store metadata
  - `path` - Connection string for the PostgreSQL database

- **online_store**: Defines the database for serving real-time feature values
  - `type: redis` - Uses Redis for low-latency feature serving
  - `connection_string` - How to connect to the Redis instance

- **offline_store**: Specifies where historical feature values are stored
  - `type: file` - Uses local files (like Parquet) for offline storage

- **entity_key_serialization_version**: Controls how entity keys are serialized in the online store

This configuration file is the first thing Feast looks for when you run commands like `feast apply` or when initializing a `FeatureStore` object. It establishes the connection between your code definitions and the actual infrastructure where your feature data will be stored and served.

**Docker networking note:** We use service names (`registry`, `redis`) instead of `localhost` because services on the same Docker Compose network can reach each other by name. Inside a container, `localhost` refers to that container itself, so using it would make a service connect to itself rather than to the other services.

In [4]:
# Please run the following cell to apply all the configurations that we've described in the above python files
!feast apply

No project found in the repository. Using project name flight_delay_project defined in feature_store.yaml
Applying changes for project flight_delay_project
Deploying infrastructure for flight_stats_fresh
Deploying infrastructure for flight_stats


In [5]:
# Configuration present in feature_store.yaml
!feast configuration

project: flight_delay_project
provider: local
registry:
  registry_type: sql
  path: postgresql+psycopg2://postgres:mysecretpassword@registry:5432/feast
online_store:
  type: redis
  connection_string: redis:6379
auth:
  type: no_auth
offline_store:
  type: file
batch_engine: local
entity_key_serialization_version: 2



The `FeatureStore(repo_path=".")` initializes a Feast feature store instance that connects to the feature repository in the current directory, providing access to all registered entities, features, and data sources.

In [6]:
# init feature store object
store = FeatureStore(repo_path=".")

# Patterns of Feature Retrieval

Feast supports several patterns of feature retrieval:

1. Training data generation (via `feature_store.get_historical_features(...)`)

2. Offline feature retrieval for batch scoring (via `feature_store.get_historical_features(...)`)

3. Online feature retrieval for real-time model predictions (via `feature_store.get_online_features(...)`)

In this tutorial, we focus on training data generation and online feature retrieval.

# Historical Feature Retrieval in Feast
Historical feature retrieval is a critical component of the machine learning lifecycle that allows you to create point-in-time correct training datasets by joining features from your feature store with entity data. Let's explore how this works in Feast and why it's important for model training.

## What is Historical Feature Retrieval?
Historical feature retrieval in Feast allows you to:

- Access historical feature values for a specific set of entities at precise timestamps
- Create training datasets with point-in-time correct feature values
- Prevent feature leakage by ensuring future data isn't used during training
- Join features from multiple feature views into a single cohesive dataset
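Point-in-time correctness can be pictured with `pandas.merge_asof`. For each entity row, take the most recent feature row at or before its timestamp, and drop it if it is older than the feature view's TTL. This is a conceptual sketch, not how Feast's offline stores are implemented. The helper name `point_in_time_join` and the shared `event_timestamp` column are assumptions.

```python
import pandas as pd


def point_in_time_join(entity_df: pd.DataFrame,
                       feature_df: pd.DataFrame,
                       join_key: str,
                       ttl: pd.Timedelta) -> pd.DataFrame:
    """Hypothetical helper: attach the latest non-future feature row to each entity row.

    Both frames are assumed to carry an `event_timestamp` column.
    """
    # merge_asof requires both sides to be sorted on the time column
    left = entity_df.sort_values("event_timestamp", kind="stable")
    right = feature_df.sort_values("event_timestamp", kind="stable")
    return pd.merge_asof(
        left,
        right,
        on="event_timestamp",
        by=join_key,            # only match rows for the same entity
        direction="backward",   # never use feature values from the future
        tolerance=ttl,          # values older than the TTL count as missing
    )
```

With `ttl=pd.Timedelta(days=30)`, this mirrors the `ttl=timedelta(days=30)` set on `flight_stats_fv`. A flight whose latest feature row is 35 days older than the requested timestamp gets `NaN` instead of a stale value.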

Let's explore it using the `fetch_historical_features_for_model_training` function below:

In [8]:
from typing import List, Optional

def fetch_historical_features_for_model_training(
    store: FeatureStore,
    feature_service_name: Optional[str] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    flight_ids: Optional[List[str]] = None,
    date_frequency: str = 'D'
):
    """
    Retrieve historical features for machine learning model training with point-in-time correctness.
    
    Parameters:
    -----------
    store : FeatureStore
        The initialized Feast feature store
    feature_service_name : str, optional
        Name of the feature service to use for feature selection. If None, a default list of feature references is used
    start_date : datetime, optional
        Start date for the time range to retrieve features (defaults to 2024-09-01)
    end_date : datetime, optional
        End date for the time range to retrieve features (defaults to 2024-09-30)
    flight_ids : list of str, optional
        List of flight_IDs to retrieve features for. If None, will use default example IDs
    date_frequency : str, default='D'
        Frequency for date range generation (D=daily, W=weekly, etc.)
    
    Returns:
    --------
    pandas.DataFrame
        DataFrame containing entity keys, timestamps, and corresponding feature values
    """
    # Set defaults if not provided
    if start_date is None:
        start_date = datetime(2024, 9, 1)
    if end_date is None:
        end_date = datetime(2024, 9, 30)
    if flight_ids is None:
        flight_ids = ["WN_3609", "WN_3610", "WN_3611"]
    
    # Generate a range of dates between start_date and end_date
    date_range = pd.date_range(start=start_date, end=end_date, freq=date_frequency)
    
    # Create entity dataframe with all combinations of flight_IDs and dates
    entity_records = []
    for flight_id in flight_ids:
        for timestamp in date_range:
            entity_records.append({
                "flight_ID": flight_id,
                "event_timestamp": timestamp
            })
    
    entity_df = pd.DataFrame(entity_records)
    
    # Determine features to retrieve - either through feature service or direct references
    if feature_service_name:
        features_to_retrieve = store.get_feature_service(feature_service_name)
        print(f"Using feature service '{feature_service_name}' for model training")
    else:
        # Default set of features if no feature service specified
        features_to_retrieve = [
            "flight_stats:Distance",
            "flight_stats:CRSElapsedTime",
            "flight_stats:DayOfWeek",
            "flight_stats:Month",
            "flight_stats:WeatherDelay",
            "flight_stats:NASDelay",
            "flight_stats:is_holiday",
            "flight_stats:days_to_nearest_holiday",
            "flight_stats:route_avg_delay_24h",
        ]
        print(f"Using {len(features_to_retrieve)} individual feature references")
    
    # Execute point-in-time join to retrieve historical features
    print(f"Retrieving historical features for {len(flight_ids)} flights across {len(date_range)} timestamps")
    historical_features = store.get_historical_features(
        entity_df=entity_df,
        features=features_to_retrieve
    )
    
    # Convert to DataFrame for model training
    training_df = historical_features.to_df()
    
    print(f"Successfully generated training dataset with {len(training_df)} rows and {len(training_df.columns)} columns")
    
    # Can split into features and labels if needed for model training
    # X = training_df.drop(['label_column'], axis=1) if 'label_column' exists
    # y = training_df['label_column'] if 'label_column' exists
    
    return training_df
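The nested loop above builds the cross product of flight IDs and dates. The same entity dataframe can be built without an explicit loop, as sketched below. `build_entity_df` is a hypothetical helper. The cast to pandas' `string` dtype is also an assumption: it presumes the offline data stores `flight_ID` as `string`, which may silence the object-vs-string merge warning printed when the training cell runs.

```python
from datetime import datetime
from typing import List

import pandas as pd


def build_entity_df(flight_ids: List[str],
                    start_date: datetime,
                    end_date: datetime,
                    freq: str = "D") -> pd.DataFrame:
    """Hypothetical helper: one row per (flight_ID, timestamp), same order as the nested loop."""
    dates = pd.date_range(start=start_date, end=end_date, freq=freq)
    index = pd.MultiIndex.from_product(
        [flight_ids, dates], names=["flight_ID", "event_timestamp"]
    )
    entity_df = index.to_frame(index=False)
    # Assumption: matching the offline store's "string" dtype for the join key
    entity_df["flight_ID"] = entity_df["flight_ID"].astype("string")
    return entity_df
```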

# Historical Feature Retrieval for ML Model Training

## Understanding Feast's Historical Feature Pipeline

Historical feature retrieval is essential for building accurate, leakage-free machine learning models. Our implementation relies on Feast's point-in-time joins to build training datasets in which each row only sees feature values that were available at its timestamp.

### Flexible Feature Selection

Data scientists need to experiment with different feature combinations to optimize model performance. Our implementation supports:

- **Feature Service Selection**: Use predefined feature groups via `store.get_feature_service()`
- **Direct Feature References**: Specify individual features for more granular control
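- **Mixed Approach**: `get_historical_features()` accepts either a feature service or a list of feature references in a single call, so combining the two means expanding the service into its individual `feature_view:feature` references and appending the extra features (the function above does not do this itself)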

This flexibility accelerates the experimentation cycle while maintaining reproducibility.
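The mixed approach needs a way to turn a feature service into plain reference strings. The sketch below does this with a hypothetical helper, `feature_service_to_refs`. It relies on the `feature_view_projections`, `name_to_use()`, and `features` attributes that Feast's `FeatureService` and `FeatureViewProjection` expose in recent releases. Treat those attributes as assumptions and check them against your installed version.

```python
from typing import Any, List


def feature_service_to_refs(feature_service: Any) -> List[str]:
    """Hypothetical helper: expand a FeatureService-like object into "view:feature" strings."""
    refs = []
    for projection in feature_service.feature_view_projections:
        # name_to_use() returns the view's alias if one was set, otherwise its name
        for field in projection.features:
            refs.append(f"{projection.name_to_use()}:{field.name}")
    return refs
```

For example, `list(dict.fromkeys(feature_service_to_refs(store.get_feature_service("flight_prediction_v1")) + ["flight_stats:WeatherDelay"]))` builds a de-duplicated list that can be passed as `features=` to `get_historical_features()`.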

### Dataset Generation

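`get_historical_features(...).to_df()` returns a pandas DataFrame with the entity key (`flight_ID`), the `event_timestamp`, and one column per requested feature. Once the key, timestamp, and non-numeric columns are dropped, it can be fed to a model.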

###  ML Workflow Integration

The resulting DataFrame integrates directly into standard ML pipelines:

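```python
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split

# Get training data with point-in-time correct features
training_df = fetch_historical_features_for_model_training(
    store,
    feature_service_name="flight_prediction_v2"
)

# Drop entity/timestamp pairs for which no feature row was found
training_df = training_df.dropna(subset=["ArrDelay"])

# Split into features and target; keep numeric columns only, which excludes
# flight_ID, event_timestamp and string columns such as Origin, Dest and Route
y = training_df["ArrDelay"]                                  # Prediction target
X = training_df.drop(columns=["ArrDelay"]).select_dtypes("number").fillna(0)  # simple imputation choice

# Standard scikit-learn workflow
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
model = RandomForestRegressor(random_state=42)
model.fit(X_train, y_train)
```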

This implementation bridges the gap between feature engineering and model development, allowing data scientists to focus on improving model performance while ensuring their training data maintains production-grade integrity.

In [10]:
basic_training_df = fetch_historical_features_for_model_training(store)

# For advanced model with a specific feature service
advanced_training_df = fetch_historical_features_for_model_training(
    store,
    feature_service_name="flight_prediction_v2", 
    start_date=datetime(2024, 9, 1),
    end_date=datetime(2024, 9, 30),
    flight_ids=["DL_2851", "AA_1200", "WN_3609"]
)

print(basic_training_df)

print(advanced_training_df)

Using 9 individual feature references
Retrieving historical features for 3 flights across 30 timestamps


+----------------------------+------------+-------------+
| Merge columns              | left dtype | right dtype |
+----------------------------+------------+-------------+
| ('flight_ID', 'flight_ID') | object     | string      |
+----------------------------+------------+-------------+
Cast dtypes explicitly to avoid unexpected results.


Successfully generated training dataset with 89 rows and 11 columns
Using feature service 'flight_prediction_v2' for model training
Retrieving historical features for 3 flights across 30 timestamps


+----------------------------+------------+-------------+
| Merge columns              | left dtype | right dtype |
+----------------------------+------------+-------------+
| ('flight_ID', 'flight_ID') | object     | string      |
+----------------------------+------------+-------------+
Cast dtypes explicitly to avoid unexpected results.


Successfully generated training dataset with 90 rows and 21 columns
   flight_ID           event_timestamp  Distance  CRSElapsedTime  DayOfWeek  \
0    WN_3609 2024-09-01 00:00:00+00:00     619.0           105.0          7   
1    WN_3610 2024-09-01 00:00:00+00:00    1670.0           250.0          7   
2    WN_3610 2024-09-02 00:00:00+00:00     787.0           145.0          1   
3    WN_3609 2024-09-02 00:00:00+00:00     888.0           155.0          1   
4    WN_3609 2024-09-03 00:00:00+00:00     986.0           160.0          1   
..       ...                       ...       ...             ...        ...   
84   WN_3609 2024-09-29 00:00:00+00:00     392.0            80.0          7   
85   WN_3610 2024-09-29 00:00:00+00:00     787.0           145.0          7   
86   WN_3609 2024-09-30 00:00:00+00:00     986.0           160.0          1   
87   WN_3610 2024-09-30 00:00:00+00:00     787.0           145.0          1   
88   WN_3611 2024-09-30 00:00:00+

# Online Feature Retrieval with Feast

## The `get_online_features()` Method

Feast's `get_online_features()` method is designed for real-time feature retrieval by:
1. Accepting entity rows (the keys we want to look up)
2. Retrieving feature values from the online store 
3. Returning the values in a format ready for model inference

## Our Implementation

The function below demonstrates retrieving online features for specific flights in four different ways:
1. Basic features directly from feature view
2. Feature subsets via feature service (v1) for basic prediction using our file source
3. All features via feature service (v2) for advanced prediction using our file source
4. Real-time features via feature service (v3) for the freshest data using our push source

This flexibility allows us to use the same underlying data in different prediction scenarios based on latency requirements and model complexity.
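Whichever way the features are requested, `get_online_features(...).to_dict()` returns a dict. It maps each feature name, plus the `flight_ID` join key, to a list aligned with the entity rows, as the outputs of the `fetch_online_features` cells show. The sketch below turns that dict into a model input frame with the same column order used in training. `online_response_to_frame` and `feature_columns` are hypothetical names. `get_online_features(...)` also offers `.to_df()` if you prefer to skip the dict.

```python
from typing import Dict, List

import pandas as pd


def online_response_to_frame(response: Dict[str, list],
                             feature_columns: List[str]) -> pd.DataFrame:
    """Hypothetical helper: turn get_online_features(...).to_dict() output into model input."""
    # Each key maps to a list aligned with entity_rows, so rows keep the request order
    frame = pd.DataFrame(response)
    # Match the training column order; columns missing from the response become NaN
    return frame.reindex(columns=feature_columns)
```

Reindexing against the training columns guards against the online response listing features in a different order (the printed outputs are sorted alphabetically) than the model expects.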

In [28]:
def fetch_online_features(store, source: str = ""):
    entity_rows = [
        {
            "flight_ID": "OO_3757",
        },
        {
            "flight_ID": "AA_1200",
        },
        {
            "flight_ID": "DL_2851",
        }
    ]

    if source == "feature_service":
        features_to_fetch = store.get_feature_service("flight_prediction_v1")
    elif source == "advanced_feature_service":
        features_to_fetch = store.get_feature_service("flight_prediction_v2")
    elif source == "real_time_feature_service":
        features_to_fetch = store.get_feature_service("flight_prediction_v3")
    else:
        features_to_fetch = [
            "flight_stats:Distance",
            "flight_stats:WeatherDelay",
        ]

    returned_features = store.get_online_features(
        features=features_to_fetch,
        entity_rows=entity_rows,
    ).to_dict()

    for key, value in sorted(returned_features.items()):
        print(key, " : ", value)

# Materializing Features in Feast

## What is Materialization?

Materialization is the process of transferring feature values from offline storage (data source) to the online store (Redis in our case) to make them available for low-latency serving. This is a critical step that bridges the gap between your historical data and real-time feature access.

## How Materialization Works

The `materialize()` method:
- Takes a time range (start_date to end_date)
- Reads historical feature values from the data source for that period
- Writes the latest values for each entity to the online store
- Makes these features immediately available for online retrieval

In our example, we're materializing all flight features for September 2024, ensuring that when we call `get_online_features()`, we'll have the most recent feature values for each flight entity ready for real-time prediction.

This step is necessary before you can retrieve features with `get_online_features()` since it populates the online store with the data needed for low-latency lookups. Think of materialization as a scheduled ETL job that keeps your online feature store in sync with your offline data.
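The "latest value per entity within a time range" step can be sketched in pandas. This is only a conceptual model of what `materialize()` writes to the online store. The helper `latest_values_in_window` is hypothetical, and whether Feast treats the end date as inclusive is an implementation detail this sketch does not claim to reproduce.

```python
from datetime import datetime

import pandas as pd


def latest_values_in_window(df: pd.DataFrame,
                            entity_key: str,
                            timestamp_col: str,
                            start: datetime,
                            end: datetime) -> pd.DataFrame:
    """Hypothetical helper: one row per entity, the latest one with start <= timestamp <= end."""
    in_window = df[(df[timestamp_col] >= start) & (df[timestamp_col] <= end)]
    # A stable sort keeps input order for equal timestamps, so keep="last" is deterministic
    return (in_window.sort_values(timestamp_col, kind="stable")
                     .drop_duplicates(subset=entity_key, keep="last"))
```

Rows outside the window are ignored. Running the same logic again with a later window overwrites each entity's value, which is why materialization is often scheduled like a recurring ETL job.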

In [23]:
start_date = datetime.strptime('2024-09-01', '%Y-%m-%d')
end_date = datetime.strptime('2024-09-30', '%Y-%m-%d')

store.materialize(start_date=start_date, end_date=end_date)

Materializing 2 feature views from 2024-09-01 00:00:00+00:00 to 2024-09-30 00:00:00+00:00 into the redis online store.

flight_stats_fresh:


100%|███████████████████████████████████████████████████████| 21956/21956 [00:02<00:00, 9062.44it/s]


flight_stats:


100%|███████████████████████████████████████████████████████| 21956/21956 [00:02<00:00, 7453.75it/s]


In [29]:
print("\n--- Online features retrieved through feature service v1 ---")
fetch_online_features(store, source="feature_service")


--- Online features retrieved through feature service v1 ---
CRSElapsedTime  :  [129.0, 169.0, 154.0]
DayOfWeek  :  [7, 7, 7]
Distance  :  [604.0, 1013.0, 1024.0]
Month  :  [9, 9, 9]
flight_ID  :  ['OO_3757', 'AA_1200', 'DL_2851']


In [30]:
print("\n--- Online features retrieved through feature service v2 ---")
fetch_online_features(store, source="advanced_feature_service")


--- Online features retrieved through feature service v2 ---
ArrDelay  :  [241.0, -7.0, -25.0]
CRSElapsedTime  :  [129.0, 169.0, 154.0]
CarrierDelay  :  [0.0, None, None]
DayOfWeek  :  [7, 7, 7]
DepDelay  :  [259.0, 5.0, -8.0]
Dest  :  ['MSP', 'PIT', 'DEN']
Distance  :  [604.0, 1013.0, 1024.0]
LateAircraftDelay  :  [0.0, None, None]
Month  :  [9, 9, 9]
NASDelay  :  [0.0, None, None]
Origin  :  ['SDF', 'MIA', 'SEA']
Quarter  :  [3, 3, 3]
Route  :  ['SDF_MSP', 'MIA_PIT', 'SEA_DEN']
SecurityDelay  :  [0.0, None, None]
WeatherDelay  :  [241.0, None, None]
days_to_nearest_holiday  :  [15, 15, 15]
flight_ID  :  ['OO_3757', 'AA_1200', 'DL_2851']
is_holiday  :  [0, 0, 0]
route_avg_delay_24h  :  [41.0, -5.0, 12.210526466369629]
route_max_delay_24h  :  [80.0, -5.0, 104.0]


In [31]:
print("\n--- Online features retrieved through feature service v3 ---")
fetch_online_features(store, source="real_time_feature_service")


--- Online features retrieved through feature service v3 ---
CRSElapsedTime  :  [129.0, 169.0, 154.0]
DayOfWeek  :  [7, 7, 7]
DepDelay  :  [259.0, 5.0, -8.0]
Dest  :  ['MSP', 'PIT', 'DEN']
Distance  :  [604.0, 1013.0, 1024.0]
Month  :  [9, 9, 9]
NASDelay  :  [0.0, None, None]
Origin  :  ['SDF', 'MIA', 'SEA']
Quarter  :  [3, 3, 3]
Route  :  ['SDF_MSP', 'MIA_PIT', 'SEA_DEN']
WeatherDelay  :  [241.0, None, None]
days_to_nearest_holiday  :  [15, 15, 15]
flight_ID  :  ['OO_3757', 'AA_1200', 'DL_2851']
is_holiday  :  [0, 0, 0]
route_avg_delay_24h  :  [41.0, -5.0, 12.210526466369629]
route_max_delay_24h  :  [80.0, -5.0, 104.0]


# Real-Time Feature Updates with Push Source

Feast's Push Source enables real-time updates to feature values without waiting for batch ingestion cycles. This capability is critical for ML applications whose prediction accuracy depends on the most current data, because otherwise the model would be served stale feature values.

In the code below, we're demonstrating how to update feature values for three flight entities (`OO_3757`, `AA_1200`, and `DL_2851`) using the `store.push()` method. This approach:

1. Creates a dummy DataFrame with the latest flight information
2. Adds the mandatory `event_timestamp` field that Feast needs
3. Pushes the data directly to the online store via our configured push source

Unlike materialization (which loads historical data on a schedule), pushing allows immediate updates whenever new information becomes available. This is perfect for incorporating real-time signals like current weather conditions, airport congestion, or last-minute schedule changes into our delay prediction model.

After pushing, these updated feature values are instantly available for retrieval through `get_online_features()`, ensuring our model makes predictions using the most current information.
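To make the contrast concrete, here is a toy, in-memory model of an online store. It is not Feast's implementation: `ToyOnlineStore`, its `materialize` and `push` methods, and the last-write-wins policy are all assumptions for illustration. Materialization copies a time window of offline rows as a scheduled batch, while a push writes new rows the moment they arrive; either way, the row stored for an entity key is replaced.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List


@dataclass
class ToyOnlineStore:
    """Hypothetical in-memory online store: keeps one latest row per entity key."""

    rows: Dict[str, Dict[str, Any]] = field(default_factory=dict)

    def _upsert(self, records: List[Dict[str, Any]], key: str) -> None:
        # Last write wins (an assumption): each record replaces whatever was stored for its key.
        for record in records:
            self.rows[record[key]] = dict(record)

    def materialize(self, offline_rows, start: datetime, end: datetime, key: str = "flight_ID") -> None:
        # Batch path: copy offline rows with event_timestamp in [start, end), oldest first,
        # so the newest row per key is the one that remains.
        window = [r for r in offline_rows if start <= r["event_timestamp"] < end]
        self._upsert(sorted(window, key=lambda r: r["event_timestamp"]), key)

    def push(self, records, key: str = "flight_ID") -> None:
        # Push path: write fresh rows immediately, with no scheduled job or time window.
        self._upsert(records, key)

    def get(self, entity: str) -> Dict[str, Any]:
        return self.rows[entity]


# Illustrative rows only (not the notebook's real data).
offline_rows = [{"flight_ID": "AA_1200", "DepDelay": 5.0, "event_timestamp": datetime(2024, 9, 15)}]
toy_store = ToyOnlineStore()
toy_store.materialize(offline_rows, datetime(2024, 1, 1), datetime(2025, 1, 1))
print(toy_store.get("AA_1200")["DepDelay"])  # 5.0
toy_store.push([{"flight_ID": "AA_1200", "DepDelay": 12.0, "event_timestamp": datetime(2025, 9, 15)}])
print(toy_store.get("AA_1200")["DepDelay"])  # 12.0
```

The variable is deliberately named `toy_store` so it does not overwrite the real Feast `store` used by later cells.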

In [32]:
# Create dummy flight data for the three specified flights
dummy_flights = [
    {
        "flight_ID": "OO_3757",
        "FlightDate": "2025-09-15",
        "Origin": "SEA",
        "Dest": "KTN",
        "Distance": 680.0,
        "CRSElapsedTime": 140.0,
        "DayOfWeek": 1,
        "Month": 9,
        "Quarter": 3,
        "DepDelay": 5.0,
        "WeatherDelay": 0.0,
        "NASDelay": 2.0,
        "SecurityDelay": 0.0,
        "LateAircraftDelay": 0.0,
        "ArrDelay": 10.0,
        "is_holiday": 0,
        "days_to_nearest_holiday": 17,  # Days to nearest holiday
        "Route": "SEA_KTN",
        "route_avg_delay_24h": 7.5,
        "route_max_delay_24h": 15.0
    },
    {
        "flight_ID": "AA_1200",
        "FlightDate": "2025-09-15",
        "Origin": "MIA",
        "Dest": "PIT",
        "Distance": 1013.0,
        "CRSElapsedTime": 167.0,
        "DayOfWeek": 1,
        "Month": 9,
        "Quarter": 3,
        "DepDelay": 12.0,
        "WeatherDelay": 5.0,
        "NASDelay": 0.0,
        "SecurityDelay": 0.0,
        "LateAircraftDelay": 0.0,
        "ArrDelay": 18.0,
        "is_holiday": 0,
        "days_to_nearest_holiday": 17,  # Days to nearest holiday
        "Route": "MIA_PIT",
        "route_avg_delay_24h": 22.5,
        "route_max_delay_24h": 45.0
    },
    {
        "flight_ID": "DL_2851",
        "FlightDate": "2025-09-15",
        "Origin": "SEA",
        "Dest": "DEN",
        "Distance": 1024.0,
        "CRSElapsedTime": 153.0,
        "DayOfWeek": 1,
        "Month": 9,
        "Quarter": 3,
        "DepDelay": -3.0,  # Negative delay means early departure
        "WeatherDelay": 0.0,
        "NASDelay": 0.0,
        "SecurityDelay": 0.0,
        "LateAircraftDelay": 0.0,
        "ArrDelay": -5.0,
        "is_holiday": 0,
        "days_to_nearest_holiday": 17,  # Days to nearest holiday
        "Route": "SEA_DEN",
        "route_avg_delay_24h": -2.5,
        "route_max_delay_24h": 8.0
    }
]

# Convert to DataFrame
dummy_df = pd.DataFrame(dummy_flights)

# Convert FlightDate to datetime for Feast
dummy_df['FlightDate'] = pd.to_datetime(dummy_df['FlightDate'])
dummy_df['event_timestamp'] = dummy_df['FlightDate']  # Feast requires 'event_timestamp' column

# Push to Feast
print("Pushing dummy flight data to Feast...")
store.push("flight_stats_push_source", dummy_df)
print("Push completed successfully!")



Pushing dummy flight data to Feast...
Push completed successfully!


In [33]:
# Batch source feature service
fetch_online_features(store, source="advanced_feature_service")

ArrDelay  :  [241.0, -7.0, -25.0]
CRSElapsedTime  :  [129.0, 169.0, 154.0]
CarrierDelay  :  [0.0, None, None]
DayOfWeek  :  [7, 7, 7]
DepDelay  :  [259.0, 5.0, -8.0]
Dest  :  ['MSP', 'PIT', 'DEN']
Distance  :  [604.0, 1013.0, 1024.0]
LateAircraftDelay  :  [0.0, None, None]
Month  :  [9, 9, 9]
NASDelay  :  [0.0, None, None]
Origin  :  ['SDF', 'MIA', 'SEA']
Quarter  :  [3, 3, 3]
Route  :  ['SDF_MSP', 'MIA_PIT', 'SEA_DEN']
SecurityDelay  :  [0.0, None, None]
WeatherDelay  :  [241.0, None, None]
days_to_nearest_holiday  :  [15, 15, 15]
flight_ID  :  ['OO_3757', 'AA_1200', 'DL_2851']
is_holiday  :  [0, 0, 0]
route_avg_delay_24h  :  [41.0, -5.0, 12.210526466369629]
route_max_delay_24h  :  [80.0, -5.0, 104.0]


In [34]:
# real time push source feature service
fetch_online_features(store, source="real_time_feature_service")

CRSElapsedTime  :  [140.0, 167.0, 153.0]
DayOfWeek  :  [1, 1, 1]
DepDelay  :  [5.0, 12.0, -3.0]
Dest  :  ['KTN', 'PIT', 'DEN']
Distance  :  [680.0, 1013.0, 1024.0]
Month  :  [9, 9, 9]
NASDelay  :  [2.0, 0.0, 0.0]
Origin  :  ['SEA', 'MIA', 'SEA']
Quarter  :  [3, 3, 3]
Route  :  ['SEA_KTN', 'MIA_PIT', 'SEA_DEN']
WeatherDelay  :  [0.0, 5.0, 0.0]
days_to_nearest_holiday  :  [17, 17, 17]
flight_ID  :  ['OO_3757', 'AA_1200', 'DL_2851']
is_holiday  :  [0, 0, 0]
route_avg_delay_24h  :  [7.5, 22.5, -2.5]
route_max_delay_24h  :  [15.0, 45.0, 8.0]


Notice the difference: the feature service backed by the push source now returns the values we just pushed, while the other feature service still returns the historical values loaded earlier. This is why push sources are important in feature stores.

# From Batch Processing to Streaming: Adding Real-Time Capabilities

While Feast's push source provides a mechanism for updating features on demand, it still needs an upstream source of real-time data. This is where streaming platforms like Redpanda (a Kafka-compatible streaming platform) become valuable. In the next section, we'll explore how to:

1. Set up a streaming data pipeline that continuously processes flight information
2. Apply transformations to incoming data to compute derived features (holiday features and route statistics)
3. Push the enriched data to our feature store in real-time

This approach enables truly real-time ML predictions by maintaining feature freshness with minimal latency between data generation and availability. Unlike batch-based materialization which happens on a schedule, streaming allows our feature values to reflect the current state of the system at all times, making it ideal for time-sensitive applications like flight delay predictions where conditions change rapidly.

Now, let's bring up Redpanda, a popular streaming platform, together with its web console using docker-compose. Run the following command in your SSH session:

```bash
docker-compose -f /home/cc/feast-artifact/docker/docker-compose-feast.yaml up redpanda console 
```

In the cell below, we initialize the core Kafka objects needed for our streaming data pipeline:

- Connection to our Redpanda server (a Kafka-compatible streaming platform)
- A producer client to send messages to the streaming platform
- An admin client to manage topics and configurations

Note: Redpanda implements the Kafka API, so it works with the standard Kafka Python client (`kafka-python`) used here.

In [35]:
servers = 'redpanda:9092'
producer = KafkaProducer(bootstrap_servers=servers)
admin = KafkaAdminClient(bootstrap_servers=servers)

Run the cell below, which:

- Creates a new topic called "test_topic" in our streaming platform
- Sets up 3 partitions to allow parallel processing of messages
- Uses a replication factor of 1 (no redundancy, suitable for development)
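With three partitions, Kafka guarantees ordering only within a partition. A keyed message is routed by hashing its key, so every message with the same key lands on the same partition and keeps its order; the producer in this notebook sends without a key, so records for one route may be spread across partitions. The sketch below illustrates key-based routing with a hypothetical `choose_partition` helper that uses `zlib.crc32` as a stand-in hash (an assumption for illustration: Kafka's default partitioner uses murmur2, so real partition numbers differ).

```python
import zlib
from collections import defaultdict


def choose_partition(key: str, num_partitions: int) -> int:
    """Hypothetical key-based partitioner: the same key always maps to the same partition.

    CRC32 is used purely for illustration; Kafka's default partitioner uses murmur2.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Illustrative (route, departure delay) events, routed by route key across 3 partitions.
events = [("SEA_DEN", -3.0), ("MIA_PIT", 12.0), ("SEA_DEN", 4.0), ("SEA_KTN", 5.0)]
partitions = defaultdict(list)
for route, delay in events:
    partitions[choose_partition(route, 3)].append((route, delay))

# Every SEA_DEN event sits in a single partition, in the order it was produced.
sea_den_partition = choose_partition("SEA_DEN", 3)
print(partitions[sea_den_partition])
```

With `kafka-python`, passing `key=route.encode()` to `producer.send(...)` opts into this kind of key-based partitioning, which would keep each route's events ordered for the per-route window computed later.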

In [36]:
topic = NewTopic(name='test_topic', num_partitions=3, replication_factor=1)
admin.create_topics([topic])

CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='test_topic', error_code=0, error_message='Success')])

Now let's simulate a real-time flight data stream by:

- Reading our historical flight dataset
- Updating the timestamps to make the data appear current
- Publishing each flight record to our Redpanda topic

Each pass over the dataset shifts flight dates forward by another 52 weeks (roughly one year), so pass n adds 52 × n weeks, creating the effect of fresh data flowing through our system. The short sleep between messages (0.2 seconds) keeps the consumer from being overwhelmed and simulates a more realistic data velocity.
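Because 52 weeks is 364 days rather than a calendar year, the shift always preserves the day of the week, so the unmodified `DayOfWeek` field stays consistent with the shifted `FlightDate`. `Month` and `Quarter` are not recomputed, though, so a flight dated on the first of a month can land in the previous month after shifting. A stdlib sketch of the same shift (`shift_flight_date` is a hypothetical helper mirroring `update_timestamps`):

```python
from datetime import date, timedelta


def shift_flight_date(flight_date: str, iteration: int) -> str:
    """Shift an ISO date (YYYY-MM-DD) forward by 52 * iteration weeks."""
    shifted = date.fromisoformat(flight_date) + timedelta(weeks=52 * iteration)
    return shifted.isoformat()


original = "2024-09-15"
shifted = shift_flight_date(original, 1)
print(shifted)  # 2025-09-14
# 364 days is a whole number of weeks, so the weekday is unchanged.
print(date.fromisoformat(original).weekday() == date.fromisoformat(shifted).weekday())  # True
```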

Open http://localhost:8080/topics/test_topic in the Redpanda Console to watch events arrive on the topic.

Once the Redpanda topic holds enough data, interrupt the kernel to stop the simulation.

In [38]:
# Function to simulate fresher data by updating timestamps
def update_timestamps(row, iteration):
    # Convert FlightDate to datetime
    flight_date = pd.to_datetime(row["FlightDate"])
    # Add weeks to simulate fresher data
    updated_flight_date = flight_date + pd.Timedelta(weeks=52 * iteration)
    row["FlightDate"] = updated_flight_date.strftime("%Y-%m-%d")
    return row

iteration = 1
# Raw columns to stream (derived features are computed later by the consumer)
columns = [
    'flight_ID', 'FlightDate', 'Origin', 'Dest', 'Distance',
    'CRSElapsedTime', 'DayOfWeek', 'Month', 'Quarter', 'DepDelay',
    'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay',
    'LateAircraftDelay', 'ArrDelay',
]
while iteration < 10:
    for row in df[columns].to_dict("records"):
        # Update timestamps to simulate fresher data
        row = update_timestamps(row, iteration)
        # Send the row to Kafka
        producer.send('test_topic', json.dumps(row).encode())
        sleep(0.2) 
    iteration += 1

KeyboardInterrupt: 

# Setting up Kafka Consumer

This code creates a consumer that will:

- Subscribe to the 'test_topic' we created earlier
- Connect to the same Redpanda server
- Start reading from the earliest available message in the topic
- Automatically deserialize the JSON messages back into Python dictionaries
- Operate as part of the 'flight_feature_processor' consumer group (enabling multiple consumers to work together if needed)

The consumer will pull flight data records from the stream so we can enrich them with additional features before pushing to our feature store.
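The `value_deserializer` mirrors the producer's `json.dumps(row).encode()`. One subtlety: the records come from a pandas DataFrame, so missing delays are float NaN, and Python's `json` module writes them as the non-standard token `NaN` by default. Python's own `json.loads` reads it back, but strict JSON parsers in other languages may reject it. The sketch below uses hypothetical `serialize`/`deserialize` helpers that map NaN to `null` before encoding, assuming downstream code treats `None` as missing.

```python
import json
import math


def serialize(row: dict) -> bytes:
    """Encode a record as strict JSON, mapping float NaN to null (None)."""
    cleaned = {k: (None if isinstance(v, float) and math.isnan(v) else v) for k, v in row.items()}
    # allow_nan=False makes json.dumps raise instead of emitting a non-standard NaN token.
    return json.dumps(cleaned, allow_nan=False).encode("utf-8")


def deserialize(payload: bytes) -> dict:
    """Inverse of serialize; equivalent to the consumer's value_deserializer."""
    return json.loads(payload.decode("utf-8"))


row = {"flight_ID": "AA_1200", "DepDelay": 12.0, "CarrierDelay": float("nan")}
print(json.dumps(row))              # {"flight_ID": "AA_1200", "DepDelay": 12.0, "CarrierDelay": NaN}
print(deserialize(serialize(row)))  # {'flight_ID': 'AA_1200', 'DepDelay': 12.0, 'CarrierDelay': None}
```

With `kafka-python`, `serialize` could be passed as `KafkaProducer(value_serializer=serialize)` so that `producer.send('test_topic', row)` accepts the dictionary directly.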

In [52]:
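consumer = KafkaConsumer(
    'test_topic',
    bootstrap_servers=['redpanda:9092'],
    group_id='flight_feature_processor',  # consumer group described above
    auto_offset_reset='earliest',         # start from the oldest message when the group has no committed offset
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))  # bytes -> JSON -> dict
)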

# Real-Time Stream Processing for Feature Engineering

Our flight delay prediction service requires both historical and real-time features. While our batch data already contains pre-calculated features like route statistics and holiday information, our streaming data arrives in its raw form and needs real-time transformation.

## Stream Processing Architecture

This code implements a lightweight stream processing pipeline that:

1. Consumes raw flight data from our Redpanda topic
2. Applies stateful transformations to enrich the data with:
   - Holiday-related features (is_holiday, days_to_nearest_holiday)
   - Route-based statistics with 24-hour rolling windows (route_avg_delay_24h, route_max_delay_24h)
3. Pushes the enriched features directly to our Feast feature store for immediate availability

For production environments, this simple Python consumer could be replaced with more robust stream processing frameworks like:

- **Apache Flink**: Would provide true stateful stream processing with exactly-once guarantees and advanced windowing operations, ideal for maintaining accurate route statistics over sliding time windows
- **Apache Spark Structured Streaming**: Would let the same processing logic run on both batch and streaming data, with integration into ML pipelines

These frameworks would be particularly valuable for scenarios requiring:
- Processing millions of flights in parallel
- Complex windowing across multiple time dimensions
- Integration with existing data pipelines
- Scaling to handle holiday travel surges

Our current implementation maintains state in memory using Python dictionaries, which works for demonstration but lacks the fault tolerance and scalability of dedicated stream processing frameworks.
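For the in-memory route window, a `collections.deque` per route is a common alternative to rebuilding a list on every message: new delays are appended on the right and expired ones are popped from the left, so eviction is amortized O(1) when events arrive in time order. A minimal sketch, assuming in-order events; the `RouteWindow` class is hypothetical and not part of the notebook's code.

```python
from collections import deque
from datetime import datetime, timedelta


class RouteWindow:
    """Time-based sliding window of delays for one route (assumes in-order events)."""

    def __init__(self, width: timedelta = timedelta(hours=24)):
        self.width = width
        self.events = deque()  # (timestamp, delay) pairs, oldest on the left
        self.total = 0.0       # running sum of delays currently in the window

    def add(self, ts: datetime, delay: float) -> None:
        self.events.append((ts, delay))
        self.total += delay
        cutoff = ts - self.width
        # Evict events older than the cutoff from the left end.
        while self.events and self.events[0][0] < cutoff:
            _, old_delay = self.events.popleft()
            self.total -= old_delay

    def average(self) -> float:
        return self.total / len(self.events) if self.events else 0.0

    def maximum(self) -> float:
        # O(n) scan; a monotonic deque would make this O(1) amortized.
        return max(d for _, d in self.events) if self.events else 0.0


window = RouteWindow()
window.add(datetime(2025, 9, 14, 8), 10.0)
window.add(datetime(2025, 9, 14, 20), 30.0)
window.add(datetime(2025, 9, 15, 12), -4.0)  # the 08:00 event on the 14th is now more than 24h old
print(window.average(), window.maximum())  # 13.0 30.0
```

Like the notebook's version, this state lives only in process memory; a framework such as Flink would checkpoint it for fault tolerance.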

In [40]:
# 1. Function to add holiday features
def add_holiday_features(flight_data):
    """Add holiday-related features to flight data."""
    # Convert FlightDate to datetime
    flight_date = datetime.strptime(flight_data["FlightDate"], "%Y-%m-%d")
    
    # Create US holiday calendar
    cal = USFederalHolidayCalendar()
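    # Get holidays for the flight's year plus the neighboring years, so dates near
    # a year boundary can match a holiday on the other side (e.g. Dec 30 and Jan 1)
    year = flight_date.year
    holidays = cal.holidays(start=f'{year - 1}-01-01', end=f'{year + 1}-12-31')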
    
    # Check if flight date is a holiday
    flight_data["is_holiday"] = 1 if flight_date.date() in holidays.date else 0
    
    # Calculate days to nearest holiday
    days_diff = [abs((flight_date.date() - holiday).days) for holiday in holidays.date]
    flight_data["days_to_nearest_holiday"] = min(days_diff) if days_diff else 0
    
    return flight_data
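A nearest-holiday lookup needs holidays from the neighboring years as well; otherwise a flight on December 30 would be measured against December 25 instead of January 1. Below is a sketch of the lookup over a sorted list with `bisect`, which checks only the two holidays surrounding the date instead of scanning them all. `days_to_nearest_holiday` is a hypothetical helper, and the caller is assumed to supply a sorted list spanning the neighboring years (for example, the dates returned by `USFederalHolidayCalendar().holidays(start, end)`).

```python
from bisect import bisect_left
from datetime import date
from typing import Sequence


def days_to_nearest_holiday(day: date, holidays: Sequence[date]) -> int:
    """Return |day - nearest holiday| in days; `holidays` must be sorted ascending."""
    if not holidays:
        return 0  # mirrors the notebook's fallback when no holidays are known
    i = bisect_left(holidays, day)
    # The nearest holiday is either the first one on/after `day` or the one just before it.
    candidates = [holidays[j] for j in (i - 1, i) if 0 <= j < len(holidays)]
    return min(abs((day - h).days) for h in candidates)


# A subset of US federal holidays: Labor Day, Columbus Day, Christmas 2025, New Year's Day 2026.
sample_holidays = [date(2025, 9, 1), date(2025, 10, 13), date(2025, 12, 25), date(2026, 1, 1)]
print(days_to_nearest_holiday(date(2025, 12, 30), sample_holidays))  # 2
print(days_to_nearest_holiday(date(2025, 9, 15), sample_holidays))   # 14
```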

In [41]:
# 2. Initialize route delay storage for window calculations
route_delays = {}  # Dictionary to track delays by route

In [42]:
def update_route_statistics(flight_data):
    """Calculate rolling window statistics for routes."""
    # Create route key
    origin = flight_data["Origin"]
    dest = flight_data["Dest"]
    route = f"{origin}_{dest}"
    flight_data["Route"] = route
    
    # Get flight date as datetime
    flight_date = datetime.strptime(flight_data["FlightDate"], "%Y-%m-%d")
    
    # Initialize route data structure if needed
    if route not in route_delays:
        route_delays[route] = []
    
    # Get departure delay; treat missing values (None, or NaN carried over from pandas) as 0.0
    dep_delay = flight_data.get("DepDelay")
    dep_delay = 0.0 if pd.isna(dep_delay) else float(dep_delay)
    
    # Add current delay to history
    route_delays[route].append((flight_date, dep_delay))
    
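    # Keep only delays within the 24-hour window. FlightDate has day resolution,
    # so this keeps every recorded flight dated on or after the previous calendar day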
    cutoff_time = flight_date - timedelta(hours=24)
    route_delays[route] = [(ts, delay) for ts, delay in route_delays[route] 
                          if ts >= cutoff_time]
    
    # Calculate statistics if we have data
    if route_delays[route]:
        delays = [delay for _, delay in route_delays[route]]
        flight_data["route_avg_delay_24h"] = sum(delays) / len(delays)
        flight_data["route_max_delay_24h"] = max(delays)
    else:
        flight_data["route_avg_delay_24h"] = 0
        flight_data["route_max_delay_24h"] = 0
    
    return flight_data

In [43]:
def push_to_feast(flight_data, store):
    """Convert flight data to DataFrame and push to Feast."""
    # Create a DataFrame with a single row
    df = pd.DataFrame([flight_data])
    
    # Convert FlightDate to datetime for Feast
    df['FlightDate'] = pd.to_datetime(df['FlightDate'])
    df['event_timestamp'] = df['FlightDate']
    
    try:
        # Push to Feast
        store.push("flight_stats_push_source", df)
        print(f"Successfully pushed data for flight {flight_data['flight_ID']}")
        return True
    except Exception as e:
        print(f"Error pushing to Feast: {e}")
        return False

In [53]:
print("Starting Kafka consumer. Processing flight data and pushing to Feast...")
try:
    message_count = 0
    max_empty_polls = 3  # Stop after this many consecutive empty polls
    empty_poll_count = 0

    # Create the FeatureStore once instead of once per message
    store = FeatureStore(repo_path=".")
    
    # Continue polling until we reach max_empty_polls consecutive empty polls
    while empty_poll_count < max_empty_polls:
        # Poll with a 5-second timeout (timeout_ms is in milliseconds)
        messages = consumer.poll(timeout_ms=5000)
        
        if not messages:
            print(f"No messages received. Empty poll count: {empty_poll_count + 1}/{max_empty_polls}")
            empty_poll_count += 1
            continue
        
        # Reset empty poll count if we got messages
        empty_poll_count = 0
        
        # Process all messages received in this poll
        for topic_partition, partition_messages in messages.items():
            for message in partition_messages:
                # Get the flight data from Kafka
                flight_data = message.value
                
                # 1. Add holiday features
                flight_data = add_holiday_features(flight_data)
                
                # 2. Add route statistics
                flight_data = update_route_statistics(flight_data)
                
                # 3. Push to Feast
                push_to_feast(flight_data, store)
                
                # Count and print update
                message_count += 1
                print(f"Processed flight {flight_data['flight_ID']} on route {flight_data['Route']} ({message_count} total)")
    
    print(f"Finished processing {message_count} messages after {max_empty_polls} empty polls")
    
except KeyboardInterrupt:
    print("Consumer stopped by user.")
finally:
    consumer.close()
    print("Kafka consumer closed.")

Starting Kafka consumer. Processing flight data and pushing to Feast...
Successfully pushed data for flight AS_502
Processed flight AS_502 on route SEA_AUS (1 total)
Successfully pushed data for flight AS_511
Processed flight AS_511 on route STL_SEA (2 total)
Successfully pushed data for flight AS_514
Processed flight AS_514 on route RNO_SEA (3 total)
Successfully pushed data for flight AS_515
Processed flight AS_515 on route SEA_STL (4 total)
Successfully pushed data for flight AS_499
Processed flight AS_499 on route IAD_SEA (5 total)
Successfully pushed data for flight AS_522
Processed flight AS_522 on route SEA_BNA (6 total)
Successfully pushed data for flight AS_531
Processed flight AS_531 on route AUS_SAN (7 total)
Successfully pushed data for flight AS_538
Processed flight AS_538 on route SEA_ORD (8 total)
Successfully pushed data for flight AS_517
Processed flight AS_517 on route FLL_SEA (9 total)
Successfully pushed data for flight AS_494
Processed flight AS_494 on route SEA_SA
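The loop above calls `store.push` once per message. Because `store.push` also accepts a multi-row DataFrame (the dummy push earlier sent three rows at once), one possible refinement is to buffer enriched records and push them in small batches. The `MicroBatcher` below is a hypothetical, framework-free sketch; in the notebook, its `flush_fn` callback would build a DataFrame from the buffered rows the same way `push_to_feast` does and then call `store.push`.

```python
import time
from typing import Callable, Dict, List, Optional


class MicroBatcher:
    """Buffer records and hand them to `flush_fn` once a size or age limit is hit (hypothetical helper)."""

    def __init__(self, flush_fn: Callable[[List[Dict]], None],
                 max_size: int = 100, max_age_s: float = 1.0,
                 clock: Callable[[], float] = time.monotonic):
        self.flush_fn = flush_fn
        self.max_size = max_size
        self.max_age_s = max_age_s
        self.clock = clock
        self.buffer: List[Dict] = []
        self.first_added: Optional[float] = None

    def add(self, record: Dict) -> None:
        if not self.buffer:
            self.first_added = self.clock()  # start the age timer at the first buffered record
        self.buffer.append(record)
        too_big = len(self.buffer) >= self.max_size
        too_old = self.clock() - self.first_added >= self.max_age_s
        if too_big or too_old:
            self.flush()

    def flush(self) -> None:
        # Hand off whatever is buffered (if anything) and start a new, empty buffer.
        if self.buffer:
            self.flush_fn(self.buffer)
            self.buffer = []
            self.first_added = None


pushed_batches = []
batcher = MicroBatcher(pushed_batches.append, max_size=2, max_age_s=60.0)
for flight_id in ["AS_502", "AS_511", "AS_514"]:
    batcher.add({"flight_ID": flight_id})
batcher.flush()  # flush the remaining partial batch
print([len(b) for b in pushed_batches])  # [2, 1]
```

The age limit is only checked when a record arrives, so a consumer loop using this would also call `flush()` on an empty poll and before closing the consumer, so the last partial batch is not lost.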

# Airflow Integration
To keep online features fresh, you'll want to schedule materialization jobs regularly. This can be as simple as a cron job that calls `feast materialize-incremental`.
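`feast materialize-incremental` takes an end timestamp and loads feature values into the online store from where the previous materialization of each feature view ended. A minimal sketch of building that command from Python with `subprocess` (already imported at the top of this notebook). `materialize_incremental_cmd` is a hypothetical helper, and the actual run is commented out because it assumes the `feast` CLI is on `PATH` and the working directory is the feature repo.

```python
import subprocess
from datetime import datetime, timezone


def materialize_incremental_cmd(end: datetime) -> list:
    """Build the argument list for `feast materialize-incremental END_DATE` (hypothetical helper)."""
    if end.tzinfo is None:
        end = end.replace(tzinfo=timezone.utc)  # assume naive timestamps are already UTC
    # Format as an ISO 8601 UTC timestamp without an offset, e.g. 2025-09-15T12:00:00
    end_utc = end.astimezone(timezone.utc).replace(tzinfo=None)
    return ["feast", "materialize-incremental", end_utc.isoformat(timespec="seconds")]


cmd = materialize_incremental_cmd(datetime(2025, 9, 15, 12, 0, tzinfo=timezone.utc))
print(cmd)  # ['feast', 'materialize-incremental', '2025-09-15T12:00:00']

# Assumes the feast CLI is installed and the current directory is the feature repo:
# subprocess.run(cmd, check=True)
```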

Apache Airflow is a common choice for scheduling this. Bring up Airflow by running the following command in your SSH session:
```bash
docker-compose -f /home/cc/feast-artifact/docker/docker-compose-airflow.yml up -d
```

In a browser, open `http://localhost:8081`. Log in with username `airflow@example.com` and password `airflow` (our Docker Compose file creates an initial user with these credentials).

We have already seen how a DAG can automate workflows in the Feedback loop tutorial. In the same way, `dag.py` defines a DAG that automates materialization:

```python
import os
from airflow.decorators import dag, task
from datetime import datetime, timedelta
from feast import FeatureStore
import pendulum
from pathlib import Path


@dag(
    schedule="@hourly",  # Adjust the schedule as needed
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    tags=["feast"],
)
def materialize_dag():
    @task()
    def materialize():
        repo_path = '/opt/airflow/feature_repo/'
        
        # Print current working directory for debugging
        print(f"Current working directory: {os.getcwd()}")
        print(f"Using feature repository at: {repo_path}")
        
        # Use FeatureStore with explicit repo_path
        store = FeatureStore(repo_path=repo_path)
        
        # Calculate start and end time for materialization
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(hours=1)
        
        print(f"Materializing data from {start_time} to {end_time}")
        
        # Materialize the previous hour of data
        store.materialize(start_time, end_time)

    materialize()


# This line is required for Airflow to properly register the DAG
materialize_dag = materialize_dag()
```

Let's break this down:

1. The `@dag` decorator defines basic DAG properties:
   - `schedule="@hourly"` - Runs every hour
   - `catchup=False` - Doesn't run for missed intervals
   - `start_date` - When the DAG becomes active
   - `tags=["feast"]` - For organization in the UI

2. Inside the DAG, there's a single `@task` called `materialize()` that:
   - Sets the path to your feature repository
   - Creates a FeatureStore instance
   - Calculates a time window (the previous hour)
   - Calls the `materialize()` method to load that window's feature values from the offline store into the online store
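One caveat with `end_time = datetime.utcnow()`: each run's window depends on when the task actually starts, so consecutive runs can leave small gaps or overlaps between hours. A sketch of an alternative that aligns the window to clock-hour boundaries, so a late or retried run still covers exactly the previous full hour. `previous_hour_window` is a hypothetical helper; in recent Airflow 2.x versions, the `data_interval_start` and `data_interval_end` values in the task context serve the same purpose for scheduled runs, and Feast's `store.materialize_incremental(end_date=...)` is another way to avoid tracking windows by hand.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple


def previous_hour_window(now: datetime) -> Tuple[datetime, datetime]:
    """Return (start, end) for the last full clock hour before `now`."""
    end = now.replace(minute=0, second=0, microsecond=0)  # truncate to the top of the hour
    return end - timedelta(hours=1), end


start, end = previous_hour_window(datetime(2025, 9, 15, 12, 7, 30, tzinfo=timezone.utc))
print(start.isoformat(), end.isoformat())
# 2025-09-15T11:00:00+00:00 2025-09-15T12:00:00+00:00
```

A run that starts at 12:07 and a retry at 12:59 both produce the same 11:00 to 12:00 window, which makes re-running the task safe.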
