In [1]:
!pip install feast scikit-learn 'feast[gcp]' mlflow


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.2[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [2]:
import os
import subprocess
from datetime import datetime, timedelta

import feast
import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn import metrics
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier

In [3]:
# --- Feast / data configuration ---
# Feature repository directory (relative to the notebook's working directory).
FEAST_REPO_PATH = "iris_feast_repo"
# Also exported through the process environment.
os.environ['FEAST_REPO_PATH'] = FEAST_REPO_PATH
# File name of the source CSV; cells below read it from the data/ directory.
IRIS_DATA_FILE = "iris_data_adapted_for_feast.csv"
# Feature columns requested from the feature view and fed to the model.
FEATURE_COLS = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']

# Species name -> integer class label, plus the inverse used to decode predictions.
SPECIES_MAP = {'setosa': 0, 'versicolor': 1, 'virginica': 2}
REVERSE_SPECIES_MAP = {v: k for k, v in SPECIES_MAP.items()}

# --- MLflow configuration ---
# The tracking server can be overridden with the MLFLOW_TRACKING_URI environment
# variable; the previous hard-coded address remains the default.
MLFLOW_TRACKING_URI = os.environ.get("MLFLOW_TRACKING_URI", "http://34.27.65.90:8100/")
MLFLOW_EXPERIMENT_NAME = "iris_classifier_feast_tuning"
MLFLOW_MODEL_NAME = "iris_classifier"

In [4]:
# Convert the source CSV to Parquet.
# Both paths are derived from IRIS_DATA_FILE so this cell stays in sync with
# the cell that later re-reads the same CSV via f"data/{IRIS_DATA_FILE}".
csv_path = f"data/{IRIS_DATA_FILE}"
df = pd.read_csv(csv_path)

# Parse the timestamp column so it is written to Parquet as a datetime type
# rather than as strings.
df['event_timestamp'] = pd.to_datetime(df['event_timestamp'])

# Save as Parquet next to the CSV, same base name with a .parquet extension.
parquet_path = os.path.splitext(csv_path)[0] + ".parquet"
df.to_parquet(parquet_path, index=False)

print(f"Converted {csv_path} to {parquet_path}")

Converted data/iris_data_adapted_for_feast.csv to data/iris_data_adapted_for_feast.parquet


In [5]:
# Run `feast apply` from inside the feature repository.
# subprocess.run(cwd=...) scopes the directory change to the child process, so
# the notebook's working directory is never left pointing at the repo if the
# cell is interrupted, and the return code lets us fail loudly instead of
# printing a success message after a failed apply.
apply_result = subprocess.run(
    ["feast", "apply"],
    cwd=FEAST_REPO_PATH,
    capture_output=True,
    text=True,
)
print(apply_result.stdout)
if apply_result.stderr:
    print(apply_result.stderr)
if apply_result.returncode != 0:
    raise RuntimeError(
        f"`feast apply` failed with exit code {apply_result.returncode}"
    )
print("Feast repository deployed successfully.")

No project found in the repository. Using project name iris_classifier_feast defined in feature_store.yaml
Applying changes for project iris_classifier_feast
Updated feature view [1m[33miris_stats[0m
	batch_source: [1m[33mtype: BATCH_FILE
timestamp_field: "event_timestamp"
file_options {
  uri: "../data/iris_data_adapted_for_feast.parquet"
}
data_source_class_type: "feast.infra.offline_stores.file_source.FileSource"
name: "../data/iris_data_adapted_for_feast.parquet"
meta {
  created_timestamp {
    seconds: 1760191133
    nanos: 223473000
  }
  last_updated_timestamp {
    seconds: 1760191133
    nanos: 264768000
  }
}
[0m -> [1m[92mtype: BATCH_FILE
timestamp_field: "event_timestamp"
file_options {
  uri: "../data/iris_data_adapted_for_feast.parquet"
}
data_source_class_type: "feast.infra.offline_stores.file_source.FileSource"
name: "../data/iris_data_adapted_for_feast.parquet"
meta {
  created_timestamp {
    seconds: 1761920420
    nanos: 617147000
  }
  last_updated_timesta

In [6]:
# Load the raw CSV and build the entity dataframe (entity key + timestamp +
# label) that is passed to the historical feature retrieval below.
full_data = pd.read_csv(f"data/{IRIS_DATA_FILE}")
# NOTE(review): timestamps are localized to UTC here, presumably so they match
# the tz-aware timestamps coming back from Feast in the later merge; confirm.
full_data['event_timestamp'] = pd.to_datetime(full_data['event_timestamp']).dt.tz_localize('UTC')
full_data['target'] = full_data['species'].map(SPECIES_MAP)

# Series.map leaves NaN for any species missing from SPECIES_MAP; fail here
# rather than training on NaN labels further down.
unmapped_species = full_data.loc[full_data['target'].isna(), 'species'].unique().tolist()
if unmapped_species:
    raise ValueError(f"Species values missing from SPECIES_MAP: {unmapped_species}")

entity_df = full_data[['iris_id', 'event_timestamp', 'target']].copy()

In [7]:
print("--- 1. Setting up MLflow Experiment ---")

# Point MLflow at the tracking server and create a client for registry calls.
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
client = mlflow.MlflowClient()

# Look the experiment up by name; create it only when it does not exist yet.
experiment = client.get_experiment_by_name(MLFLOW_EXPERIMENT_NAME)
if experiment is not None:
    print(f"Experiment '{MLFLOW_EXPERIMENT_NAME}' found.")
    experiment_id = experiment.experiment_id
else:
    print(f"Experiment '{MLFLOW_EXPERIMENT_NAME}' not found. Creating new experiment.")
    experiment_id = client.create_experiment(MLFLOW_EXPERIMENT_NAME)

# Make it the active experiment for the fluent API as well.
mlflow.set_experiment(experiment_name=MLFLOW_EXPERIMENT_NAME)
print(f"MLflow experiment set to: {MLFLOW_EXPERIMENT_NAME} (ID: {experiment_id})")

--- 1. Setting up MLflow Experiment ---
Experiment 'iris_classifier_feast_tuning' not found. Creating new experiment.
MLflow experiment set to: iris_classifier_feast_tuning (ID: 886139258646959942)


In [8]:
print("\n--- 2. Retrieving Historical Features for Training (Offline Store) ---")
fs = feast.FeatureStore(repo_path=FEAST_REPO_PATH)

# Fully qualified "<feature_view>:<feature>" references for every model input.
feature_refs = [f"iris_stats:{col}" for col in FEATURE_COLS]
historical_job = fs.get_historical_features(
    entity_df=entity_df,
    features=feature_refs,
)
training_df = historical_job.to_df()

# Re-attach the label from entity_df only when the retrieved frame lacks it.
target_missing = 'target' not in training_df.columns
if target_missing:
    label_lookup = entity_df[['iris_id', 'event_timestamp', 'target']]
    training_df = training_df.merge(
        label_lookup,
        on=['iris_id', 'event_timestamp'],
        how='left',
    )
print(f"Training data shape: {training_df.shape}")


--- 2. Retrieving Historical Features for Training (Offline Store) ---
Training data shape: (45, 7)


In [9]:
# Split features and label, then hold out 40% of the rows for evaluation.
# The split is stratified on the label and seeded so it is repeatable.
X, y = training_df[FEATURE_COLS], training_df['target']
split_parts = train_test_split(X, y, test_size=0.4, stratify=y, random_state=42)
X_train, X_test, y_train, y_test = split_parts

In [10]:
print("\n--- 3. Starting Hyperparameter Tuning with MLflow ---")

# Objective 1: Define hyperparameter grid
hyperparameters = {
    "max_depth": [2, 3, 5, 10],
    "criterion": ["gini", "entropy"]
}

# Train one decision tree per (criterion, max_depth) combination; each
# combination is recorded as its own MLflow run with params, metric and model.
for criterion in hyperparameters["criterion"]:
    for depth in hyperparameters["max_depth"]:

        # Log experiment run to MLflow
        with mlflow.start_run(experiment_id=experiment_id) as run:
            run_id = run.info.run_id
            print(f"\n--- Starting Run: {run_id} ---")

            # Log both parameters in a single call
            mlflow.log_params({"max_depth": depth, "criterion": criterion})
            print(f"Params: criterion={criterion}, max_depth={depth}")

            # Train the model (fixed seed so reruns give the same tree)
            mod_dt = DecisionTreeClassifier(
                max_depth=depth,
                criterion=criterion,
                random_state=1
            )
            mod_dt.fit(X_train, y_train)

            # Evaluate model on the held-out split. scikit-learn metrics take
            # (y_true, y_pred); accuracy is symmetric, but keeping the
            # documented order avoids mistakes if another metric is added.
            prediction = mod_dt.predict(X_test)
            accuracy = metrics.accuracy_score(y_test, prediction)

            # Log metric
            mlflow.log_metric("accuracy", accuracy)
            print(f"Metrics: Accuracy={accuracy:.3f}")

            # Objective 4: Log model, removing DVC/joblib dependency
            mlflow.sklearn.log_model(
                sk_model=mod_dt,
                artifact_path="model"
            )
            print(f"Model logged as artifact for run {run_id}.")

print("\n--- Hyperparameter tuning complete. All runs logged to MLflow. ---")


--- 3. Starting Hyperparameter Tuning with MLflow ---





--- Starting Run: e20cb42b097b437da90aa0567f3d9677 ---
Params: criterion=gini, max_depth=2
Metrics: Accuracy=1.000




Model logged as artifact for run e20cb42b097b437da90aa0567f3d9677.
🏃 View run serious-steed-327 at: http://34.27.65.90:8100/#/experiments/886139258646959942/runs/e20cb42b097b437da90aa0567f3d9677
🧪 View experiment at: http://34.27.65.90:8100/#/experiments/886139258646959942

--- Starting Run: 9a4cc0d4ffa146cebfe362b9a54b61c6 ---
Params: criterion=gini, max_depth=3
Metrics: Accuracy=1.000




Model logged as artifact for run 9a4cc0d4ffa146cebfe362b9a54b61c6.
🏃 View run debonair-vole-462 at: http://34.27.65.90:8100/#/experiments/886139258646959942/runs/9a4cc0d4ffa146cebfe362b9a54b61c6
🧪 View experiment at: http://34.27.65.90:8100/#/experiments/886139258646959942

--- Starting Run: db7a77496e4741f8a35edb452bd6b3f6 ---
Params: criterion=gini, max_depth=5
Metrics: Accuracy=1.000




Model logged as artifact for run db7a77496e4741f8a35edb452bd6b3f6.
🏃 View run defiant-colt-449 at: http://34.27.65.90:8100/#/experiments/886139258646959942/runs/db7a77496e4741f8a35edb452bd6b3f6
🧪 View experiment at: http://34.27.65.90:8100/#/experiments/886139258646959942

--- Starting Run: 5449d3b2a465410a96c04b4577e0e3fa ---
Params: criterion=gini, max_depth=10
Metrics: Accuracy=1.000




Model logged as artifact for run 5449d3b2a465410a96c04b4577e0e3fa.
🏃 View run fortunate-crow-71 at: http://34.27.65.90:8100/#/experiments/886139258646959942/runs/5449d3b2a465410a96c04b4577e0e3fa
🧪 View experiment at: http://34.27.65.90:8100/#/experiments/886139258646959942

--- Starting Run: ea14754f5513448cbdbcb78acb51f0aa ---
Params: criterion=entropy, max_depth=2
Metrics: Accuracy=1.000




Model logged as artifact for run ea14754f5513448cbdbcb78acb51f0aa.
🏃 View run silent-fly-501 at: http://34.27.65.90:8100/#/experiments/886139258646959942/runs/ea14754f5513448cbdbcb78acb51f0aa
🧪 View experiment at: http://34.27.65.90:8100/#/experiments/886139258646959942

--- Starting Run: 82a3bced68334639a651bd7e82fb107f ---
Params: criterion=entropy, max_depth=3
Metrics: Accuracy=1.000




Model logged as artifact for run 82a3bced68334639a651bd7e82fb107f.
🏃 View run upset-goat-94 at: http://34.27.65.90:8100/#/experiments/886139258646959942/runs/82a3bced68334639a651bd7e82fb107f
🧪 View experiment at: http://34.27.65.90:8100/#/experiments/886139258646959942

--- Starting Run: 4c1012b67c2e436eb77232911a37fdce ---
Params: criterion=entropy, max_depth=5
Metrics: Accuracy=1.000




Model logged as artifact for run 4c1012b67c2e436eb77232911a37fdce.
🏃 View run sincere-gnat-65 at: http://34.27.65.90:8100/#/experiments/886139258646959942/runs/4c1012b67c2e436eb77232911a37fdce
🧪 View experiment at: http://34.27.65.90:8100/#/experiments/886139258646959942

--- Starting Run: 355411c833494dc4a7804201bb884658 ---
Params: criterion=entropy, max_depth=10
Metrics: Accuracy=1.000




Model logged as artifact for run 355411c833494dc4a7804201bb884658.
🏃 View run upbeat-stork-441 at: http://34.27.65.90:8100/#/experiments/886139258646959942/runs/355411c833494dc4a7804201bb884658
🧪 View experiment at: http://34.27.65.90:8100/#/experiments/886139258646959942

--- Hyperparameter tuning complete. All runs logged to MLflow. ---


In [11]:
print("--- 4. Finding and Registering Best Model ---")

# Ask the tracking server for the single highest-accuracy run in the experiment.
runs_by_accuracy = mlflow.search_runs(
    experiment_ids=[experiment_id],
    order_by=["metrics.accuracy DESC"],
    max_results=1
)

# An empty result would otherwise surface as an opaque IndexError from .iloc[0].
if runs_by_accuracy.empty:
    raise RuntimeError(
        f"No runs found in experiment {experiment_id}; run the tuning cell first."
    )
best_run = runs_by_accuracy.iloc[0]

best_run_id = best_run.run_id
best_accuracy = best_run["metrics.accuracy"]
# "model" matches the artifact path used when the tuning loop logged each model.
best_model_uri = f"runs:/{best_run_id}/model"

print(f"Best run found: {best_run_id}")
print(f"Best model accuracy: {best_accuracy:.3f}")
print(f"Best model URI: {best_model_uri}")

--- 4. Finding and Registering Best Model ---
Best run found: 355411c833494dc4a7804201bb884658
Best model accuracy: 1.000
Best model URI: runs:/355411c833494dc4a7804201bb884658/model


In [12]:
# Add the best run's model to the MLflow Model Registry under MLFLOW_MODEL_NAME.
print(f"Registering model as '{MLFLOW_MODEL_NAME}'...")
model_version = mlflow.register_model(model_uri=best_model_uri, name=MLFLOW_MODEL_NAME)

registered_version = model_version.version
print(f"Model '{MLFLOW_MODEL_NAME}' version {registered_version} registered.")

Successfully registered model 'iris_classifier'.
2025/10/31 14:25:13 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: iris_classifier, version 1


Registering model as 'iris_classifier'...
Model 'iris_classifier' version 1 registered.


Created version '1' of model 'iris_classifier'.


In [13]:
# Promote the freshly registered version to the "Production" stage and archive
# whatever version held that stage before.
# NOTE(review): recent MLflow releases appear to flag model stages as deprecated
# in favour of aliases; confirm before relying on this long term. The loading
# cell resolves "models:/<name>/Production", so both must change together.
version_to_promote = model_version.version
print(f"Transitioning model version {version_to_promote} to 'Production' stage...")

promotion_kwargs = {
    "name": MLFLOW_MODEL_NAME,
    "version": version_to_promote,
    "stage": "Production",
    "archive_existing_versions": True,
}
client.transition_model_version_stage(**promotion_kwargs)

print(f"Model version {version_to_promote} is now in 'Production'.")

Transitioning model version 1 to 'Production' stage...
Model version 1 is now in 'Production'.


  client.transition_model_version_stage(


In [14]:
print("\n--- 6. Retrieving Online Features and Best Model for Inference ---")

# We will load the model version we previously staged as "Production"
model_uri = f"models:/{MLFLOW_MODEL_NAME}/Production"

print(f"Loading 'Production' model from: {model_uri}")
try:
    model = mlflow.sklearn.load_model(model_uri)
    print("Model loaded successfully.")
except Exception as e:
    print(f"Error loading model: {e}")
    print("Please ensure the model was registered and transitioned to 'Production'")
    # Re-raise: the inference cell needs `model`, and continuing here would
    # either hit a NameError later or silently reuse a stale model left in the
    # kernel from an earlier execution.
    raise


--- 6. Retrieving Online Features and Best Model for Inference ---
Loading 'Production' model from: models:/iris_classifier/Production


Downloading artifacts:   0%|          | 0/5 [00:00<?, ?it/s]

Model loaded successfully.


In [15]:
# Pick up to five distinct entity ids from the held-out split for the online demo.
try:
    # y_test.index holds index *labels* carried over from training_df (y is a
    # column of training_df), so select with label-based .loc rather than
    # positional .iloc; the two only coincide for a default 0..n-1 index.
    test_iris_ids = training_df.loc[y_test.index, 'iris_id'].unique().tolist()[:5]
except NameError:
    print("Could not find training_df or y_test. Using hardcoded IDs for inference.")
    # Fallback if the notebook state is lost
    test_iris_ids = [1001, 1002, 1003, 1004, 1005]

# Define entity rows for online lookup
entity_rows = [{"iris_id": iris_id} for iris_id in test_iris_ids]

# Retrieve features from the Feast online store
print("\n--- Retrieving Online Features for Inference (Online Store) ---")
online_features = fs.get_online_features(
    entity_rows=entity_rows,
    features=[f"iris_stats:{col}" for col in FEATURE_COLS],
)

# Convert the feature vector to a DataFrame for prediction
inference_df = pd.DataFrame.from_dict(online_features.to_dict())

# Ensure columns exist before trying to predict
if not all(col in inference_df.columns for col in FEATURE_COLS):
    print(f"Error: Missing one or more feature columns. Found: {inference_df.columns.tolist()}")
else:
    # Select columns in FEATURE_COLS order, the same order used for training.
    X_inference = inference_df[FEATURE_COLS]

    # Make the prediction using the MLflow model and decode class ids to names
    online_predictions_raw = model.predict(X_inference)
    online_predictions = [REVERSE_SPECIES_MAP[p] for p in online_predictions_raw]

    # Display the results
    inference_results = pd.DataFrame({
        'iris_id': inference_df['iris_id'],
        'predicted_species': online_predictions
    })

    print("\n--- Final Online Inference Results (using MLflow 'Production' Model) ---")
    print(inference_results.to_markdown(index=False, numalign="left", stralign="left"))


--- Retrieving Online Features for Inference (Online Store) ---

--- Final Online Inference Results (using MLflow 'Production' Model) ---
| iris_id   | predicted_species   |
|:----------|:--------------------|
| 1003      | setosa              |
| 1001      | versicolor          |
| 1002      | setosa              |
