In [69]:
%%capture
import warnings
import os

# Silence all warnings in this kernel, and export PYTHONWARNINGS so that
# processes started from the notebook inherit the same setting.
warnings.simplefilter("ignore")
os.environ.update({"PYTHONWARNINGS": "ignore"})

In [70]:
!pip install feast scikit-learn 'feast[gcp]'



In [71]:
# Print the installed Feast SDK version as a sanity check on the install above.
!feast version

Feast SDK Version: "0.49.0"


In [72]:
!git clone https://github.com/adhithyasash1/week-3-iris-feast.git

fatal: destination path 'week-3-iris-feast' already exists and is not an empty directory.


In [73]:
# Notebook-wide GCP settings. The `#@param` markers are presumably Colab form
# annotations and are left untouched.
PROJECT_ID= "true-sprite-459511-f5" #@param {type:"string"}
BUCKET_NAME= "true-sprite-459511-f5-iris-pipeline" #@param {type:"string"} custom
BIGQUERY_DATASET_NAME="iris_feast_pipeline" #@param {type:"string"} custom
# NOTE(review): AI_PLATFORM_MODEL_NAME is not referenced anywhere else in the visible notebook.
AI_PLATFORM_MODEL_NAME = "iris_feast_pipeline_model"

# Select the project for gcloud, export it as GOOGLE_CLOUD_PROJECT for this
# kernel, and record it in ~/.bigqueryrc (the `>` redirect overwrites that file).
!gcloud config set project $PROJECT_ID
%env GOOGLE_CLOUD_PROJECT=$PROJECT_ID
!echo project_id = $PROJECT_ID > ~/.bigqueryrc

Updated property [core/project].
env: GOOGLE_CLOUD_PROJECT=true-sprite-459511-f5


In [74]:
!gsutil mb gs://$BUCKET_NAME

Creating gs://true-sprite-459511-f5-iris-pipeline/...
ServiceException: 409 A Cloud Storage bucket named 'true-sprite-459511-f5-iris-pipeline' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.


In [75]:
!bq mk --dataset true-sprite-459511-f5:iris_feast_pipeline

BigQuery error in mk operation: Dataset 'true-
sprite-459511-f5:iris_feast_pipeline' already exists.


In [76]:
from datetime import datetime, timedelta

# Imported here so the cell works on a freshly restarted kernel (pandas is
# otherwise only imported by a later cell).
import pandas as pd

IRIS_CSV_PATH = "week-3-iris-feast/data/iris.csv"

# Load base Iris dataset.
try:
    df = pd.read_csv(IRIS_CSV_PATH)
except FileNotFoundError as err:
    # Raise a normal exception instead of calling exit(): the error then shows
    # up in the cell output and "Run All" stops at this cell.
    raise FileNotFoundError(
        "Could not find initial iris.csv, please ensure it's in the 'data' directory."
    ) from err

# 1. Give every row a unique flower_id.
#    This cell overwrites iris.csv with the rows in reversed order, so on a
#    re-run the file already contains flower_id. Reuse those IDs (restoring the
#    original row order) instead of re-deriving them from the index, which
#    would flip every ID on each run.
if "flower_id" in df.columns:
    df = df.sort_values("flower_id").reset_index(drop=True)
else:
    df["flower_id"] = df.index
assert df["flower_id"].is_unique, "flower_id must identify exactly one row"

# 2. Add staggered event_timestamps, one minute apart, counting back from now.
#    NOTE(review): datetime.now() is timezone-naive; downstream output shows the
#    values being read back as UTC - confirm the kernel clock is UTC.
base_timestamp = datetime.now()
df["event_timestamp"] = [base_timestamp - timedelta(minutes=i) for i in range(len(df))]
# Reverse the order so the first row has the earliest timestamp.
df = df.iloc[::-1].reset_index(drop=True)

# Reorder columns for clarity.
df = df[['event_timestamp', 'flower_id', 'sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species']]

# Save the processed file, overwriting the old one.
df.to_csv(IRIS_CSV_PATH, index=False)

print(f"Modified iris.csv with {len(df)} unique flower_ids and staggered timestamps.")
print("\nNew data head:")
print(df.head())
print(f"\nNumber of unique flower_ids: {df['flower_id'].nunique()}")

Modified iris.csv with 150 unique flower_ids and staggered timestamps.

New data head:
             event_timestamp  flower_id  sepal_length  sepal_width  \
0 2025-06-20 08:52:52.795099        149           5.9          3.0   
1 2025-06-20 08:53:52.795099        148           6.2          3.4   
2 2025-06-20 08:54:52.795099        147           6.5          3.0   
3 2025-06-20 08:55:52.795099        146           6.3          2.5   
4 2025-06-20 08:56:52.795099        145           6.7          3.0   

   petal_length  petal_width    species  
0           5.1          1.8  virginica  
1           5.4          2.3  virginica  
2           5.2          2.0  virginica  
3           5.0          1.9  virginica  
4           5.2          2.3  virginica  

Number of unique flower_ids: 150


In [77]:
from google.cloud import bigquery

# Fully-qualified destination table, built from the constants in the config
# cell so the project/dataset are defined in one place.
# NOTE(review): the Feast feature view's batch source (see the
# list_feature_views output) names this same table - keep the two in sync.
IRIS_TABLE_ID = f"{PROJECT_ID}.{BIGQUERY_DATASET_NAME}.iris_table"

client = bigquery.Client()

# CSV with one header row; the schema is auto-detected and the table contents
# are replaced on every load (WRITE_TRUNCATE).
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    autodetect=True,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

with open("week-3-iris-feast/data/iris.csv", "rb") as source_file:
    job = client.load_table_from_file(
        source_file,
        destination=IRIS_TABLE_ID,
        job_config=job_config,
    )

job.result()  # Wait for the job to complete
print("Iris CSV successfully uploaded in BigQuery.")

Iris CSV successfully uploaded in BigQuery.


In [78]:
# Delete the Jupyter checkpoint directory in the current working directory, if present.
!rm -rf .ipynb_checkpoints

In [79]:
# Recursively delete every .ipynb_checkpoints directory below the current directory.
!find . -type d -name ".ipynb_checkpoints" -exec rm -r {} +

In [80]:
# Feast reads .feastignore from the root of the feature repository, i.e. the
# directory passed to `feast -c` (iris_pipeline/), so the file must live there
# to have any effect. The copy in the clone root is still written because the
# next cell displays it.
FEASTIGNORE_BODY = ".ipynb_checkpoints/\n*.ipynb\n"
for feastignore_path in (
    "week-3-iris-feast/.feastignore",
    "week-3-iris-feast/iris_pipeline/.feastignore",
):
    with open(feastignore_path, "w") as f:
        f.write(FEASTIGNORE_BODY)

In [81]:
# Display the ignore file written by the previous cell.
!cat week-3-iris-feast/.feastignore

.ipynb_checkpoints/
*.ipynb


In [82]:
# Apply the definitions in the iris_pipeline feature repo (`-c` selects the repo directory).
!feast -c week-3-iris-feast/iris_pipeline apply

  flower = Entity(
No project found in the repository. Using project name iris_pipeline defined in feature_store.yaml
Applying changes for project iris_pipeline
Deploying infrastructure for [1m[32miris_features[0m


In [83]:
from feast import FeatureStore

# Open the feature repo and show which feature views are registered in it.
store = FeatureStore(repo_path="week-3-iris-feast/iris_pipeline")
registered_views = store.list_feature_views()
print(registered_views)

[<FeatureView(name = iris_features, entities = ['flower_id'], ttl = 364 days, 0:00:00, stream_source = None, batch_source = {
  "type": "BATCH_BIGQUERY",
  "timestampField": "event_timestamp",
  "bigqueryOptions": {
    "table": "true-sprite-459511-f5.iris_feast_pipeline.iris_table"
  },
  "name": "true-sprite-459511-f5.iris_feast_pipeline.iris_table"
}, entity_columns = [Field(
    name='flower_id',
    dtype=<PrimitiveFeastType.INT64: 4>,
    description='',
    tags={}
    vector_index=False
    vector_length=0
    vector_search_metric=''
)], features = [Field(
    name='sepal_length',
    dtype=<PrimitiveFeastType.FLOAT32: 6>,
    description='',
    tags={}
    vector_index=False
    vector_length=0
    vector_search_metric=''
), Field(
    name='sepal_width',
    dtype=<PrimitiveFeastType.FLOAT32: 6>,
    description='',
    tags={}
    vector_index=False
    vector_length=0
    vector_search_metric=''
), Field(
    name='petal_length',
    dtype=<PrimitiveFeastType.FLOAT32: 6>,


In [84]:
# Run `feast plan` against the iris_pipeline repo to report pending registry changes.
!feast -c week-3-iris-feast/iris_pipeline plan

  flower = Entity(
No project found in the repository. Using project name iris_pipeline defined in feature_store.yaml
[1m[94mNo changes to registry
Created sqlite table [1m[32miris_pipeline_iris_features[0m



In [85]:
# List the contents of the feature repo directory.
!ls -l week-3-iris-feast/iris_pipeline

total 12
drwxr-xr-x 2 jupyter jupyter 4096 Jun 15 18:02 data
-rw-r--r-- 1 jupyter jupyter  101 Jun 15 17:55 feature_store.yaml
-rw-r--r-- 1 jupyter jupyter 1185 Jun 20 10:26 iris_repo.py


In [86]:
# List the feature repo's data directory (online.db and registry.db in the recorded output).
!ls -l week-3-iris-feast/iris_pipeline/data

total 4
-rw-r--r-- 1 jupyter jupyter   0 Jun 15 18:02 online.db
-rw-r--r-- 1 jupyter jupyter 898 Jun 20 11:22 registry.db


In [87]:
!pip install "numpy<2" "pandas==2.2.2"



In [88]:
import pandas as pd
import feast
from joblib import dump
from sklearn.linear_model import LogisticRegression

# Model input columns, in the order the classifier is trained on.
FEATURE_COLUMNS = ["sepal_length", "sepal_width", "petal_length", "petal_width"]

# Load the raw data to get the correct timestamps and entity IDs
try:
    df = pd.read_csv("week-3-iris-feast/data/iris.csv", parse_dates=["event_timestamp"])
except FileNotFoundError:
    print("Error: 'week-3-iris-feast/data/iris.csv' not found.")
    df = pd.DataFrame()

# The empty fallback frame has no 'flower_id' column, so guard the lookup:
# otherwise the diagnostics would raise KeyError right after the
# "file not found" message.
unique_flower_ids = df["flower_id"].nunique() if "flower_id" in df.columns else 0

print("----------- DATA DIAGNOSTICS -----------")
print(f"Total rows loaded from CSV: {len(df)}")
print(f"Number of unique flower_id's in CSV: {unique_flower_ids}")  # expected: one per row
print("----------------------------------------")

if not df.empty:
    # Entity dataframe: one (event_timestamp, flower_id) pair per row of the CSV.
    entity_df = df[["event_timestamp", "flower_id"]].copy()

    # Connecting to feature store
    fs = feast.FeatureStore(repo_path="week-3-iris-feast/iris_pipeline")

    # Pulling in historical features (the four measurements plus the species label)
    training = fs.get_historical_features(
        entity_df=entity_df,
        features=[f"iris_features:{name}" for name in FEATURE_COLUMNS + ["species"]],
    ).to_df()

    training = training.rename(columns={"species": "label"})

    print("\n----- Data after feature retrieval and rename -----\n")
    print(training.head())
    print("\n----- Feature schema -----\n")
    training.info()

    # Drop any rows with missing values, then keep only the most recent row per
    # flower_id. When every flower_id is unique the de-duplication is a no-op.
    training = (
        training
        .dropna(subset=FEATURE_COLUMNS + ["label"])
        .sort_values(by="event_timestamp", ascending=False)
        .drop_duplicates(subset="flower_id", keep="first")
    )

    # Training a classifier
    X = training[FEATURE_COLUMNS]
    y = training["label"]

    if not X.empty:
        clf = LogisticRegression(max_iter=200)
        clf.fit(X, y)

        # Saving model
        dump(clf, "week-3-iris-feast/iris_feast_model.joblib")
        print(f"\nModel trained on {len(X)} examples and saved as iris_feast_model.joblib")
    else:
        print("\nCould not train model as no training data was available after processing.")

----------- DATA DIAGNOSTICS -----------
Total rows loaded from CSV: 150
Number of unique flower_id's in CSV: 150
----------------------------------------

----- Data after feature retrieval and rename -----

                   event_timestamp  flower_id  sepal_length  sepal_width  \
0 2025-06-20 09:57:52.795099+00:00         84           5.4          3.0   
1 2025-06-20 10:00:52.795099+00:00         81           5.5          2.4   
2 2025-06-20 11:13:52.795099+00:00          8           4.4          2.9   
3 2025-06-20 09:18:52.795099+00:00        123           6.3          2.7   
4 2025-06-20 09:27:52.795099+00:00        114           5.8          2.8   

   petal_length  petal_width       label  
0           4.5          1.5  versicolor  
1           3.7          1.0  versicolor  
2           1.4          0.2      setosa  
3           4.9          1.8   virginica  
4           5.1          2.4   virginica  

----- Feature schema -----

<class 'pandas.core.frame.DataFrame'>
RangeInde

In [101]:
# Delete the Jupyter checkpoint directory in the current working directory, if present
# (repeat of an earlier cell).
!rm -rf .ipynb_checkpoints

In [102]:
# Recursively delete every .ipynb_checkpoints directory below the current directory
# (repeat of an earlier cell).
!find . -type d -name ".ipynb_checkpoints" -exec rm -r {} +

In [103]:
# Feast reads .feastignore from the root of the feature repository, i.e. the
# directory passed to `feast -c` (iris_pipeline/), so the file must live there
# to have any effect. The copy in the clone root is still written because the
# next cell displays it.
FEASTIGNORE_BODY = ".ipynb_checkpoints/\n*.ipynb\n"
for feastignore_path in (
    "week-3-iris-feast/.feastignore",
    "week-3-iris-feast/iris_pipeline/.feastignore",
):
    with open(feastignore_path, "w") as f:
        f.write(FEASTIGNORE_BODY)

In [104]:
# Display the ignore file written by the previous cell.
!cat week-3-iris-feast/.feastignore

.ipynb_checkpoints/
*.ipynb


In [105]:
# Re-apply the definitions in the iris_pipeline feature repo before materializing.
!feast -c week-3-iris-feast/iris_pipeline apply

  flower = Entity(
No project found in the repository. Using project name iris_pipeline defined in feature_store.yaml
Applying changes for project iris_pipeline
Deploying infrastructure for [1m[32miris_features[0m


In [106]:
!cd week-3-iris-feast/iris_pipeline && feast materialize 2025-06-20T08:52:52.795099 2025-06-20T11:21:52.795099+00:00

Materializing [1m[32m1[0m feature views from [1m[32m2025-06-20 08:52:52+00:00[0m to [1m[32m2025-06-20 11:21:52+00:00[0m into the [1m[32msqlite[0m online store.

[1m[32miris_features[0m:
100%|███████████████████████████████████████████████████████████| 150/150 [00:00<00:00, 5827.53it/s]


In [97]:
# Show the earliest and latest event timestamps in iris.csv; these bound the
# window that has to be passed to `feast materialize`.
# (Previously this code was wrapped in a string literal and therefore did not
# run, although the recorded output comes from it.)
import pandas as pd

iris_event_ts = pd.read_csv(
    "week-3-iris-feast/data/iris.csv", parse_dates=["event_timestamp"]
)["event_timestamp"]
print("min:", iris_event_ts.min())
print("max:", iris_event_ts.max())

min: 2025-06-20 08:52:52.795099
max: 2025-06-20 11:21:52.795099


In [116]:
from feast import FeatureStore

fs = FeatureStore(repo_path="week-3-iris-feast/iris_pipeline")

# Feature references to read back from the online store.
online_feature_refs = [
    "iris_features:sepal_length",
    "iris_features:sepal_width",
    "iris_features:petal_length",
    "iris_features:petal_width",
]

# Try flower_id=0 and 1 for example
lookup_rows = [{"flower_id": flower_id} for flower_id in (0, 1)]

online_response = fs.get_online_features(
    features=online_feature_refs,
    entity_rows=lookup_rows,
)
online_df = online_response.to_df()

print(online_df)

   flower_id  petal_length  sepal_length  sepal_width  petal_width
0          0           1.4           5.1          3.5          0.2
1          1           1.4           4.9          3.0          0.2
flower_id       0
petal_length    0
sepal_length    0
sepal_width     0
petal_width     0
dtype: int64


In [117]:
import feast
from joblib import load
import pandas as pd

class IrisInferenceService:
    """Predict iris species for flower IDs using Feast online features.

    The service looks up the four measurement features for each requested
    ``flower_id`` in the Feast online store and feeds them to a model that
    was previously saved with joblib.
    """

    # Feature view holding the measurements, and the model's input columns in
    # the same order the training cell uses.
    FEATURE_VIEW = "iris_features"
    FEATURE_COLUMNS = ("sepal_length", "sepal_width", "petal_length", "petal_width")

    def __init__(self,
                 repo_path: str = "week-3-iris-feast/iris_pipeline",
                 model_path: str = "week-3-iris-feast/iris_feast_model.joblib"):
        """Open the feature store and load the trained model.

        Args:
            repo_path: Directory of the Feast feature repository.
            model_path: Path of the joblib file containing the trained model.
        """
        # Loading Feast FeatureStore
        self.fs = feast.FeatureStore(repo_path=repo_path)
        # Loading the trained model.
        # NOTE: joblib.load unpickles the file - only load model files you trust.
        self.model = load(model_path)

    def predict(self, flower_ids: list[int]) -> pd.DataFrame:
        """Return one prediction per requested flower ID.

        Args:
            flower_ids: IDs to look up in the online store.

        Returns:
            DataFrame with columns ``flower_id`` and ``prediction``. The frame
            is empty when ``flower_ids`` is empty. A row whose features are
            missing from the online store gets ``None`` as its prediction
            instead of making the whole call fail.
        """
        flower_ids = list(flower_ids)
        if not flower_ids:
            # Nothing to look up: skip the online-store round trip entirely.
            return pd.DataFrame({
                "flower_id": pd.Series(dtype="int64"),
                "prediction": pd.Series(dtype="object"),
            })

        # Building entity rows for lookup
        entity_rows = [{"flower_id": fid} for fid in flower_ids]

        # Fetching features online
        online_feats = self.fs.get_online_features(
            features=[f"{self.FEATURE_VIEW}:{col}" for col in self.FEATURE_COLUMNS],
            entity_rows=entity_rows,
        ).to_df()

        # Preparing input for model
        X = online_feats[list(self.FEATURE_COLUMNS)]

        # Only rows with every feature present can be scored; the model would
        # otherwise be handed missing values.
        complete = X.notna().all(axis=1)

        # Predicting and attaching results
        online_feats["prediction"] = None
        if complete.any():
            online_feats.loc[complete, "prediction"] = self.model.predict(X[complete])
        return online_feats[["flower_id", "prediction"]]

if __name__ == "__main__":
    # Quick smoke-test: score a handful of flower IDs with the default
    # repository and model paths, then show the predictions.
    sample_flower_ids = [1, 2, 53, 101, 149]
    service = IrisInferenceService()
    result = service.predict(sample_flower_ids)
    print(result)

   flower_id  prediction
0          1      setosa
1          2      setosa
2         53  versicolor
3        101   virginica
4        149   virginica
