<a href="https://colab.research.google.com/github/Krish6115/MLLab/blob/main/Lab4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Colab-only cell: opens the google.colab upload helper. The recorded cell output
# shows Crop_recommendation.csv being saved under that name, which is the relative
# path the main section later passes to load_and_split_data.
from google.colab import files
uploaded = files.upload()

Saving Crop_recommendation.csv to Crop_recommendation.csv


In [2]:
# Subject: 23CSE301 — Lab Session 04 (Classification with kNN and metrics)
# Dataset used: Crop_recommendation.csv (attached)
# IMPORTANT: Per coding instructions, all functionality is implemented as functions with NO prints inside.
# All prints/logging are in the "main" section at the bottom.
# Each function is preceded by a brief comment that includes the corresponding assignment question.

from typing import Tuple, Dict, List
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, GridSearchCV, RandomizedSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import (
    confusion_matrix,
    precision_score,
    recall_score,
    f1_score,
    mean_squared_error,
    r2_score
)
from sklearn.neighbors import KNeighborsClassifier
from sklearn.base import BaseEstimator


# Helper: compute precision, recall and F1 for a set of predictions and return them as a dict
def _classification_metrics(y_true: np.ndarray, y_pred: np.ndarray, average: str = "weighted") -> Dict[str, float]:
    """
    Compute precision, recall and F1 for a set of classification predictions.

    `average` (default "weighted") and zero_division=0 are forwarded to each
    scikit-learn scorer. Returns {"precision": ..., "recall": ..., "f1": ...}
    with every value converted to a plain Python float.
    """
    # (result key, scorer) pairs; the order here fixes the key order of the result.
    scorers = (
        ("precision", precision_score),
        ("recall", recall_score),
        ("f1", f1_score),
    )
    return {
        key: float(scorer(y_true, y_pred, average=average, zero_division=0))
        for key, scorer in scorers
    }


# A0. Data loading and split helper (used by multiple questions)
def load_and_split_data(
    csv_path: str,
    label_col: str = "label",
    test_size: float = 0.2,
    random_state: int = 42,
    stratify: bool = True
) -> Tuple[pd.DataFrame, pd.Series, pd.DataFrame, pd.Series]:
    """
    Read a CSV file and split it into train and test partitions.

    The column named `label_col` is the target; all remaining columns are features.
    When `stratify` is true the target is handed to train_test_split as the
    stratification variable, otherwise no stratification is requested.

    Note the return order: (X_train, y_train, X_test, y_test).
    """
    frame = pd.read_csv(csv_path)
    features = frame.drop(columns=[label_col])
    target = frame[label_col]

    split_options = {
        "test_size": test_size,
        "random_state": random_state,
        "stratify": target if stratify else None,
    }
    feat_train, feat_test, tgt_train, tgt_test = train_test_split(features, target, **split_options)

    # train_test_split yields (X_train, X_test, y_train, y_test); regroup per partition.
    return feat_train, tgt_train, feat_test, tgt_test


# A1. Evaluate confusion matrix, precision, recall, F1 for both train and test; infer fit by metrics spread
def evaluate_knn_confusion_and_metrics(
    X_train: pd.DataFrame,
    y_train: pd.Series,
    X_test: pd.DataFrame,
    y_test: pd.Series,
    k: int = 5
) -> Dict[str, Dict]:
    """
    Fit a StandardScaler + kNN(k) pipeline on the training data and score it on both splits.

    Returns a dictionary with:
      - "model":   the fitted pipeline
      - "classes": list of labels (union of train and test labels) giving the
                   row/column order of both confusion matrices
      - "train":   {"confusion_matrix": np.ndarray, "metrics": {...}}
      - "test":    {"confusion_matrix": np.ndarray, "metrics": {...}}
    The metrics are the weighted precision/recall/F1 from _classification_metrics.
    """
    model = Pipeline(steps=[
        ("scaler", StandardScaler()),
        ("knn", KNeighborsClassifier(n_neighbors=k)),
    ])
    model.fit(X_train, y_train)

    # One shared label order so the train and test matrices have the same shape.
    classes = np.unique(np.concatenate([y_train.values, y_test.values]))

    def _score_split(features: pd.DataFrame, truth: pd.Series) -> Dict:
        # Predict one split and bundle its confusion matrix with its metrics.
        predicted = model.predict(features)
        return {
            "confusion_matrix": confusion_matrix(truth, predicted, labels=classes),
            "metrics": _classification_metrics(truth, predicted, average="weighted"),
        }

    return {
        "model": model,
        "classes": classes.tolist(),
        "train": _score_split(X_train, y_train),
        "test": _score_split(X_test, y_test),
    }


# A2. Calculate MSE, RMSE, MAPE and R2 for a price prediction exercise (Lab 02)
# For this lab, if a price column is not available, we emulate usage by accepting predictions externally.
def regression_error_metrics(
    y_true: np.ndarray,
    y_pred: np.ndarray
) -> Dict[str, float]:
    """
    Compute MSE, RMSE, MAPE, and R2.
    Note: Provide y_true and y_pred from your Lab 02 regression.

    Both inputs are converted to float NumPy arrays before any arithmetic, so
    lists, tuples and pandas Series are all compared element-by-element in
    positional order.

    Returns {"MSE": ..., "RMSE": ..., "MAPE%": ..., "R2": ...} as plain floats;
    "MAPE%" is expressed in percent.
    """
    # Coerce up front: plain lists do not support `-`, and pandas Series would
    # align on index labels instead of position, which can make MAPE disagree
    # with the positional MSE/R2 (or turn it into NaN).
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)

    mse = float(mean_squared_error(y_true, y_pred))
    rmse = float(np.sqrt(mse))

    # MAPE with protection against division by zero: |y_true| is floored at eps.
    eps = 1e-8
    denom = np.clip(np.abs(y_true), eps, None)
    mape = float(np.mean(np.abs((y_true - y_pred) / denom)) * 100.0)

    r2 = float(r2_score(y_true, y_pred))
    return {"MSE": mse, "RMSE": rmse, "MAPE%": mape, "R2": r2}


# A3. Generate 20 2D points in [1,10] for two classes and return as a DataFrame with colors
def generate_training_points_2d(
    n_points: int = 20,
    low: float = 1.0,
    high: float = 10.0,
    random_state: int = 42
) -> pd.DataFrame:
    """
    Build a synthetic two-class 2D training set.

    Draws `n_points` X values and then `n_points` Y values uniformly from
    [low, high] using a generator seeded with `random_state`. A point gets
    label 1 (color "red") when X+Y is strictly greater than the median of
    X+Y over all points, otherwise label 0 (color "blue").

    Returns a DataFrame with columns: X, Y, label, color.
    """
    generator = np.random.default_rng(random_state)
    # X is drawn before Y; keeping this order keeps a given seed reproducible.
    xs = generator.uniform(low, high, size=n_points)
    ys = generator.uniform(low, high, size=n_points)

    coord_sum = xs + ys
    above_median = coord_sum > np.median(coord_sum)

    return pd.DataFrame({
        "X": xs,
        "Y": ys,
        "label": np.where(above_median, 1, 0),
        "color": np.where(above_median, "red", "blue"),
    })


# A4. Generate dense grid test points and classify with kNN(k=3) trained on A3 data
def classify_grid_with_knn(
    train_df: pd.DataFrame,
    k: int = 3,
    step: float = 0.1
) -> pd.DataFrame:
    """
    Fit a StandardScaler + kNN(k) pipeline on the X, Y, label columns of train_df
    and classify every point of a regular grid.

    The grid axes both run from 0.0 in increments of `step`, stopping just past 10.0.
    Returns a DataFrame with columns: X, Y, pred_label, pred_color, where
    pred_color is "red" for predicted label 1 and "blue" otherwise.
    """
    features = train_df[["X", "Y"]].values
    targets = train_df["label"].values

    model = Pipeline(steps=[
        ("scaler", StandardScaler()),
        ("knn", KNeighborsClassifier(n_neighbors=k)),
    ])
    model.fit(features, targets)

    # The tiny epsilon on the stop value lets arange include 10.0 itself.
    axis = np.arange(0.0, 10.0 + 1e-9, step)
    mesh_x, mesh_y = np.meshgrid(axis, axis)
    flat_x = mesh_x.ravel()
    flat_y = mesh_y.ravel()

    predicted = model.predict(np.column_stack((flat_x, flat_y)))

    return pd.DataFrame({
        "X": flat_x,
        "Y": flat_y,
        "pred_label": predicted,
        "pred_color": np.where(predicted == 1, "red", "blue"),
    })


# A5. Repeat A4 for various k values; return predictions for each k
def classify_grid_multiple_k(
    train_df: pd.DataFrame,
    k_values: List[int],
    step: float = 0.1
) -> Dict[int, pd.DataFrame]:
    """
    Run classify_grid_with_knn once per entry of k_values.

    Returns {k: grid-prediction DataFrame}, keyed in the order the k values were given.
    """
    return {
        neighbours: classify_grid_with_knn(train_df, k=neighbours, step=step)
        for neighbours in k_values
    }


# A6. Repeat A3–A5 for project data: choose any two features and any two classes
def project_2d_knn_boundaries(
    X_train: pd.DataFrame,
    y_train: pd.Series,
    feature_pair: Tuple[str, str],
    class_subset: List[str],
    k_values: List[int],
    step: float = 0.1
) -> Dict[int, pd.DataFrame]:
    """
    From project data, select two features and restrict to the rows whose label is in class_subset.
    Train kNN for each k (on standardized features) and predict across a 2D grid spanning the
    selected feature ranges, padded by 5% of the range on each side (or by 1.0 when a feature
    is constant). Grid points are spaced `step` apart along both axes.

    Returns {k: DataFrame} where each DataFrame has the two feature names as coordinate
    columns plus "pred_label" (labels are strings).

    Raises:
        ValueError: if no row of y_train belongs to class_subset.
    """
    f1, f2 = feature_pair
    mask = y_train.isin(class_subset)
    Xb = X_train.loc[mask, [f1, f2]]

    # Fail with a clear message instead of numpy's "zero-size array" reduction error.
    if Xb.empty:
        raise ValueError(
            f"No rows in y_train match class_subset={class_subset!r}; cannot build a decision grid."
        )

    yb = y_train.loc[mask].astype(str).values  # ensure string labels for clarity

    # Define grid bounds slightly beyond observed range
    def _bounds(arr: np.ndarray, pad: float = 0.05):
        lo, hi = np.min(arr), np.max(arr)
        padv = (hi - lo) * pad if hi > lo else 1.0
        return lo - padv, hi + padv

    x_lo, x_hi = _bounds(Xb[f1].values)
    y_lo, y_hi = _bounds(Xb[f2].values)

    gx = np.arange(x_lo, x_hi, step)
    gy = np.arange(y_lo, y_hi, step)
    MX, MY = np.meshgrid(gx, gy)
    grid_points = np.c_[MX.ravel(), MY.ravel()]

    # Standardization does not depend on k: fit the scaler and transform the
    # (potentially large) grid once, then reuse both for every k.
    scaler = StandardScaler()
    train_scaled = scaler.fit_transform(Xb.values)
    grid_scaled = scaler.transform(grid_points)

    results = {}
    for k in k_values:
        knn = KNeighborsClassifier(n_neighbors=k)
        knn.fit(train_scaled, yb)
        preds = knn.predict(grid_scaled)
        # Report the grid in original (unscaled) feature units.
        results[k] = pd.DataFrame({
            f1: grid_points[:, 0],
            f2: grid_points[:, 1],
            "pred_label": preds
        })
    return results


# A7. Hyper-parameter tuning to find ideal k using GridSearchCV or RandomizedSearchCV
def tune_knn_k(
    X_train: pd.DataFrame,
    y_train: pd.Series,
    k_grid: List[int] = None,
    use_random: bool = False,
    cv: int = 5,
    scoring: str = "f1_weighted",
    random_iter: int = 20,
    random_state: int = 42
) -> Dict:
    """
    Search for the best n_neighbors of a StandardScaler + kNN pipeline via cross-validation.

    Uses GridSearchCV unless `use_random` is true, in which case RandomizedSearchCV
    samples at most `random_iter` candidates (capped at the number of candidates).
    When `k_grid` is None the candidates are the odd values 1, 3, ..., 29.

    Returns {"best_k": int, "best_score": float, "search": fitted search object}.
    """
    candidates = list(range(1, 31, 2)) if k_grid is None else k_grid

    estimator = Pipeline(steps=[
        ("scaler", StandardScaler()),
        ("knn", KNeighborsClassifier()),
    ])
    search_space = {"knn__n_neighbors": candidates}

    # Options common to both search strategies.
    shared_options = {"scoring": scoring, "cv": cv, "n_jobs": -1}

    if not use_random:
        search = GridSearchCV(estimator, param_grid=search_space, **shared_options)
    else:
        search = RandomizedSearchCV(
            estimator,
            param_distributions=search_space,
            n_iter=min(random_iter, len(candidates)),
            random_state=random_state,
            **shared_options
        )

    search.fit(X_train, y_train)
    return {
        "best_k": int(search.best_params_["knn__n_neighbors"]),
        "best_score": float(search.best_score_),
        "search": search,
    }


# ============================
# Main program (prints only)
# ============================
# Driver section: runs A1-A7 in order and performs all printing.
# Names assigned here are module-level, so they remain available after this block runs.
if __name__ == "__main__":
    # Load dataset (default label column "label", 80/20 stratified split, seed 42)
    X_train, y_train, X_test, y_test = load_and_split_data("Crop_recommendation.csv")

    # A1: kNN (k=5) confusion matrices and weighted precision/recall/F1 on train and test
    a1_out = evaluate_knn_confusion_and_metrics(X_train, y_train, X_test, y_test, k=5)
    print("\nA1: Confusion matrices and metrics (k=5)")
    print("Classes (order):", a1_out["classes"])
    print("Train confusion matrix:\n", a1_out["train"]["confusion_matrix"])
    print("Train metrics:", a1_out["train"]["metrics"])
    print("Test confusion matrix:\n", a1_out["test"]["confusion_matrix"])
    print("Test metrics:", a1_out["test"]["metrics"])

    # A2: regression error metrics
    # NOTE: Replace the following with real y_true, y_pred from Lab 02 regression task.
    # Here we demonstrate the function with a small dummy vector to keep this script complete.
    y_true_demo = np.array([100, 120, 140, 160, 180], dtype=float)
    y_pred_demo = np.array([98, 118, 150, 155, 185], dtype=float)
    a2_metrics = regression_error_metrics(y_true_demo, y_pred_demo)
    print("\nA2: Regression error metrics (demo values)")
    print(a2_metrics)

    # A3: synthetic two-class 2D training set (20 points, seed 7)
    train2d = generate_training_points_2d(n_points=20, random_state=7)
    print("\nA3: 2D training set head:")
    print(train2d.head())

    # A4: classify a dense grid with kNN (k=3) trained on the A3 points
    grid_pred_k3 = classify_grid_with_knn(train2d, k=3, step=0.2)  # use 0.2 to keep runtime/memory reasonable
    print("\nA4: Grid predictions (k=3) sample:")
    print(grid_pred_k3.head())

    # A5: repeat the A4 grid classification for several k values
    k_list = [1, 3, 5, 9, 15]
    grid_multi = classify_grid_multiple_k(train2d, k_values=k_list, step=0.2)
    print("\nA5: Generated grid predictions for ks:", k_list)
    for k in k_list:
        print(f"k={k}, rows={len(grid_multi[k])}")

    # A6: decision grids on project data
    # Pick two features and two classes from the project data (e.g., N and temperature; classes rice vs maize)
    feature_pair = ("N", "temperature")
    class_subset = ["rice", "maize"]
    # Train and test partitions are concatenated, so the grids are built from the full dataset.
    proj_boundaries = project_2d_knn_boundaries(
        pd.concat([X_train, X_test], ignore_index=True),
        pd.concat([y_train, y_test], ignore_index=True),
        feature_pair=feature_pair,
        class_subset=class_subset,
        k_values=[3, 7, 11],
        step=0.5
    )
    print("\nA6: Project 2D kNN decision grids generated for k in [3,7,11].")
    for k, df_k in proj_boundaries.items():
        print(f"k={k}, grid points={len(df_k)}")

    # A7: grid search over odd k in 1..25 with 5-fold CV on the training split only
    tuning = tune_knn_k(X_train, y_train, k_grid=list(range(1, 26, 2)), use_random=False, cv=5)
    print("\nA7: Hyper-parameter tuning (GridSearchCV) best k and score")
    print({"best_k": tuning["best_k"], "best_score": tuning["best_score"]})



A1: Confusion matrices and metrics (k=5)
Classes (order): ['apple', 'banana', 'blackgram', 'chickpea', 'coconut', 'coffee', 'cotton', 'grapes', 'jute', 'kidneybeans', 'lentil', 'maize', 'mango', 'mothbeans', 'mungbean', 'muskmelon', 'orange', 'papaya', 'pigeonpeas', 'pomegranate', 'rice', 'watermelon']
Train confusion matrix:
 [[80  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0]
 [ 0 80  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0]
 [ 0  0 79  0  0  0  0  0  0  0  1  0  0  0  0  0  0  0  0  0  0  0]
 [ 0  0  0 80  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0]
 [ 0  0  0  0 80  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0]
 [ 0  0  0  0  0 79  0  0  1  0  0  0  0  0  0  0  0  0  0  0  0  0]
 [ 0  0  0  0  0  0 80  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0]
 [ 0  0  0  0  0  0  0 80  0  0  0  0  0  0  0  0  0  0  0  0  0  0]
 [ 0  0  0  0  0  0  0  0 79  0  0  0  0  0  0  0  0  0  0  0  1  0]
 [ 0  0  0  0  0  0  0  0  0 80  0  0  0  0  0  0