 ## Load Dataset

In [22]:
from ucimlrepo import fetch_ucirepo 
import pandas as pd
  
# Pull the UCI Wine dataset (repository id 109)
wine = fetch_ucirepo(id=109)

# Features and targets arrive as pandas DataFrames; join them column-wise
# so the exploration cells below can work on a single frame
X, y = wine.data.features, wine.data.targets
df = pd.concat([X, y], axis=1)

# Dataset-level metadata first, then the per-variable description
print(wine.metadata)
print(wine.variables)


{'uci_id': 109, 'name': 'Wine', 'repository_url': 'https://archive.ics.uci.edu/dataset/109/wine', 'data_url': 'https://archive.ics.uci.edu/static/public/109/data.csv', 'abstract': 'Using chemical analysis to determine the origin of wines', 'area': 'Physics and Chemistry', 'tasks': ['Classification'], 'characteristics': ['Tabular'], 'num_instances': 178, 'num_features': 13, 'feature_types': ['Integer', 'Real'], 'demographics': [], 'target_col': ['class'], 'index_col': None, 'has_missing_values': 'no', 'missing_values_symbol': None, 'year_of_dataset_creation': 1992, 'last_updated': 'Mon Aug 28 2023', 'dataset_doi': '10.24432/C5PC7J', 'creators': ['Stefan Aeberhard', 'M. Forina'], 'intro_paper': {'ID': 246, 'type': 'NATIVE', 'title': 'Comparative analysis of statistical pattern recognition methods in high dimensional settings', 'authors': 'S. Aeberhard, D. Coomans, O. Vel', 'venue': 'Pattern Recognition', 'year': 1994, 'journal': None, 'DOI': '10.1016/0031-3203(94)90145-7', 'URL': 'https:

 ## EDA

In [23]:
# Column labels of the combined features + target frame
df.columns


Index(['Alcohol', 'Malicacid', 'Ash', 'Alcalinity_of_ash', 'Magnesium',
       'Total_phenols', 'Flavanoids', 'Nonflavanoid_phenols',
       'Proanthocyanins', 'Color_intensity', 'Hue',
       '0D280_0D315_of_diluted_wines', 'Proline', 'class'],
      dtype='object')

In [24]:
# (rows, columns) of the combined frame
df.shape


(178, 14)

In [25]:
# Check data quality using the DataQualityCheck helper from CustomUtils.
# NOTE(review): CustomUtils is not part of this notebook; what the report
# contains and what `type='df'` selects should be confirmed against that module.
from CustomUtils import DataQualityCheck

DataQualityCheck.data_quality_report(input_df=df, type='df')


In [26]:
# Number of missing values in each column
df.isna().sum()


Alcohol                         0
Malicacid                       0
Ash                             0
Alcalinity_of_ash               0
Magnesium                       0
Total_phenols                   0
Flavanoids                      0
Nonflavanoid_phenols            0
Proanthocyanins                 0
Color_intensity                 0
Hue                             0
0D280_0D315_of_diluted_wines    0
Proline                         0
class                           0
dtype: int64

In [27]:
# Number of rows that exactly repeat an earlier row
df.duplicated().sum()


np.int64(0)

 ## Initialize ZenML

In [28]:
from zenml import pipeline, step
from zenml.client import Client
from typing import Tuple, Dict, List, Annotated
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from zenml.logger import get_logger
import numpy as np

# Module-level logger shared by all step functions defined below
logger = get_logger(__name__)

# Initialize ZenML client
# NOTE(review): `client` is not referenced by any cell visible in this notebook.
client = Client()


 ## Define ZenML Steps

In [29]:
@step
def load_data() -> Tuple[
    Annotated[pd.DataFrame, "features"],
    Annotated[pd.DataFrame, "targets"]
]:
    """Fetch the UCI Wine dataset (id 109).

    Returns:
        The feature frame and the target frame, in that order.
    """
    from ucimlrepo import fetch_ucirepo

    dataset = fetch_ucirepo(id=109).data
    features, targets = dataset.features, dataset.targets

    logger.info(f"Loaded data - X shape: {features.shape}, y shape: {targets.shape}")
    return features, targets

@step
def split_data(
    X: pd.DataFrame, 
    y: pd.DataFrame,
    test_size: float = 0.2,
    random_state: int = 64
) -> Tuple[
    Annotated[pd.DataFrame, "X_train"],
    Annotated[pd.DataFrame, "X_test"],
    Annotated[pd.DataFrame, "y_train"],
    Annotated[pd.DataFrame, "y_test"]
]:
    """Partition features and targets into a training part and a test part.

    Args:
        X: Feature frame.
        y: Target frame, row-aligned with ``X``.
        test_size: Passed to ``train_test_split`` as ``test_size``.
        random_state: Seed passed to ``train_test_split``.

    Returns:
        ``(X_train, X_test, y_train, y_test)``.
    """
    split_options = {"test_size": test_size, "random_state": random_state}
    train_X, test_X, train_y, test_y = train_test_split(X, y, **split_options)

    logger.info(f"Split data - Train: {train_X.shape}, Test: {test_X.shape}")
    return train_X, test_X, train_y, test_y

# Caching is disabled because this step's purpose includes a file-system side
# effect: a cached run would skip the body and leave feature_names.txt
# missing or stale for the cell that reads it back.
@step(enable_cache=False)
def save_feature_names(X_train: pd.DataFrame) -> List[str]:
    """Write the training feature names to ``feature_names.txt`` and return them.

    The file is created in the current working directory, one name per line,
    encoded as UTF-8.

    Args:
        X_train: Training feature frame whose column labels are recorded.

    Returns:
        The column labels as strings, in column order.
    """
    # Coerce to str so non-string column labels neither break the write
    # nor violate the declared List[str] return type.
    feature_names = [str(name) for name in X_train.columns]

    with open("feature_names.txt", "w", encoding="utf-8") as f:
        f.writelines(f"{name}\n" for name in feature_names)

    logger.info(f"Saved {len(feature_names)} feature names")
    return feature_names

@step
def train_model(
    X_train: pd.DataFrame,
    y_train: pd.DataFrame,
    X_test: pd.DataFrame,
    y_test: pd.DataFrame,
    n_estimators: int = 100,
    random_state: int = 64
) -> Tuple[
    Annotated[float, "accuracy"],
    Annotated[Dict, "params"]
]:
    """Fit a random forest on the training split and score it on the test split.

    Args:
        X_train: Training features.
        y_train: Training targets; flattened to 1-D before fitting.
        X_test: Test features.
        y_test: Test targets; flattened to 1-D before scoring.
        n_estimators: Number of trees in the forest.
        random_state: Seed for the classifier.

    Returns:
        The test accuracy and the dict of hyperparameters used.
    """
    params = {"n_estimators": n_estimators, "random_state": random_state}

    # Flatten the target frames into 1-D label vectors
    train_labels = y_train.values.ravel()
    test_labels = y_test.values.ravel()

    model = RandomForestClassifier(**params)
    model.fit(X_train, train_labels)

    acc = accuracy_score(test_labels, model.predict(X_test))

    # Report the score both through the ZenML logger and on stdout
    summary = f'Accuracy: {acc:.4f}'
    logger.info(summary)
    print(summary)

    return acc, params


 ## Simple Training Pipeline

In [30]:
@pipeline
def training_pipeline(
    test_size: float = 0.2,
    n_estimators: int = 100,
    random_state: int = 64
):
    """Load the data, split it, record the feature names, and train one model.

    Args:
        test_size: Forwarded to ``split_data``.
        n_estimators: Forwarded to ``train_model``.
        random_state: Seed forwarded to both ``split_data`` and ``train_model``.
    """
    features, targets = load_data()

    X_train, X_test, y_train, y_test = split_data(
        X=features,
        y=targets,
        test_size=test_size,
        random_state=random_state,
    )

    # Step outputs below are not consumed further inside the pipeline
    save_feature_names(X_train=X_train)

    train_model(
        X_train=X_train,
        y_train=y_train,
        X_test=X_test,
        y_test=y_test,
        n_estimators=n_estimators,
        random_state=random_state,
    )

# Run the pipeline with its default parameters
# (test_size=0.2, n_estimators=100, random_state=64).
# The call is the cell's last expression, so the returned run response
# is echoed as the cell output.
training_pipeline()


[37mInitiating a new run for the pipeline: [0m[38;5;105mtraining_pipeline[37m.[0m
[37mUsing user: [0m[38;5;105mdefault[37m[0m
[37mUsing stack: [0m[38;5;105mdefault[37m[0m
[37m  deployer: [0m[38;5;105mdefault[37m[0m
[37m  artifact_store: [0m[38;5;105mdefault[37m[0m
[37m  orchestrator: [0m[38;5;105mdefault[37m[0m
[37mYou can visualize your pipeline runs in the [0m[38;5;105mZenML Dashboard[37m. In order to try it locally, please run [0m[38;5;105mzenml login --local[37m.[0m
[37mStep [0m[38;5;105mload_data[37m has started.[0m
[37mPreparing to run step [0m[38;5;105mload_data[37m.[0m
[37mLoaded data - X shape: (178, 13), y shape: (178, 1)[0m
[33mBy default, the [0m[38;5;105mPandasMaterializer[33m stores data as a [0m[38;5;105m.csv[33m file. If you want to store data more efficiently, you can install [0m[38;5;105mpyarrow[33m by running '[0m[38;5;105mpip install pyarrow[33m'. This will allow [0m[38;5;105mPandasMaterializer[33m to

PipelineRunResponse(body=PipelineRunResponseBody(created=datetime.datetime(2026, 1, 7, 13, 48, 5, 662842), updated=datetime.datetime(2026, 1, 7, 13, 48, 16, 362019), user_id=UUID('e0ecb2bb-e896-4c70-a614-0ecb9a68d788'), project_id=UUID('e68f6b36-04e8-4430-b5d8-383ce343a617'), status=<ExecutionStatus.COMPLETED: 'completed'>, in_progress=False, status_reason=None, index=3), metadata=PipelineRunResponseMetadata(run_metadata={}, config=PipelineConfiguration(enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, enable_step_logs=None, environment={}, secrets=[], enable_pipeline_logs=None, execution_mode=<ExecutionMode.CONTINUE_ON_FAILURE: 'continue_on_failure'>, settings={}, tags=None, extra={}, failure_hook_source=None, success_hook_source=None, init_hook_source=None, init_hook_kwargs=None, cleanup_hook_source=None, model=None, parameters={'test_size': 0.2, 'n_estimators': 100, 'random_state': 64}, retry=None, substitutions={'date': '2026_01_07', 'time': '13_

 ## Grid Search Pipeline

In [31]:
@step
def grid_search_train(
    X_train: pd.DataFrame,
    y_train: pd.DataFrame,
    X_test: pd.DataFrame,
    y_test: pd.DataFrame,
    n_estimators_list: List[int],
    random_state: int = 64,
    cv: int = 5
) -> Tuple[
    Annotated[float, "best_accuracy"],
    Annotated[Dict, "best_params"],
    Annotated[List[Dict], "all_results"]
]:
    """Grid-search ``n_estimators`` for a random forest and report every candidate.

    The winning candidate is the one with the highest mean cross-validated
    accuracy on the training data. Test accuracy is measured for every
    candidate for reporting only and is never used to pick the winner, so the
    returned ``best_accuracy`` is not biased by selecting on the test set.

    Args:
        X_train: Training features.
        y_train: Training targets; flattened to 1-D before fitting.
        X_test: Test features.
        y_test: Test targets; flattened to 1-D before scoring.
        n_estimators_list: Candidate values for ``n_estimators``.
        random_state: Seed used for every classifier that is fitted.
        cv: Number of cross-validation folds used by the grid search.

    Returns:
        A tuple of the test accuracy of the CV-selected candidate, that
        candidate's parameter dict, and a list with one record per candidate
        (``params``, ``cv_mean_accuracy``, ``cv_std_accuracy``,
        ``test_accuracy``) in grid order.
    """
    from sklearn.model_selection import GridSearchCV

    # Flatten the target frames into 1-D label vectors
    y_train_array = y_train.values.ravel()
    y_test_array = y_test.values.ravel()

    grid = GridSearchCV(
        RandomForestClassifier(random_state=random_state, n_jobs=-1),
        {"n_estimators": n_estimators_list},
        cv=cv,
        scoring="accuracy",
    )
    grid.fit(X_train, y_train_array)

    cv_results = grid.cv_results_
    all_results: List[Dict] = []

    for i, params in enumerate(cv_results["params"]):
        mean_val = float(cv_results["mean_test_score"][i])
        std_val = float(cv_results["std_test_score"][i])

        # Refit this candidate on the full training split so its test
        # accuracy can be logged next to its cross-validation score.
        model = RandomForestClassifier(**params, random_state=random_state, n_jobs=-1)
        model.fit(X_train, y_train_array)
        acc = float(accuracy_score(y_test_array, model.predict(X_test)))

        all_results.append({
            "params": params,
            "cv_mean_accuracy": mean_val,
            "cv_std_accuracy": std_val,
            "test_accuracy": acc
        })

        logger.info(f"Model {i+1}: n_estimators={params['n_estimators']}, "
                   f"test_acc={acc:.4f}, cv_mean={mean_val:.4f}")

    # Model selection uses the cross-validation ranking only; the test score
    # of the chosen candidate is then reported as an unbiased estimate.
    best = all_results[grid.best_index_]
    best_acc = best["test_accuracy"]
    best_params = dict(best["params"])

    logger.info(f"Best model: {best_params}, accuracy: {best_acc:.4f}")

    return best_acc, best_params, all_results

@pipeline
def grid_search_pipeline(
    test_size: float = 0.2,
    random_state: int = 64
):
    """Load the data, split it, record the feature names, and grid-search a forest.

    Args:
        test_size: Forwarded to ``split_data``.
        random_state: Seed forwarded to ``split_data``.
    """
    features, targets = load_data()

    X_train, X_test, y_train, y_test = split_data(
        X=features,
        y=targets,
        test_size=test_size,
        random_state=random_state,
    )

    # Step outputs below are not consumed further inside the pipeline
    save_feature_names(X_train=X_train)

    # Candidate forest sizes explored by the search
    grid_search_train(
        X_train=X_train,
        y_train=y_train,
        X_test=X_test,
        y_test=y_test,
        n_estimators_list=[72, 100, 125, 150, 200, 250],
    )

# Run the grid search pipeline with its default parameters
# (test_size=0.2, random_state=64).
# Steps shared with the earlier training pipeline may be served from
# ZenML's cache rather than executed again.
grid_search_pipeline()


[37mInitiating a new run for the pipeline: [0m[38;5;105mgrid_search_pipeline[37m.[0m
[37mRegistered new pipeline: [0m[38;5;105mgrid_search_pipeline[37m.[0m
[37mUsing user: [0m[38;5;105mdefault[37m[0m
[37mUsing stack: [0m[38;5;105mdefault[37m[0m
[37m  deployer: [0m[38;5;105mdefault[37m[0m
[37m  artifact_store: [0m[38;5;105mdefault[37m[0m
[37m  orchestrator: [0m[38;5;105mdefault[37m[0m
[37mYou can visualize your pipeline runs in the [0m[38;5;105mZenML Dashboard[37m. In order to try it locally, please run [0m[38;5;105mzenml login --local[37m.[0m
[37mUsing cached version of step [0m[38;5;105mload_data[37m.[0m
[37mUsing cached version of step [0m[38;5;105msplit_data[37m.[0m
[37mUsing cached version of step [0m[38;5;105msave_feature_names[37m.[0m
[37mStep [0m[38;5;105mgrid_search_train[37m has started.[0m
[37mPreparing to run step [0m[38;5;105mgrid_search_train[37m.[0m
[33mBy default, the [0m[38;5;105mPandasMaterializer[

PipelineRunResponse(body=PipelineRunResponseBody(created=datetime.datetime(2026, 1, 7, 13, 48, 41, 652271), updated=datetime.datetime(2026, 1, 7, 13, 48, 55, 392691), user_id=UUID('e0ecb2bb-e896-4c70-a614-0ecb9a68d788'), project_id=UUID('e68f6b36-04e8-4430-b5d8-383ce343a617'), status=<ExecutionStatus.COMPLETED: 'completed'>, in_progress=False, status_reason=None, index=1), metadata=PipelineRunResponseMetadata(run_metadata={}, config=PipelineConfiguration(enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, enable_step_logs=None, environment={}, secrets=[], enable_pipeline_logs=None, execution_mode=<ExecutionMode.CONTINUE_ON_FAILURE: 'continue_on_failure'>, settings={}, tags=None, extra={}, failure_hook_source=None, success_hook_source=None, init_hook_source=None, init_hook_kwargs=None, cleanup_hook_source=None, model=None, parameters={'test_size': 0.2, 'random_state': 64}, retry=None, substitutions={'date': '2026_01_07', 'time': '13_48_41_643687'}, cach

In [32]:
# Read back the names written by the save_feature_names step (one per line)
with open("feature_names.txt") as names_file:
    feature_names = list(map(str.strip, names_file))

print(f"Feature names: {feature_names}")


Feature names: ['Alcohol', 'Malicacid', 'Ash', 'Alcalinity_of_ash', 'Magnesium', 'Total_phenols', 'Flavanoids', 'Nonflavanoid_phenols', 'Proanthocyanins', 'Color_intensity', 'Hue', '0D280_0D315_of_diluted_wines', 'Proline']


 ## View ZenML Dashboard



 To view your pipeline runs, models, and artifacts:



 ```bash

 zenml login --local

 ```



 The dashboard will show:

 - All pipeline runs with their steps

 - Artifacts (datasets, models, metrics)

 - Model versions and metadata

 - Lineage tracking

 ## Model Deployment



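 For production deployment, ZenML offers several deployment integrations. Note that the training steps in this notebook return only metrics and parameters, so a step would first have to return the fitted model as an artifact before it could be passed to a deployer: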



 ```python

 # Example: Deploy with MLflow (requires mlflow integration)

 from zenml.integrations.mlflow.steps import mlflow_model_deployer_step



 @pipeline

 def deployment_pipeline():

     # ... training steps ...

     mlflow_model_deployer_step(

         model=trained_model,

         deploy_decision=True

     )

 ```



 Or use other deployers:

 - Seldon Core

 - KServe

 - BentoML

 - Custom deployers



 Install integration: `zenml integration install mlflow`