### Install and Upgrade Required Python Packages

This notebook requires the following Python packages:

| Package                   | Purpose                                    |
|---------------------------|--------------------------------------------|
| `google-cloud-aiplatform` | Interact with Google Vertex AI             |
| `feast`                   | Define, materialize, and retrieve features |
| `joblib`                  | Save and load machine learning models      |
| `pandas`                  | Data manipulation and analysis             |
| `requests`                | Download the dataset over HTTP             |
| `scikit-learn`            | Machine learning algorithms and utilities  |

In [1]:
!pip install --upgrade --quiet google-cloud-aiplatform feast joblib pandas requests scikit-learn

### GCP Configuration: Project, Region and Bucket



In [2]:
PROJECT_ID = "geometric-gamma-472903-q2"  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"}
BUCKET_URI = "gs://mlops-iitm-geometric-gamma-472903-q2-v4-unique"  # @param {type:"string"}

### Initialize Vertex AI SDK for Python

In [3]:
from google.cloud import aiplatform
aiplatform.init(project=PROJECT_ID, location=LOCATION, staging_bucket=BUCKET_URI)

### Required Libraries and Module Imports

The following libraries and modules are imported for:

- Data manipulation and preprocessing (`pandas`, `OrdinalEncoder`)
- Machine learning model training and evaluation (`scikit-learn`, `joblib`)
- Feature store integration (`feast`)
- Cloud interaction (`google-cloud-storage`, `aiplatform`)
- Date and file handling (`datetime`, `os`)
- HTTP downloads (`requests`)

In [4]:
# -------------------------
# Standard Library Imports
# -------------------------
import os
from datetime import datetime, timedelta

# -------------------------
# HTTP Client (third-party)
# -------------------------
import requests

# -------------------------
# Data Manipulation & Preprocessing
# -------------------------
import pandas as pd
from sklearn.preprocessing import OrdinalEncoder

# -------------------------
# Machine Learning
# -------------------------
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score
import joblib

# -------------------------
# Cloud & AI Platform Integration
# -------------------------
from google.cloud import storage, aiplatform

# -------------------------
# Feature Store (Feast)
# -------------------------
from feast import FeatureStore, Entity, FeatureView, FileSource, Field, ValueType
from feast.types import Float32, Int64

### Initialize Feast Feature Repository

In [5]:
!cd ~/week-2/DVC-WEEK2-IITM/week-3 && feast init iris_feature_feast_repo


Creating a new Feast repository in /home/jupyter/week-2/DVC-WEEK2-IITM/week-3/iris_feature_feast_repo.



### Download GitHub Data to Local Storage (GCS Upload Optional)

In [6]:
# Initialize GCS client
client = storage.Client(project=PROJECT_ID)
bucket_name = BUCKET_URI.split("gs://")[1]
bucket = client.bucket(bucket_name)

# Files to download from GitHub, keyed by their relative local path
github_files = {
    "iris_data_adapted_for_feast.csv": "https://raw.githubusercontent.com/IITMBSMLOps/ga_resources/week_3/iris_data_adapted_for_feast.csv"
}

# Local folder to save files
local_base_dir = "week-2/DVC-WEEK2-IITM/week-3/data"

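# Download each file and save it locally (the GCS upload below is left commented out)
for relative_path, url in github_files.items():
    # Download file from GitHub; the timeout avoids hanging on a stalled connection
    response = requests.get(url, timeout=30)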
    response.raise_for_status()
    
    # Save locally
    local_path = os.path.join(local_base_dir, relative_path)
    os.makedirs(os.path.dirname(local_path), exist_ok=True)
    with open(local_path, "wb") as f:
        f.write(response.content)
    print(f"Saved locally: {local_path}")
    
#     # Set destination path in GCS
#     gcs_path = f"Graded-Assignments/Week-3/data/{relative_path}"  
#     blob = bucket.blob(gcs_path)
    
#     # Upload to GCS
#     blob.upload_from_string(response.content)
#     print(f"Uploaded {relative_path} → gs://{bucket_name}/{gcs_path}")

Saved locally: week-2/DVC-WEEK2-IITM/week-3/data/iris_data_adapted_for_feast.csv
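
The cell above derives the bucket name with `BUCKET_URI.split("gs://")[1]`, which would also keep any object path appended to the URI. A slightly more defensive sketch using the standard library is shown below. The helper name `parse_gcs_uri` and the example URIs are hypothetical and not part of the notebook.

```python
from urllib.parse import urlparse


def parse_gcs_uri(uri: str) -> tuple[str, str]:
    """Split a gs:// URI into (bucket_name, object_path). Hypothetical helper."""
    parsed = urlparse(uri)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError(f"Not a valid GCS URI: {uri!r}")
    # netloc is the bucket; the path (if any) is the object name without the leading slash
    return parsed.netloc, parsed.path.lstrip("/")


# Placeholder URIs for illustration only
print(parse_gcs_uri("gs://example-bucket"))
print(parse_gcs_uri("gs://example-bucket/data/iris.csv"))
```

With a helper like this, `client.bucket(...)` would receive only the bucket portion even if `BUCKET_URI` later gains a path suffix.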


### Configure Resource Names


Set names and paths for storing model artifacts and datasets.

| Parameter              | Description                                                                 | Example / Value                                         |
|------------------------|-----------------------------------------------------------------------------|--------------------------------------------------------|
| `DATA_DIR`             | Local folder path to store dataset files                                    | `"week-2/DVC-WEEK2-IITM/week-3/data"`                |
| `MODEL_DIR`            | Local folder path to store trained model artifacts and metrics              | `"week-2/DVC-WEEK2-IITM/week-3/model"`               |
| `CSV_PATH`             | Path to the adapted Iris CSV dataset                                        | `f"{DATA_DIR}/iris_data_adapted_for_feast.csv"`      |
| `PARQUET_PATH`         | Path to store dataset in Parquet format for Feast                            | `f"{DATA_DIR}/data_feast.parquet"`                   |
| `MODEL_PATH`           | Path to save the trained ML model                                           | `f"{MODEL_DIR}/model.joblib"`                        |
| `METRICS_PATH`         | Path to save evaluation metrics                                             | `f"{MODEL_DIR}/metrics.txt"`                          |

In [7]:
DATA_DIR = "week-2/DVC-WEEK2-IITM/week-3/data"
MODEL_DIR = "week-2/DVC-WEEK2-IITM/week-3/model"
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(MODEL_DIR, exist_ok=True)

CSV_PATH = f"{DATA_DIR}/iris_data_adapted_for_feast.csv"
PARQUET_PATH = f"{DATA_DIR}/data_feast.parquet"
MODEL_PATH = f"{MODEL_DIR}/model.joblib"
METRICS_PATH = f"{MODEL_DIR}/metrics.txt"

### Data Preprocessing and Parquet Conversion

This section prepares the Iris dataset for feature store ingestion and model training:

- Reads the adapted CSV dataset from `CSV_PATH`.
- Converts the `iris_id` column to integer type for entity identification.
- Converts `event_timestamp` to datetime format.
- Encodes the categorical `species` column into integer labels using `OrdinalEncoder`.
- Saves the processed dataset as a Parquet file at `PARQUET_PATH` for efficient downstream usage.

In [8]:
encoder = OrdinalEncoder()
df = pd.read_csv(CSV_PATH)

df["iris_id"] = df["iris_id"].astype("int64")             # Ensure integer type for entity
df["event_timestamp"] = pd.to_datetime(df["event_timestamp"])  # Timestamp column
df["species"] = encoder.fit_transform(df[["species"]]).astype(int)  # Convert labels to int

df.to_parquet(PARQUET_PATH, index=False)           # Save as Parquet
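
To make the label encoding step concrete, the following minimal sketch shows how `OrdinalEncoder` assigns integer codes and how they can be decoded again. It uses a small toy column rather than the notebook's data, and the names `toy`, `toy_encoder`, `codes`, and `mapping` are illustrative assumptions.

```python
import pandas as pd
from sklearn.preprocessing import OrdinalEncoder

# Toy stand-in for the species column (not the notebook's data)
toy = pd.DataFrame({"species": ["versicolor", "setosa", "virginica", "setosa"]})

toy_encoder = OrdinalEncoder()
codes = toy_encoder.fit_transform(toy[["species"]]).astype(int).ravel()

# Categories are stored in sorted order, so codes follow alphabetical order
mapping = {label: code for code, label in enumerate(toy_encoder.categories_[0])}
print(mapping)
print(codes.tolist())

# inverse_transform recovers the original labels from the integer codes
decoded = toy_encoder.inverse_transform(codes.reshape(-1, 1)).ravel()
print(decoded.tolist())
```

Because the mapping lives in the fitted encoder, the same `encoder` object has to be kept (or persisted) to decode predictions later.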


### Explore and Analyze Preprocessed Dataset

This section reloads the Parquet dataset to confirm it was written correctly, then inspects the first rows, the column data types, and summary statistics for the numerical columns.


In [9]:
# Load the Parquet file
df_parquet = pd.read_parquet(PARQUET_PATH)

In [10]:
# Display first few rows
print("First 5 rows:")
display(df_parquet.head())

First 5 rows:


|   | event_timestamp | iris_id | sepal_length | sepal_width | petal_length | petal_width | species | created_timestamp |
|---|---|---|---|---|---|---|---|---|
| 0 | 2025-09-17 10:40:17.102131 | 1001 | 5.52 | 2.53 | 3.86 | 1.13 | 1 | 2025-10-02 10:40:17.172178 |
| 1 | 2025-09-18 10:40:17.102131 | 1001 | 5.5 | 2.24 | 3.6 | 1.08 | 1 | 2025-10-02 10:40:17.172178 |
| 2 | 2025-09-19 10:40:17.102131 | 1001 | 5.55 | 2.47 | 3.75 | 1.08 | 1 | 2025-10-02 10:40:17.172178 |
| 3 | 2025-09-20 10:40:17.102131 | 1001 | 5.45 | 2.37 | 3.92 | 1.2 | 1 | 2025-10-02 10:40:17.172178 |
| 4 | 2025-09-21 10:40:17.102131 | 1001 | 5.65 | 2.52 | 3.95 | 1.17 | 1 | 2025-10-02 10:40:17.172178 |


In [11]:
# Check data types
print("\nData types:")
print(df_parquet.dtypes)


Data types:
event_timestamp      datetime64[ns]
iris_id                       int64
sepal_length                float64
sepal_width                 float64
petal_length                float64
petal_width                 float64
species                       int64
created_timestamp            object
dtype: object


In [12]:
# Summary statistics for numerical columns
print("\nSummary statistics:")
display(df_parquet.describe())


Summary statistics:


|   | event_timestamp | iris_id | sepal_length | sepal_width | petal_length | petal_width | species |
|---|---|---|---|---|---|---|---|
| count | 45 | 45.0 | 45.0 | 45.0 | 45.0 | 45.0 | 45.0 |
| mean | 2025-09-24 10:40:17.102130688 | 1002.0 | 5.155556 | 2.956889 | 2.218 | 0.518222 | 0.333333 |
| min | 2025-09-17 10:40:17.102131 | 1001.0 | 4.69 | 2.19 | 1.19 | -0.13 | 0.0 |
| 25% | 2025-09-20 10:40:17.102130944 | 1001.0 | 4.9 | 2.48 | 1.39 | 0.2 | 0.0 |
| 50% | 2025-09-24 10:40:17.102130944 | 1002.0 | 5.1 | 3.01 | 1.51 | 0.31 | 0.0 |
| 75% | 2025-09-28 10:40:17.102130944 | 1003.0 | 5.45 | 3.42 | 3.77 | 1.06 | 1.0 |
| max | 2025-10-01 10:40:17.102131 | 1003.0 | 5.65 | 3.67 | 3.95 | 1.31 | 1.0 |
| std |  | 0.825723 | 0.285367 | 0.456423 | 1.155581 | 0.437661 | 0.476731 |
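
The cells above inspect rows, dtypes, and summary statistics, but they do not count missing values or class frequencies. A minimal sketch of those two checks follows. It runs on a hypothetical toy frame (`toy`) with one deliberately missing value; in the notebook the same calls could be applied to `df_parquet`.

```python
import numpy as np
import pandas as pd

# Hypothetical toy frame with one missing value, for illustration only
toy = pd.DataFrame({
    "iris_id": [1001, 1002, 1003, 1001],
    "sepal_length": [5.5, 4.8, np.nan, 5.6],
    "species": [1, 0, 0, 1],
})

missing_per_column = toy.isna().sum()            # number of NaN values per column
class_counts = toy["species"].value_counts()     # rows per encoded class
rows_per_entity = toy.groupby("iris_id").size()  # rows per entity key

print(missing_per_column)
print(class_counts)
print(rows_per_entity)
```

Checking class counts is worthwhile here because the later train/test split is stratified on `species`.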


### Define Entity for Feature Store

In Feast, an **Entity** represents a primary object or “thing” in the data for which features are recorded.  
Here, we define an entity for the Iris dataset.


In [13]:
iris_entity = Entity(
    name="iris_id",
    join_keys=["iris_id"],
    value_type=ValueType.INT64,
    description="Unique identifier for each iris plant"
)

### Define Feature Data Source

This block defines the **data source** for Feast.  

- `FileSource` tells Feast where to find the feature data.  
- `path` points to the Parquet file containing the preprocessed dataset.  
- `event_timestamp_column` specifies the timestamp column used for point-in-time correctness during feature retrieval.

In [14]:
iris_source = FileSource(
    path='/home/jupyter/week-2/DVC-WEEK2-IITM/week-3/data/data_feast.parquet',
    event_timestamp_column="event_timestamp"
)

### Define FeatureView for Iris Dataset

A **FeatureView** in Feast defines a set of features associated with one or more entities.  
It specifies **what features to store, their types, and how long they are valid**.

- `name` : `"iris_features"` – the name of this FeatureView in Feast.  
- `entities` : `[iris_entity]` – the entity/entities these features belong to (`iris_id`).  
- `ttl` : `timedelta(days=1)` – time-to-live: the maximum age of a feature value, relative to the requested timestamp, that Feast will still return.  
- `schema` : List of `Field` objects defining the feature names and their data types:  
  - `sepal_length`, `sepal_width`, `petal_length`, `petal_width` → `Float32`  
  - `species` → `Int64`  
- `source` : `iris_source` – the FileSource where the feature data is read from.  
- `online` : `True` – indicates that features will be available for online (real-time) serving.

**Purpose:**  
This FeatureView enables Feast to **ingest, store, and serve these features** for training ML models or online prediction, while keeping track of timestamps and entity associations.


In [15]:
iris_fv = FeatureView(
    name="iris_features",
    entities=[iris_entity],
    ttl=timedelta(days=1),
    schema=[
        Field(name="sepal_length", dtype=Float32),
        Field(name="sepal_width", dtype=Float32),
        Field(name="petal_length", dtype=Float32),
        Field(name="petal_width", dtype=Float32),
        Field(name="species", dtype=Int64)
    ],
    source=iris_source,
    online=True
)

### Initialize and Apply FeatureStore

In Feast, the **FeatureStore** is the main interface to manage entities, feature views, and feature data.  

- `repo_path` : Path to the local feature repository where Feast metadata is stored.  
- `fs.apply([...])` : Registers the defined **entities** and **feature views** with the FeatureStore, making them available for ingestion, storage, and retrieval.

**Purpose:**  
- This step finalizes the Feast setup, ensuring that the **`iris_entity`** and **`iris_fv`** FeatureView are active in the FeatureStore.  
- Once applied, features can be ingested and later served for **training ML models** or **online predictions**.


In [24]:
fs = FeatureStore(repo_path="/home/jupyter/week-2/DVC-WEEK2-IITM/week-3/iris_feature_feast_repo/feature_repo")
fs.apply([iris_entity, iris_fv])

### Materialize Feature Data into the Feature Store

The `materialize` method loads feature data from the **offline source** into the **online feature store** for serving.  

- `start_date` : The beginning of the time range for which feature values should be materialized.  
- `end_date` : The end of the time range.  

**How it works:**  
- Feast reads the features from the `FileSource` (Parquet file) for all entities (`iris_id`) within the given timestamp range.  
- The data is then stored in the **online store**, making it available for **real-time retrieval** during model inference or evaluation.

**Purpose:**  
- Ensures that the most recent feature values in the chosen window are available in the online store for low-latency lookups at prediction time. Training data is read from the offline source instead.
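
Conceptually, the online store ends up holding the most recent row per entity within the materialization window. The pandas sketch below illustrates that idea only; it is an analogy and not Feast's implementation, and the frame `offline` and its values are made up for the example.

```python
import pandas as pd

# Toy offline data: several timestamped rows per entity (illustrative values)
offline = pd.DataFrame({
    "iris_id": [1001, 1001, 1002, 1002],
    "event_timestamp": pd.to_datetime(
        ["2025-09-17", "2025-09-19", "2025-09-18", "2025-10-05"]
    ),
    "sepal_length": [5.52, 5.55, 4.90, 4.70],
})

start, end = pd.Timestamp("2025-09-17"), pd.Timestamp("2025-10-01")

# Keep rows inside the window, then the most recent row per entity
in_window = offline[offline["event_timestamp"].between(start, end)]
latest = (
    in_window.sort_values("event_timestamp")
    .groupby("iris_id")
    .tail(1)
    .set_index("iris_id")
)
print(latest)
```

The row for `iris_id` 1002 dated 2025-10-05 falls outside the window, so the earlier value is the one that would be served.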


In [25]:
fs.materialize(
    start_date=df["event_timestamp"].min(),
    end_date=df["event_timestamp"].max()
)

Materializing 1 feature views from 2025-09-17 10:40:17+00:00 to 2025-10-01 10:40:17+00:00 into the sqlite online store.

iris_features:


### Prepare and Preview the Entity DataFrame

Before retrieving historical features, it is useful to **inspect the entity dataframe** that Feast will join against:  

- `entity_df = df.copy()` : Creates a copy of the preprocessed dataset to avoid modifying the original `df`.  
- `entity_df.head()` : Displays the first few rows to verify that the columns, data types, and values look correct.  

**Purpose:**  
- Confirms that the entity column (`iris_id`) and timestamp (`event_timestamp`) are properly formatted, since these are the columns Feast uses for the join.  
- Helps catch any issues early, before requesting historical features.


In [26]:
entity_df = df.copy() 
entity_df.head()

|   | event_timestamp | iris_id | sepal_length | sepal_width | petal_length | petal_width | species | created_timestamp |
|---|---|---|---|---|---|---|---|---|
| 0 | 2025-09-17 10:40:17.102131 | 1001 | 5.52 | 2.53 | 3.86 | 1.13 | 1 | 2025-10-02 10:40:17.172178 |
| 1 | 2025-09-18 10:40:17.102131 | 1001 | 5.5 | 2.24 | 3.6 | 1.08 | 1 | 2025-10-02 10:40:17.172178 |
| 2 | 2025-09-19 10:40:17.102131 | 1001 | 5.55 | 2.47 | 3.75 | 1.08 | 1 | 2025-10-02 10:40:17.172178 |
| 3 | 2025-09-20 10:40:17.102131 | 1001 | 5.45 | 2.37 | 3.92 | 1.2 | 1 | 2025-10-02 10:40:17.172178 |
| 4 | 2025-09-21 10:40:17.102131 | 1001 | 5.65 | 2.52 | 3.95 | 1.17 | 1 | 2025-10-02 10:40:17.172178 |


### Retrieve Historical Features for Training

This step fetches **historical feature values** from Feast to prepare the dataset for model training.

- `fs.get_historical_features(...)` : Retrieves feature values for the entities in `entity_df` using the defined FeatureView(s).  
- `entity_df` : The dataframe containing the entity column (`iris_id`) and timestamp (`event_timestamp`) used to join features.  
- `features` : List of features to fetch from Feast. Each feature is specified as `"FeatureViewName:feature_name"`.  
- `.to_df()` : Converts the returned Feast dataset into a Pandas DataFrame suitable for training.

**Purpose:**  
- Ensures that the **training dataset includes features from the FeatureStore** with proper point-in-time correctness.  
- The resulting `training_data` dataframe is ready for **machine learning model training**.


In [27]:
training_data = fs.get_historical_features(
    entity_df=entity_df,
    features=[
        "iris_features:sepal_length",
        "iris_features:sepal_width",
        "iris_features:petal_length",
        "iris_features:petal_width",
        "iris_features:species"
    ]
).to_df()
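
Point-in-time correctness means each entity row is joined only with feature values recorded at or before its timestamp, and not older than the TTL. A rough pandas analogue using `merge_asof` is sketched below. It is an illustration of the idea, not how Feast performs the join, and the frames `feature_rows` and `entity_rows` are hypothetical.

```python
import pandas as pd

# Feature values as they were recorded over time (toy data)
feature_rows = pd.DataFrame({
    "iris_id": [1001, 1001, 1001],
    "event_timestamp": pd.to_datetime(["2025-09-17", "2025-09-18", "2025-09-21"]),
    "sepal_length": [5.52, 5.50, 5.65],
})

# Entity rows: "what did we know about this entity at this moment?"
entity_rows = pd.DataFrame({
    "iris_id": [1001, 1001],
    "event_timestamp": pd.to_datetime(["2025-09-18 12:00", "2025-09-20 12:00"]),
})

# Backward as-of join: latest feature row at or before each entity timestamp,
# but no older than one day (mirroring ttl=timedelta(days=1))
joined = pd.merge_asof(
    entity_rows.sort_values("event_timestamp"),
    feature_rows.sort_values("event_timestamp"),
    on="event_timestamp",
    by="iris_id",
    direction="backward",
    tolerance=pd.Timedelta(days=1),
)
print(joined)
```

The first entity row picks up the value recorded twelve hours earlier. The second finds only a value older than one day, so its feature comes back missing rather than leaking the later 2025-09-21 reading.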

### Train and Evaluate Decision Tree Classifier

This section trains a **Decision Tree Classifier** on the historical features retrieved from the Feast FeatureStore and evaluates its performance.

**Steps:**

1. **Prepare Features and Target**
   - `X` contains the feature columns: `sepal_length`, `sepal_width`, `petal_length`, `petal_width`.
   - `y` is the target variable: `species`.

2. **Split the Dataset**
   - Use `train_test_split` to divide data into training (60%) and test (40%) sets.
   - `stratify=y` ensures class proportions are maintained in both sets.
   - `random_state=42` ensures reproducibility.

3. **Train the Model**
   - `DecisionTreeClassifier` is initialized with `max_depth=3` for simplicity and interpretability.
   - Fit the model on the training set (`X_train`, `y_train`).

4. **Evaluate Model**
   - Predict on the test set (`X_test`) and compute accuracy using `accuracy_score`.
   - Print the resulting accuracy, which is measured on the held-out test set rather than on the training data.

5. **Save Model and Metrics**
   - Save the trained model to `MODEL_PATH` using `joblib.dump`.
   - Save the evaluation metrics (accuracy) to `METRICS_PATH`.

**Purpose:**  
- Train a simple, interpretable ML model using features managed in Feast.  
- Persist both the model and metrics for future use, such as deployment or reporting.


In [28]:
X = training_data[["sepal_length", "sepal_width", "petal_length", "petal_width"]]
y = training_data["species"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.4, stratify=y, random_state=42)

model = DecisionTreeClassifier(max_depth=3, random_state=1)
model.fit(X_train, y_train)

preds = model.predict(X_test)
acc = accuracy_score(y_test, preds)  # evaluated on the held-out test split
print(f"Test Accuracy: {acc:.4f}")

# Save model and metrics
joblib.dump(model, MODEL_PATH)
with open(METRICS_PATH, "w") as f:
    f.write(f"Test accuracy: {acc:.3f}\n")
print(f"Model saved at {MODEL_PATH} and metrics saved at {METRICS_PATH}")

Test Accuracy: 1.0000
Model saved at week-2/DVC-WEEK2-IITM/week-3/model/model.joblib and metrics saved at week-2/DVC-WEEK2-IITM/week-3/model/metrics.txt
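
Since the reported score comes from the held-out split, it can help to look at training and test accuracy side by side and to confirm what `stratify` does to the class counts. The sketch below assumes scikit-learn's bundled iris dataset as a stand-in for the Feast training frame; the variable names are illustrative and the scores it prints are not the notebook's results.

```python
from sklearn.datasets import load_iris
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier

# Bundled iris data stands in for the Feast training frame (assumption)
X_demo, y_demo = load_iris(return_X_y=True, as_frame=True)

X_tr, X_te, y_tr, y_te = train_test_split(
    X_demo, y_demo, test_size=0.4, stratify=y_demo, random_state=42
)

clf = DecisionTreeClassifier(max_depth=3, random_state=1).fit(X_tr, y_tr)

train_acc = accuracy_score(y_tr, clf.predict(X_tr))  # fit quality on seen data
test_acc = accuracy_score(y_te, clf.predict(X_te))   # estimate on held-out data

print(f"train={train_acc:.3f} test={test_acc:.3f}")
print(y_te.value_counts().sort_index().tolist())  # class counts in the test split
```

A large gap between the two numbers would suggest overfitting. On a dataset as small as the 45-row frame used here, a perfect test score should be read with some caution.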


### Retrieve Online Features from FeatureStore

This section demonstrates how to fetch **real-time feature values** from the **Feast online store**.

- `fs.get_online_features(...)` : Queries the online store for the latest feature values.  
- `features` : List of features to retrieve, specified as `"FeatureViewName:feature_name"`.  
- `entity_rows` : List of dictionaries containing entity keys for which to fetch features.  
  - Here, `iris_id` values 1001, 1002, 1003 are queried.  
- `.to_df()` : Converts the returned data into a Pandas DataFrame.

**Purpose:**  
- Allows real-time inference by retrieving the latest features for specific entities.  
- Ensures that the features used for prediction are consistent with those used during training.


In [29]:
online_features = fs.get_online_features(
    features=[
        "iris_features:sepal_length",
        "iris_features:sepal_width",
        "iris_features:petal_length",
        "iris_features:petal_width"
    ],
    entity_rows=[{"iris_id": i} for i in range(1001, 1004)]
).to_df()
print(online_features)

   iris_id  petal_width  sepal_length  petal_length  sepal_width
0     1001         1.09          5.45          3.84         2.36
1     1002         0.20          4.84          1.29         2.90
2     1003         0.29          4.85          1.19         3.40


## Make Online Predictions Using the Trained Model

This section demonstrates how to use the trained Decision Tree model to make **real-time predictions** using features fetched from Feast's online store.

### Step 1: Load the trained model

In [30]:
model = joblib.load(MODEL_PATH)

### Step 2: Prepare online features

In [31]:
X_live = online_features.drop(columns=["iris_id"])
X_live = X_live[X.columns]

### Step 3: Make predictions

In [32]:
live_preds = model.predict(X_live)

### Step 4: Decode predictions

In [33]:
decoded_preds = encoder.inverse_transform([[p] for p in live_preds])

### Step 5: Print predictions per entity

In [34]:
for fid, label in zip(online_features["iris_id"], decoded_preds):
    print(f"iris_id {fid} ➝ predicted species: {label[0]}")

iris_id 1001 ➝ predicted species: versicolor
iris_id 1002 ➝ predicted species: setosa
iris_id 1003 ➝ predicted species: setosa
