# EuroSAT_MS – Colab + Google Drive Version (Conventional ML Baseline)

This notebook is an updated solution that:
- Mounts **Google Drive** and extracts your **EuroSAT_MS .zip** directly in Colab.
- Covers the **visual tasks** (RGB view, NDVI, thresholded segmentation mask, per-band inspection).
- For the **first modeling task**, uses **conventional machine learning** (feature engineering + scikit-learn classifiers) — **not** a CNN.
- Compares simple models (Logistic Regression, SVM, RandomForest) on a validation split.


## 1) Mount Google Drive and extract the dataset


In [0]:
from google.colab import drive
from pathlib import Path
import os, glob, subprocess

drive.mount('/content/drive')

# 🔧 Set this to your zip path in Drive if auto-discovery fails.
ZIP_PATH = None  # e.g. "/content/drive/MyDrive/EuroSAT_MS.zip"

if ZIP_PATH is None:
    # Try to auto-discover a EuroSAT-related zip in MyDrive
    candidates = glob.glob("/content/drive/MyDrive/**/*.zip", recursive=True)
    euro = [p for p in candidates if "eurosat" in p.lower() or "euro_sat" in p.lower() or "sentinel" in p.lower()]
    if euro:
        ZIP_PATH = euro[0]
        print("Auto-detected zip:", ZIP_PATH)
    else:
        print("⚠️ Could not auto-detect. Please set ZIP_PATH to your zip in Drive.")

DEST = Path("/content/EuroSAT_MS")
DEST.mkdir(parents=True, exist_ok=True)

if ZIP_PATH and Path(ZIP_PATH).exists():
    print("Extracting... this may take a minute.")
    # use unzip (fast) and overwrite (-o) to be idempotent
    !unzip -q -o "$ZIP_PATH" -d "/content"
    # Often the zip contains a root folder; try to detect the proper data root
    # We expect 10 class folders at the top-level of EuroSAT_MS path
    # Try common names
    import os
    common_dirs = [
        "/content/EuroSAT_MS", 
        "/content/EuroSAT", 
        "/content/2750", 
        "/content/dataset", 
        "/content/EuroSATallBands", 
        "/content/EuroSAT_MS/EuroSAT_MS"
    ]
    data_root = None
    for d in common_dirs:
        if Path(d).exists():
            # Heuristic: contains many class folders with .tif files
            subdirs = [p for p in Path(d).iterdir() if p.is_dir()]
            if len(subdirs) >= 8:  # expect ~10
                data_root = Path(d)
                break
    if data_root is None:
        # fallback: find a directory under /content with >=8 subfolders
        for p in Path("/content").iterdir():
            if p.is_dir():
                subs = [q for q in p.iterdir() if q.is_dir()]
                if len(subs) >= 8:
                    data_root = p
                    break
    if data_root is None:
        data_root = DEST
    print("Using data root:", data_root)
else:
    data_root = DEST
    print("⚠️ ZIP_PATH not set or file missing. Ensure data is available at:", data_root)

DATA_ROOT = Path(data_root)


## 2) Configuration & Utilities


In [0]:
import os, sys, random, math, json, glob
import numpy as np

RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
random.seed(RANDOM_SEED)

print("Data root:", DATA_ROOT.resolve())


## 3) Load GeoTIFFs and Visualize (RGB, NDVI, Bands)


In [0]:
import numpy as np
import rasterio
from pathlib import Path

# Expected Sentinel-2 band order (EuroSAT-MS): indices 0..12 for 13 bands
# NDVI: NIR=B8 (idx 7), RED=B4 (idx 3)

CLASS_NAMES = []
if DATA_ROOT.exists():
    for d in sorted([p for p in DATA_ROOT.iterdir() if p.is_dir()]):
        CLASS_NAMES.append(d.name)
CLASS_TO_IDX = {c:i for i,c in enumerate(CLASS_NAMES)}
print("Detected classes:", CLASS_NAMES)

def read_multiband_tif(path):
    with rasterio.open(path) as src:
        arr = src.read()  # (C,H,W)
        arr = arr.astype(np.float32)
        # robust per-band normalization to [0,1]
        C,H,W = arr.shape
        for i in range(C):
            band = arr[i]
            lo, hi = np.percentile(band, [2, 98])
            if hi > lo:
                band = (band - lo) / (hi - lo)
            else:
                bmin, bmax = band.min(), band.max()
                band = (band - bmin) / (bmax - bmin + 1e-6)
            arr[i] = np.clip(band, 0.0, 1.0)
        return arr

# Collect samples
samples = []
for cls in CLASS_NAMES:
    for tif in (DATA_ROOT/cls).glob("*.tif"):
        samples.append((tif, CLASS_TO_IDX[cls]))
print(f"Found {len(samples)} images.")

# Quick peek (1 sample)
import matplotlib.pyplot as plt

def ndvi_from_arr(arr):
    nir = arr[7]  # B8
    red = arr[3]  # B4
    return (nir - red) / (nir + red + 1e-6)

def show_sample(path, title="Sample"):
    arr = read_multiband_tif(path)
    rgb = np.stack([arr[3], arr[2], arr[1]], axis=-1)  # B4,B3,B2
    ndvi = ndvi_from_arr(arr)

    plt.figure(figsize=(8,4))
    plt.subplot(1,2,1); plt.imshow(np.clip(rgb,0,1)); plt.title(f"{title} – RGB"); plt.axis("off")
    plt.subplot(1,2,2); plt.imshow(ndvi, cmap="RdYlGn"); plt.title("NDVI"); plt.axis("off")
    plt.show()

if samples:
    show_sample(samples[0][0], title=CLASS_NAMES[samples[0][1]])


### NDVI thresholding (vegetation mask)


In [0]:
import numpy as np
import matplotlib.pyplot as plt

def threshold_ndvi(ndvi, thr=0.3):
    return (ndvi > thr).astype(np.uint8)

if samples:
    arr = read_multiband_tif(samples[0][0])
    ndvi = ndvi_from_arr(arr)
    mask = threshold_ndvi(ndvi, thr=0.3)

    plt.figure(figsize=(12,4))
    plt.subplot(1,3,1); plt.imshow(np.stack([arr[3],arr[2],arr[1]], axis=-1)); plt.title("RGB"); plt.axis("off")
    plt.subplot(1,3,2); plt.imshow(ndvi, cmap="RdYlGn"); plt.title("NDVI"); plt.axis("off")
    plt.subplot(1,3,3); plt.imshow(mask, interpolation="nearest"); plt.title("NDVI > 0.3"); plt.axis("off")
    plt.show()


### Per-band inspection


In [0]:
def show_bands(arr):
    import math
    cols = 4
    rows = math.ceil(arr.shape[0]/cols)
    plt.figure(figsize=(12, 3*rows))
    for i in range(arr.shape[0]):
        plt.subplot(rows, cols, i+1)
        plt.imshow(arr[i])
        plt.title(f"Band {i}")
        plt.axis("off")
    plt.tight_layout()
    plt.show()

if samples:
    arr = read_multiband_tif(samples[0][0])
    show_bands(arr)


## 4) Feature Engineering for Conventional ML


In [0]:
import numpy as np

def image_features(arr):
    """Return a 1D feature vector using simple, fast statistics per band + NDVI."""
    feats = []
    # per-band stats
    for i in range(arr.shape[0]):
        band = arr[i]
        p25, p50, p75 = np.percentile(band, [25,50,75])
        feats.extend([band.mean(), band.std(), p25, p50, p75])
    # NDVI stats
    ndvi = ndvi_from_arr(arr)
    p25, p50, p75 = np.percentile(ndvi, [25,50,75])
    veg_ratio = (ndvi > 0.3).mean()
    feats.extend([ndvi.mean(), ndvi.std(), p25, p50, p75, veg_ratio])
    return np.array(feats, dtype=np.float32)

# Optional: compact spatial features via downsampling then PCA later
def downsampled_flat(arr, target=16):
    # simple spatial downsample by stride to roughly target x target per band
    C,H,W = arr.shape
    sx = max(1, H//target)
    sy = max(1, W//target)
    small = arr[:, ::sx, ::sy]
    return small.reshape(-1)  # C * h * w

# Build dataset matrices
X_stats, y, paths = [], [], []
X_flat = []
for p, lab in samples:
    a = read_multiband_tif(p)
    X_stats.append(image_features(a))
    X_flat.append(downsampled_flat(a, target=16))
    y.append(lab); paths.append(str(p))

X_stats = np.stack(X_stats) if X_stats else np.empty((0,))
X_flat  = np.stack(X_flat)  if X_flat  else np.empty((0,))
y = np.array(y)
print("Feature shapes -> stats:", X_stats.shape, "| flat (for PCA):", X_flat.shape, "| labels:", y.shape)


## 5) Train/Validation/Test Split


In [0]:
from sklearn.model_selection import train_test_split

Xtr_stats, Xte_stats, ytr, yte, paths_tr, paths_te = train_test_split(
    X_stats, y, paths, test_size=0.2, random_state=RANDOM_SEED, stratify=y
)
Xtr_stats, Xval_stats, ytr, yval, _, _ = train_test_split(
    Xtr_stats, ytr, paths_tr, test_size=0.2, random_state=RANDOM_SEED, stratify=ytr
)

Xtr_flat, Xte_flat, _, _ = train_test_split(
    X_flat, y, test_size=0.2, random_state=RANDOM_SEED, stratify=y
)
Xtr_flat, Xval_flat, _, _ = train_test_split(
    Xtr_flat, ytr, test_size=0.2, random_state=RANDOM_SEED, stratify=ytr
)

print("Stats features -> train/val/test:", Xtr_stats.shape, Xval_stats.shape, Xte_stats.shape)
print("Flat features  -> train/val/test:", Xtr_flat.shape, Xval_flat.shape, Xte_flat.shape)


## 6) Conventional ML Baselines (scikit-learn)


In [0]:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score

# Model A: Stats features -> Standardize -> Classifier
pipelines_stats = {
    "LogReg": Pipeline([("scaler", StandardScaler()), ("clf", LogisticRegression(max_iter=2000, n_jobs=None, class_weight="balanced"))]),
    "SVM-RBF": Pipeline([("scaler", StandardScaler()), ("clf", SVC(kernel="rbf", C=10, gamma="scale", class_weight="balanced"))]),
    "RandomForest": Pipeline([("clf", RandomForestClassifier(n_estimators=300, max_depth=None, n_jobs=-1, class_weight="balanced_subsample"))]),
}

# Model B: Flattened downsampled -> Standardize -> PCA -> Classifier
pipelines_flat = {
    "PCA+LogReg": Pipeline([("scaler", StandardScaler(with_mean=True)), ("pca", PCA(n_components=128, random_state=RANDOM_SEED)), ("clf", LogisticRegression(max_iter=2000, class_weight="balanced"))]),
    "PCA+SVM": Pipeline([("scaler", StandardScaler(with_mean=True)), ("pca", PCA(n_components=128, random_state=RANDOM_SEED)), ("clf", SVC(kernel="rbf", C=10, gamma="scale", class_weight="balanced"))]),
}

def eval_pipeline(name, pipe, Xtr, ytr, Xval, yval):
    pipe.fit(Xtr, ytr)
    pred = pipe.predict(Xval)
    acc = accuracy_score(yval, pred)
    print(f"[{name}] val acc: {acc:.3f}")
    return acc, pipe

best = (0.0, None, None)
print("=== Using Stats Features ===")
for name, pipe in pipelines_stats.items():
    acc, fitted = eval_pipeline(name, pipe, Xtr_stats, ytr, Xval_stats, yval)
    if acc > best[0]:
        best = (acc, f"stats::{name}", fitted)

print("\n=== Using Downsampled + PCA Features ===")
for name, pipe in pipelines_flat.items():
    acc, fitted = eval_pipeline(name, pipe, Xtr_flat, ytr, Xval_flat, yval)
    if acc > best[0]:
        best = (acc, f"pca::{name}", fitted)

print("\nBest model on validation:", best[1], "with acc", round(best[0],3))
best_model = best[2]


## 7) Test Evaluation


In [0]:
from sklearn.metrics import confusion_matrix
import numpy as np
import matplotlib.pyplot as plt

# Choose the right test features for the winning pipeline
if best_model is None:
    raise RuntimeError("No model trained. Check earlier cells.")

use_stats = best[1].startswith("stats::")
Xte = Xte_stats if use_stats else Xte_flat

yhat = best_model.predict(Xte)
print(classification_report(yte, yhat, target_names=CLASS_NAMES))

cm = confusion_matrix(yte, yhat)
fig, ax = plt.subplots(figsize=(7,7))
im = ax.imshow(cm, interpolation="nearest")
ax.figure.colorbar(im, ax=ax)
ax.set(xticks=np.arange(len(CLASS_NAMES)), yticks=np.arange(len(CLASS_NAMES)),
       xticklabels=CLASS_NAMES, yticklabels=CLASS_NAMES,
       ylabel="True label", xlabel="Predicted label")
plt.setp(ax.get_xticklabels(), rotation=45, ha="right")
for i in range(cm.shape[0]):
    for j in range(cm.shape[1]):
        ax.text(j, i, format(cm[i,j], "d"), ha="center", va="center")
plt.tight_layout()
plt.show()


## 8) Notes on Distribution Shift (still relevant)

- Ensure your splits avoid **tile leakage** (images from the same large scene should not be in both train and test).
- Normalize per band using **training-set** statistics only.
- Use augmentations that simulate realistic remote-sensing variability.
- Consider domain adaptation or self-supervised pretraining if the target domain differs significantly.


## 9) (Optional) Deep Baseline

If later you also want a CNN baseline, you can add a small 13-channel CNN after the conventional ML section. For **Task 1**, the above **conventional ML** pipelines are sufficient.
