# QA Training Data 
### Visualise the clusters of the training data. Maybe exclude outliers
The goal is to reduce class confusion: make the training-point classes clean, accurate and spectrally separable.

In [None]:
# Reload functions during development: enable IPython's autoreload extension
# (mode 2) so edits to imported modules are picked up by later cells.
%load_ext autoreload
%autoreload 2

In [None]:
import geopandas as gpd
from ldn.typology import colors

In [None]:
# Load the training points and map them with GeoDataFrame.explore,
# coloured by class.
class_attr = "lulc"
training_data = gpd.read_file("training_data.geojson")

# Only the classes actually present get a legend entry; their colours are
# looked up in the project palette in the same (sorted) order.
present_classes = sorted(training_data[class_attr].unique())
class_palette = [colors[c] for c in present_classes]
point_style = {"radius": 6, "fillOpacity": 0.8, "weight": 0.5}

training_data.explore(
    column=class_attr,
    categorical=True,
    categories=present_classes,
    cmap=class_palette,
    legend=True,
    style_kwds=point_style,
)

In [None]:
# Inspect the available columns (last expression in the cell, so it is displayed).
training_data.columns

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from scipy.spatial.distance import cdist

# ── 1. Prep feature matrix ────────────────────────────────────────────────────
# Every column that is not the class label, the geometry or the QA flag itself
# is treated as a feature. 'outlier' has to be excluded explicitly: this
# notebook writes the flag back to training_data.geojson, so on a re-run the
# column is already present and would otherwise be scaled and clustered on as
# if it were a feature.
exclude_cols = [class_attr, 'geometry', 'outlier']
feature_cols = [c for c in training_data.columns if c not in exclude_cols]

# Rows with any missing feature value cannot be scaled or clustered, so they
# are flagged as outliers up front. Assigning the mask also resets any
# 'outlier' values loaded from a previous run.
nan_mask = training_data[feature_cols].isna().any(axis=1)
training_data['outlier'] = nan_mask

nan_rows = nan_mask.sum()
print(f"Rows with NaNs (auto-flagged): {nan_rows}")

# Index labels of the rows that go into the feature matrix; row i of X and
# X_scaled corresponds to valid_idx[i].
valid_mask = ~nan_mask
valid_idx  = training_data.index[valid_mask]

# Standardise the features over all valid rows (all classes together).
X = training_data.loc[valid_idx, feature_cols].values
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# ── 2. Visualisation helpers ──────────────────────────────────────────────────
# Matplotlib colormap name used by plot_pca to colour inlier points by cluster label.
CLUSTER_CMAP = 'tab10'

def plot_pca(ax, Xc, labels, centers, outlier_mask, class_name, best_score):
    """Draw a 2-D PCA projection of one class onto ``ax``.

    The PCA is fitted on ``Xc`` and the same projection is applied to
    ``centers``. Points not flagged in ``outlier_mask`` are coloured by their
    cluster label, flagged points are drawn as red crosses and the cluster
    centres as black stars. ``class_name`` is accepted but not used here;
    ``best_score`` only appears in the title.
    """
    projector = PCA(n_components=2)
    points_2d = projector.fit_transform(Xc)
    centers_2d = projector.transform(centers)
    pc1_share, pc2_share = projector.explained_variance_ratio_
    n_clusters = len(np.unique(labels))
    keep = np.logical_not(outlier_mask)

    # Inliers: colour by cluster id, with the colour scale pinned to 0..k-1.
    ax.scatter(
        points_2d[keep, 0], points_2d[keep, 1],
        c=labels[keep], cmap=CLUSTER_CMAP,
        vmin=0, vmax=max(n_clusters - 1, 1),
        alpha=0.55, s=25, linewidths=0, label='inlier',
    )

    # Outliers are drawn (and get a legend entry) only when there are any.
    if outlier_mask.any():
        ax.scatter(
            points_2d[outlier_mask, 0], points_2d[outlier_mask, 1],
            c='red', marker='x', s=50, linewidths=1.2, label='outlier',
        )

    # Centroids go on top of everything else.
    ax.scatter(
        centers_2d[:, 0], centers_2d[:, 1],
        c='black', marker='*', s=180, zorder=5, label='centroid',
    )

    ax.set_xlabel(f'PC1 ({pc1_share:.1%})')
    ax.set_ylabel(f'PC2 ({pc2_share:.1%})')
    ax.set_title(f'PCA  |  k={n_clusters}  sil={best_score:.3f}')
    ax.legend(fontsize=7, markerscale=0.9)


def plot_heatmap(ax, centers, feature_cols, outlier_count, n):
    """Draw the cluster centroids as an annotated heatmap on ``ax``.

    Rows are clusters (labelled C0, C1, ...), columns are features.

    Args:
        ax: Matplotlib axes to draw on.
        centers: 2-D array of centroid coordinates, one row per cluster.
        feature_cols: Feature names, used as x tick labels.
        outlier_count: Number of flagged points, shown in the title.
        n: Total number of points in the class; used for the outlier
            percentage in the title (0 is reported as 0.0%).
    """
    # Use colour limits symmetric around 0 so the neutral midpoint of the
    # diverging colormap means "0" and the two hues encode the sign of the
    # value. With automatic limits the midpoint drifts to the middle of the
    # data range and the colours no longer match the sign.
    limit = float(abs(centers).max()) if centers.size else 0.0
    limit = limit or 1.0  # all-zero (or empty) input: any non-zero range works
    im = ax.imshow(centers, aspect='auto', cmap='RdBu_r', vmin=-limit, vmax=limit)

    ax.set_xticks(range(len(feature_cols)))
    ax.set_xticklabels(feature_cols, rotation=45, ha='right', fontsize=7)
    ax.set_yticks(range(len(centers)))
    ax.set_yticklabels([f'C{i}' for i in range(len(centers))], fontsize=8)

    # Attach the colorbar via the axes' own figure instead of pyplot's
    # "current figure", so the helper works for any axes passed in.
    ax.figure.colorbar(im, ax=ax, label='Scaled value', pad=0.02)

    # Annotate each cell with its value. Cells near either end of the colour
    # range are dark, so switch to white text there to stay readable.
    white_above = 0.6 * limit
    for r, row_values in enumerate(centers):
        for c, value in enumerate(row_values):
            ax.text(c, r, f'{value:.1f}',
                    ha='center', va='center', fontsize=6,
                    color='white' if abs(value) > white_above else 'black')

    outlier_pct = 100 * outlier_count / n if n else 0.0
    ax.set_title(f'Centroids  |  outliers={outlier_count} ({outlier_pct:.1f}%)')


# ── 3. Per-class clustering + outlier flagging + visualisation ────────────────
# For every class: choose k by silhouette score, cluster with KMeans, flag the
# points that lie unusually far from their own cluster centre, then plot.
threshold = 2.0   # ← tune: lower = more aggressive flagging

class_labels = training_data.loc[valid_idx, class_attr]
classes = class_labels.unique()

for cls in classes:
    mask = (class_labels == cls).values
    idx  = valid_idx[mask]
    Xc   = X_scaled[mask]
    n    = len(Xc)

    # At least 2 clusters of ~5 points each are required. Report skipped
    # classes so it is visible that they were not quality-checked.
    k_max = min(5, n // 5)
    if n < 6 or k_max < 2:
        print(f"  {str(cls):30s} | n={n:4d} | skipped (too few points to cluster)")
        continue

    # ── Optimal k via silhouette ──────────────────────────────────────────
    # Keep the best fitted model so it does not have to be refitted afterwards
    # (random_state is fixed, so a refit would reproduce the same model).
    km_final, best_k, best_score = None, None, float('-inf')
    for k in range(2, k_max + 1):
        km     = KMeans(n_clusters=k, random_state=42, n_init=10)
        labels = km.fit_predict(Xc)
        # silhouette_score needs at least 2 distinct labels; KMeans can return
        # fewer when the class consists largely of duplicate points.
        if len(np.unique(labels)) < 2:
            continue
        score  = silhouette_score(Xc, labels)
        if score > best_score:
            km_final, best_k, best_score = km, k, score

    if km_final is None:
        print(f"  {str(cls):30s} | n={n:4d} | skipped (no valid clustering found)")
        continue

    labels   = km_final.labels_
    centers  = km_final.cluster_centers_

    # ── Outlier flagging ──────────────────────────────────────────────────
    # Within each cluster, a point is an outlier when its distance to the
    # cluster centre exceeds `threshold` times the cluster's median distance.
    outlier_mask = np.zeros(n, dtype=bool)
    for cluster_id in range(best_k):
        members = np.where(labels == cluster_id)[0]
        if members.size == 0:
            continue
        dists    = cdist(Xc[members], centers[cluster_id].reshape(1, -1)).flatten()
        median_d = np.median(dists)
        if median_d == 0:
            # At least half the points sit exactly on the centre; a relative
            # threshold is meaningless here, so nothing is flagged.
            continue
        outlier_mask[members[dists > threshold * median_d]] = True

    training_data.loc[idx[outlier_mask], 'outlier'] = True

    print(f"  {str(cls):30s} | n={n:4d} | k={best_k} | sil={best_score:.3f} "
          f"| outliers={outlier_mask.sum():4d} ({100*outlier_mask.mean():.1f}%)")

    # ── Figure: PCA (left) + heatmap (right) ─────────────────────────────
    fig = plt.figure(figsize=(14, 4.5))
    fig.suptitle(f'Class: {cls}  (n={n})', fontsize=12, fontweight='bold', y=1.01)

    gs  = gridspec.GridSpec(1, 2, width_ratios=[1, 1.6], figure=fig)
    ax_pca  = fig.add_subplot(gs[0])
    ax_heat = fig.add_subplot(gs[1])

    plot_pca(ax_pca, Xc, labels, centers, outlier_mask, cls, best_score)
    plot_heatmap(ax_heat, centers, feature_cols, outlier_mask.sum(), n)

    plt.tight_layout()
    plt.show()

# ── 4. Summary ────────────────────────────────────────────────────────────────
# Overall counts, then the percentage of flagged points per class. The flag
# covers both NaN rows and the distance-based outliers found above.
n_total    = len(training_data)
n_outliers = int(training_data['outlier'].sum())

print(f"\nTotal points : {n_total:,}")
print(f"Clean        : {n_total - n_outliers:,}")
print(f"Outliers     : {n_outliers:,}")
print("\nOutliers per class:")
# Group by the configured class column (class_attr) rather than a hard-coded name.
outlier_pct = training_data.groupby(class_attr)['outlier'].mean().mul(100).round(1)
print(outlier_pct.astype(str) + '%')

In [None]:
# Map only the points that were not flagged as outliers, coloured by class.
training_clean = training_data.loc[~training_data['outlier']]

# Classes that still have points after filtering, in sorted order; the colour
# list passed to explore follows the same order.
present_classes = sorted(training_clean[class_attr].unique())

training_clean.explore(
    column=class_attr,
    categorical=True,
    categories=present_classes,
    cmap=[colors[c] for c in present_classes],
    legend=True,
    style_kwds={"radius": 6, "fillOpacity": 0.8, "weight": 0.5},
)

In [None]:
# Write the data with the new outlier column for use training a model and prediction.
# NOTE: this overwrites the same file that is read at the top of the notebook,
# so the 'outlier' column will already be present the next time it is loaded.
training_data.to_file("training_data.geojson", driver="GeoJSON")

Look at how the Other class is clustered. Maybe we can split Other into spectrally distinct sub-classes that are merged back together after prediction. Other is currently getting only about 6% accuracy.

For land cover / remote sensing data, PCA + centroid heatmap together give the most actionable insight — PCA shows separation, the heatmap explains it in terms of your original bands/indices.