In [None]:
# -*- coding: utf-8 -*-
"""Land Cover Classification using Sentinel-2 and Dynamic World Data"""
!pip install rasterio
!pip install xlsxwriter
# Import required libraries
import numpy as np
import matplotlib.pyplot as plt
from google.colab import drive
import rasterio
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, ExtraTreesClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.preprocessing import StandardScaler
from sklearn.utils.class_weight import compute_class_weight
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
import seaborn as sns
import pandas as pd
from rasterio.enums import Resampling
import os
import shutil
import time
import warnings
warnings.filterwarnings('ignore')

# Constants
CLASS_NAMES = {
    0: 'Water', 1: 'Trees', 2: 'Grass',
    3: 'Flooded vegetation', 4: 'Crops',
    5: 'Shrub and scrub', 6: 'Built area',
    7: 'Bare ground', 8: 'Snow and ice'
}

COLORS = ['#419BDF', '#397D49', '#88B053', '#7A87C6',
          '#E49635', '#DFC35A', '#C4281B', '#A59B8F', '#B39FE1']

FEATURE_NAMES = [
    'B2 (Blue)', 'B3 (Green)', 'B4 (Red)',
    'B5 (Red Edge 1)', 'B6 (Red Edge 2)', 'B7 (Red Edge 3)',
    'B8 (NIR)', 'B8A (Red Edge 4)', 'B11 (SWIR 1)', 'B12 (SWIR 2)',
    'NDVI', 'NDWI', 'SAVI'
]

def calculate_indices(data):
    """Calculate vegetation indices from Sentinel-2 bands."""
    height, width = data.shape[1], data.shape[2]

    nir = data[6].reshape(height, width)  # NIR band
    red = data[2].reshape(height, width)  # Red band
    green = data[1].reshape(height, width)  # Green band

    epsilon = 1e-10

    # Calculate indices
    denom = nir + red + epsilon
    ndvi = (nir - red) / denom

    denom = green + nir + epsilon
    ndwi = (green - nir) / denom

    denom = nir + red + 0.5 + epsilon
    savi = (1.5 * (nir - red)) / denom

    # Stack indices
    indices = np.stack([ndvi, ndwi, savi])
    return indices

def load_and_preprocess_data(s2_path, dw_path):
    """Load and preprocess satellite data."""
    with rasterio.open(s2_path) as src:
        s2_data = src.read()
        height, width = src.height, src.width
        print(f"Sentinel-2 shape: {s2_data.shape}")

        indices = calculate_indices(s2_data)
        print("Added indices: NDVI, NDWI, SAVI")

        s2_data = np.vstack((s2_data, indices))

    with rasterio.open(dw_path) as src:
        dw_data = src.read(
            out_shape=(1, height, width),
            resampling=Resampling.nearest
        )

    return s2_data, dw_data

def prepare_data(X, y):
    """Prepare data for model training."""
    X = X.reshape(X.shape[0], -1).T
    y = y.flatten()

    valid_pixels = ~np.isnan(y)
    X = X[valid_pixels]
    y = y[valid_pixels]

    return X, y

def create_visualization(plt_func):
    """Decorator for consistent plot styling."""
    def wrapper(*args, **kwargs):
        plt.figure(figsize=(12, 8))
        plt_func(*args, **kwargs)
        plt.tight_layout()
        if 'save_path' in kwargs:
            plt.savefig(kwargs['save_path'], dpi=300, bbox_inches='tight')
        plt.close()
    return wrapper

# Visualization functions
@create_visualization
def plot_ground_truth(dw_data, save_path=None):
    """Plot ground truth map."""
    custom_cmap = plt.matplotlib.colors.ListedColormap(COLORS)
    im = plt.imshow(dw_data[0], cmap=custom_cmap, vmin=0, vmax=8)

    cbar = plt.colorbar(im, ticks=range(9))
    cbar.ax.set_yticklabels([CLASS_NAMES[i] for i in range(9)])

    plt.title('Ground Truth Land Cover Classification')
    plt.axis('off')

    plt.text(0.02, 0.05, '20m/pixel', transform=plt.gca().transAxes,
             bbox=dict(facecolor='white', alpha=0.7))
    plt.annotate('N↑', xy=(0.02, 0.95), xycoords='axes fraction',
                fontsize=12, bbox=dict(facecolor='white', alpha=0.7))

@create_visualization
def plot_predictions(pipeline, X_all, dw_data, save_path=None):
    """Plot prediction map."""
    custom_cmap = plt.matplotlib.colors.ListedColormap(COLORS)
    predictions = pipeline.predict(X_all)
    prediction_map = predictions.reshape(dw_data[0].shape)

    im = plt.imshow(prediction_map, cmap=custom_cmap, vmin=0, vmax=8)
    cbar = plt.colorbar(im, ticks=range(9))
    cbar.ax.set_yticklabels([CLASS_NAMES[i] for i in range(9)])

    plt.title('Classification Results')
    plt.axis('off')

    plt.text(0.02, 0.05, '20m/pixel', transform=plt.gca().transAxes,
             bbox=dict(facecolor='white', alpha=0.7))
    plt.annotate('N↑', xy=(0.02, 0.95), xycoords='axes fraction',
                fontsize=12, bbox=dict(facecolor='white', alpha=0.7))

@create_visualization
def plot_confusion_matrix(y_test, y_pred, save_path=None):
    """Plot confusion matrix."""
    cm = confusion_matrix(y_test, y_pred)
    cm_percent = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis] * 100

    sns.heatmap(cm, annot=np.array([[f'{val:,}\n({percent:.1f}%)'
                for val, percent in zip(row, row_percent)]
                for row, row_percent in zip(cm, cm_percent)]),
                fmt='', cmap='YlOrRd',
                xticklabels=[CLASS_NAMES[i] for i in range(9)],
                yticklabels=[CLASS_NAMES[i] for i in range(9)])

    plt.title('Confusion Matrix')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.xticks(rotation=45, ha='right')

@create_visualization
def plot_feature_importance(model, save_path=None):
    """Plot feature importance."""
    importance_df = pd.DataFrame({
        'feature': FEATURE_NAMES,
        'importance': model.feature_importances_
    }).sort_values('importance', ascending=True)

    plt.barh(range(len(importance_df)), importance_df['importance'], color='#2C7BB6')
    plt.yticks(range(len(importance_df)), importance_df['feature'])
    plt.xlabel('Relative Importance')
    plt.title('Feature Importance in Classification')

    for i, v in enumerate(importance_df['importance']):
        plt.text(v, i, f' {v:.3f}', va='center')

    plt.grid(axis='x', linestyle='--', alpha=0.7)

@create_visualization
def plot_accuracy_by_class(y_test, y_pred, save_path=None):
    """Plot accuracy and sample size by class."""
    metrics = {i: {
        'accuracy': (y_pred[y_test == i] == i).mean(),
        'samples': np.sum(y_test == i)
    } for i in range(9)}

    ax1 = plt.gca()
    classes = list(metrics.keys())
    accuracies = [metrics[c]['accuracy'] for c in classes]
    samples = [metrics[c]['samples'] for c in classes]

    ax1.bar(classes, accuracies, color='#2C7BB6')
    ax1.set_ylim(0, 1)
    ax1.set_ylabel('Classification Accuracy')

    ax2 = ax1.twinx()
    ax2.plot(classes, samples, 'r-o')
    ax2.set_ylabel('Number of Samples')

    plt.title('Accuracy and Sample Size by Class')
    ax1.set_xticks(classes)
    ax1.set_xticklabels([CLASS_NAMES[i] for i in classes], rotation=45, ha='right')

    for i, (acc, n) in enumerate(zip(accuracies, samples)):
        ax1.text(i, acc, f'{acc:.1%}', ha='center', va='bottom')
        ax2.text(i, n, f'{n:,}', ha='center', va='top', color='red')

    ax1.legend(['Accuracy'], loc='upper left')
    ax2.legend(['Sample size'], loc='upper right')
    plt.grid(True, alpha=0.3)

@create_visualization
def plot_classifier_comparison(results, save_path=None):
    """Plot perbandingan performa classifier."""
    # Extract metrics
    metrics = {}
    for name, result in results.items():
        report = classification_report(
            result['y_test'],
            result['y_pred'],
            output_dict=True,
            zero_division=0
        )
        metrics[name] = {
            'Accuracy': report['accuracy'],
            'Macro F1': report['macro avg']['f1-score'],
            'Weighted F1': report['weighted avg']['f1-score'],
            'Training Time': result['training_time']
        }

    # Convert to DataFrame
    df = pd.DataFrame(metrics).T

    # Create subplots
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

    # Performance metrics
    df[['Accuracy', 'Macro F1', 'Weighted F1']].plot(
        kind='bar', ax=ax1, width=0.8
    )
    ax1.set_title('Performance Metrics by Classifier')
    ax1.set_xlabel('Classifier')
    ax1.set_ylabel('Score')
    ax1.grid(True, alpha=0.3)
    ax1.legend(title='Metric')

    # Training time
    df['Training Time'].plot(kind='bar', ax=ax2, color='green', width=0.6)
    ax2.set_title('Training Time by Classifier')
    ax2.set_xlabel('Classifier')
    ax2.set_ylabel('Time (seconds)')
    ax2.grid(True, alpha=0.3)

    # Rotate x-labels
    plt.setp(ax1.xaxis.get_majorticklabels(), rotation=45, ha='right')
    plt.setp(ax2.xaxis.get_majorticklabels(), rotation=45, ha='right')

    # Add value labels
    for ax in [ax1, ax2]:
        for container in ax.containers:
            ax.bar_label(container, fmt='%.3f', padding=3)

    plt.tight_layout()

def clean_plots_folder():
    """Hapus dan buat ulang folder plots."""
    if os.path.exists('plots'):
        shutil.rmtree('plots')
    os.makedirs('plots')
    print("Folder 'plots' telah dibersihkan dan dibuat ulang")

def train_multiple_classifiers(X, y, test_size=0.9):
    """Train dan evaluasi beberapa classifier."""
    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=42, stratify=y
    )

    # Define preprocessing pipelines
    rf_pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy='constant', fill_value=0)),
        ('scaler', StandardScaler()),
        ('classifier', RandomForestClassifier(
            n_estimators=100, max_depth=20,
            class_weight='balanced', n_jobs=-1, random_state=42))
    ])

    et_pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy='constant', fill_value=0)),
        ('scaler', StandardScaler()),
        ('classifier', ExtraTreesClassifier(
            n_estimators=100, max_depth=20,
            class_weight='balanced', n_jobs=-1, random_state=42))
    ])

    lr_pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy='constant', fill_value=0)),
        ('scaler', StandardScaler()),
        ('classifier', LogisticRegression(
            multi_class='multinomial', max_iter=300,
            class_weight='balanced', n_jobs=-1, random_state=42))
    ])

    # Define classifiers with pipelines
    classifiers = {
        'Random Forest': rf_pipeline,
        'Extra Trees': et_pipeline,
        'Logistic Regression': lr_pipeline
    }

    results = {}
    for name, pipeline in classifiers.items():
        print(f"\nTraining {name}...")
        start_time = time.time()

        # Train classifier
        pipeline.fit(X_train, y_train)

        # Make predictions
        y_pred = pipeline.predict(X_test)

        # Calculate training time
        training_time = time.time() - start_time

        # Store results
        results[name] = {
            'classifier': pipeline,
            'y_test': y_test,
            'y_pred': y_pred,
            'training_time': training_time,
            'report': classification_report(y_test, y_pred, zero_division=0)
        }

        print(f"{name} Training Time: {training_time:.2f} seconds")
        print(f"\nClassification Report for {name}:")
        print(results[name]['report'])

    return results

# Main execution
print("Mounting Google Drive...")
drive.mount('/content/drive', force_remount=True)

s2_path = '/content/drive/My Drive/GEE_Exports/sentinel2_jambi_2023_allbands.tif'
dw_path = '/content/drive/My Drive/GEE_Exports/dynamicworld_jambi_2023.tif'

# Load and process data
print("\nLoading and preprocessing data...")
s2_data, dw_data = load_and_preprocess_data(s2_path, dw_path)
X, y = prepare_data(s2_data, dw_data)
print(f"Final data shape: {X.shape}")

# Clean plots folder
print("\nCleaning plots folder...")
clean_plots_folder()

# Train multiple classifiers
print("\nTraining classifiers...")
classifier_results = train_multiple_classifiers(X, y, test_size=0.99)

# Generate comparison plot
print("\nGenerating comparison plots...")
plot_classifier_comparison(classifier_results, save_path='plots/classifier_comparison.png')

# Prepare data for full prediction
X_all = s2_data.reshape(s2_data.shape[0], -1).T

# Generate individual plots for each classifier
for name, result in classifier_results.items():
    print(f"\nGenerating plots for {name}...")

    # Create subfolder for each classifier
    classifier_folder = f'plots/{name.lower().replace(" ", "_")}'
    os.makedirs(classifier_folder, exist_ok=True)

    # Generate plots
    plots = [
        (plot_ground_truth, (dw_data,), 'ground_truth.png'),
        (plot_predictions, (result['classifier'], X_all, dw_data), 'predictions.png'),
        (plot_confusion_matrix, (result['y_test'], result['y_pred']), 'confusion_matrix.png'),
        (plot_accuracy_by_class, (result['y_test'], result['y_pred']), 'accuracy_by_class.png')
    ]

    # Add feature importance plot for supported classifiers
    if hasattr(result['classifier'].named_steps['classifier'], 'feature_importances_'):
        plots.append(
            (plot_feature_importance,
             (result['classifier'].named_steps['classifier'],),
             'feature_importance.png')
        )

    for plot_func, args, filename in plots:
        plot_func(*args, save_path=f'{classifier_folder}/{filename}')

print("\nAll processing complete! Results saved in 'plots' directory")

Mounting Google Drive...
Mounted at /content/drive

Loading and preprocessing data...
Sentinel-2 shape: (10, 2366, 1715)
Added indices: NDVI, NDWI, SAVI
Final data shape: (4057690, 13)

Cleaning plots folder...
Folder 'plots' telah dibersihkan dan dibuat ulang

Training classifiers...

Training Random Forest...
Random Forest Training Time: 72.70 seconds

Classification Report for Random Forest:
              precision    recall  f1-score   support

           0       0.78      0.59      0.67    157991
           1       0.91      0.95      0.93   3028043
           2       0.30      0.03      0.05     49520
           3       0.24      0.09      0.13     14570
           4       0.37      0.20      0.26    101705
           5       0.40      0.50      0.44    157845
           6       0.79      0.75      0.77    476197
           7       0.60      0.52      0.56     30737
           8       0.20      0.00      0.01       506

    accuracy                           0.86   4017114
   mac

<Figure size 1200x800 with 0 Axes>

In [6]:
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, classification_report, confusion_matrix


def shorten_name(name, max_length=31):
    """Shorten name to be Excel worksheet compatible."""
    if len(name) <= max_length:
        return name

    # Create shorter versions of common words
    replacements = {
        'Random Forest': 'RF',
        'Extra Trees': 'ET',
        'Logistic Regression': 'LR',
        'Performance': 'Perf',
        'Statistics': 'Stats',
        'Analysis': 'Anal',
        'Distribution': 'Dist',
        'Classification': 'Class',
        'Confusion Matrix': 'CM',
        'Parameters': 'Params'
    }

    result = name
    for old, new in replacements.items():
        result = result.replace(old, new)

    if len(result) > max_length:
        result = result[:max_length]

    return result

def export_analysis_to_excel(results, s2_data, dw_data, save_path='classification_analysis.xlsx'):
    """Export detailed analysis results to Excel with accurate percentage calculations."""

    with pd.ExcelWriter(save_path, engine='xlsxwriter') as writer:
        workbook = writer.book
        decimal_format = workbook.add_format({'num_format': '0.00'})
        # Format untuk angka bulat dengan pemisah ribuan
        thousand_format = workbook.add_format({'num_format': '#.##0;[Red]-#.##0'})

        # 1. Overall Performance
        performance_metrics = {}
        for name, result in results.items():
            report = classification_report(
                result['y_test'],
                result['y_pred'],
                output_dict=True,
                zero_division=0
            )
            performance_metrics[name] = {
                'Overall Accuracy': round(float(report['accuracy']) * 100, 2),
                'Macro F1-Score': round(float(report['macro avg']['f1-score']) * 100, 2),
                'Weighted F1-Score': round(float(report['weighted avg']['f1-score']) * 100, 2),
                'Training Time (seconds)': round(result['training_time'], 2),
                'Number of Samples': len(result['y_test'])
            }

        perf_df = pd.DataFrame(performance_metrics).T
        perf_df.to_excel(writer, sheet_name='Overall_Perf')

        worksheet = writer.sheets['Overall_Perf']
        for col in ['Overall Accuracy', 'Macro F1-Score', 'Weighted F1-Score']:
            col_idx = perf_df.columns.get_loc(col) + 1
            worksheet.set_column(col_idx, col_idx, None, decimal_format)

        # 2. Per-Class Performance
        for name, result in results.items():
            class_metrics = {}
            for i in range(9):
                mask = result['y_test'] == i

                # Multiply by 100 first, then round
                accuracy = np.mean(result['y_test'][mask] == result['y_pred'][mask]) * 100
                prec = precision_score(result['y_test'], result['y_pred'],
                                    labels=[i], average='macro', zero_division=0) * 100
                rec = recall_score(result['y_test'], result['y_pred'],
                                 labels=[i], average='macro', zero_division=0) * 100
                f1 = f1_score(result['y_test'], result['y_pred'],
                            labels=[i], average='macro', zero_division=0) * 100

                class_metrics[CLASS_NAMES[i]] = {
                    'Total Samples': np.sum(mask),
                    'Correct Predictions': np.sum(result['y_test'][mask] == result['y_pred'][mask]),
                    'Accuracy': round(accuracy, 2),
                    'Precision': round(prec, 2),
                    'Recall': round(rec, 2),
                    'F1-Score': round(f1, 2)
                }

            sheet_name = shorten_name(f'{name}_Perf')
            class_df = pd.DataFrame(class_metrics).T
            class_df.to_excel(writer, sheet_name=sheet_name)

            worksheet = writer.sheets[sheet_name]
            # Format persentase
            for col in ['Accuracy', 'Precision', 'Recall', 'F1-Score']:
                col_idx = class_df.columns.get_loc(col) + 1
                worksheet.set_column(col_idx, col_idx, None, decimal_format)

            # Format angka ribuan
            for col in ['Total Samples', 'Correct Predictions']:
                col_idx = class_df.columns.get_loc(col) + 1
                worksheet.set_column(col_idx, col_idx, None, thousand_format)

        # 4. Class Distribution
        class_counts = []
        valid_counts = []

        for i in range(9):
            # Menghitung total piksel untuk setiap kelas menggunakan count_nonzero
            total_count = int(np.count_nonzero(dw_data[0] == i))
            # Menghitung piksel valid menggunakan count_nonzero
            valid_mask = (~np.isnan(dw_data[0])) & (dw_data[0] == i)
            valid_count = int(np.count_nonzero(valid_mask))
            print(f"Class {i}: {CLASS_NAMES[i]} - Total: {total_count}, Valid: {valid_count}")

            class_counts.append(total_count)
            valid_counts.append(valid_count)

        class_distribution = pd.DataFrame({
            'Class': [CLASS_NAMES[i] for i in range(9)],
            'Total Pixels': class_counts,
            'Valid Pixels': valid_counts
        })

        # Menghitung persentase
        total_valid_pixels = np.sum(valid_counts)
        class_distribution['Percentage'] = [(count / total_valid_pixels * 100)
                                          for count in valid_counts]
        class_distribution['Percentage'] = class_distribution['Percentage'].round(2)

        class_distribution.to_excel(writer, sheet_name='Class_Dist')

        worksheet = writer.sheets['Class_Dist']
        # Format persentase
        percentage_col = class_distribution.columns.get_loc('Percentage') + 1
        worksheet.set_column(percentage_col, percentage_col, None, decimal_format)

        # Format angka ribuan
        for col in ['Total Pixels', 'Valid Pixels']:
            col_idx = class_distribution.columns.get_loc(col) + 1
            worksheet.set_column(col_idx, col_idx, None, thousand_format)

        # 5. Confusion Matrix Analysis
        for name, result in results.items():
            cm = confusion_matrix(result['y_test'], result['y_pred'])
            cm_df = pd.DataFrame(cm,
                               index=[f'True_{CLASS_NAMES[i]}' for i in range(9)],
                               columns=[f'Pred_{CLASS_NAMES[i]}' for i in range(9)])

            total_samples = len(result['y_test'])
            cm_stats = {}

            for i in range(9):
                # Multiply first, then round
                proportion = (np.sum(cm[i, :]) / total_samples) * 100
                pred_rate = (np.sum(cm[:, i]) / total_samples) * 100

                cm_stats[CLASS_NAMES[i]] = {
                    'True Positives': cm[i, i],
                    'False Positives': np.sum(cm[:, i]) - cm[i, i],
                    'False Negatives': np.sum(cm[i, :]) - cm[i, i],
                    'Proportion': round(proportion, 2),
                    'Prediction Rate': round(pred_rate, 2)
                }

            sheet_name = shorten_name(f'{name}_CM')
            pd.DataFrame(cm_df).to_excel(writer, sheet_name=sheet_name)

            stats_sheet = shorten_name(f'{name}_Stats')
            stats_df = pd.DataFrame(cm_stats).T
            stats_df.to_excel(writer, sheet_name=stats_sheet)

            worksheet = writer.sheets[stats_sheet]
            # Format persentase
            for col in ['Proportion', 'Prediction Rate']:
                col_idx = stats_df.columns.get_loc(col) + 1
                worksheet.set_column(col_idx, col_idx, None, decimal_format)

            # Format angka ribuan
            for col in ['True Positives', 'False Positives', 'False Negatives']:
                col_idx = stats_df.columns.get_loc(col) + 1
                worksheet.set_column(col_idx, col_idx, None, thousand_format)

        # 8. Vegetation Indices Analysis
        vi_stats = {}
        for i, vi_name in enumerate(['NDVI', 'NDWI', 'SAVI']):
            vi_data = s2_data[-(3-i)].flatten()
            valid_data = vi_data[~np.isnan(vi_data)]

            for class_id in range(9):
                class_mask = dw_data[0].flatten() == class_id
                class_data = vi_data[class_mask]
                valid_class_data = class_data[~np.isnan(class_data)]

                if len(valid_class_data) > 0:
                    vi_stats[f'{vi_name}_{CLASS_NAMES[class_id]}'] = {
                        'Mean': round(np.mean(valid_class_data), 2),
                        'Median': round(np.median(valid_class_data), 2),
                        'Std': round(np.std(valid_class_data), 2),
                        'Min': round(np.min(valid_class_data), 2),
                        'Max': round(np.max(valid_class_data), 2)
                    }

        pd.DataFrame(vi_stats).T.to_excel(writer, sheet_name='VI_Stats')

    print(f"\nDetailed analysis exported to {save_path}")
    return save_path

# Setelah training
print("\nExporting analysis results...")

export_analysis_to_excel(
    classifier_results, s2_data, dw_data,
    save_path='plots/landcover_analysis1.xlsx'
)


Exporting analysis results...
Class 0: Water - Total: 159587, Valid: 159587
Class 1: Trees - Total: 3058629, Valid: 3058629
Class 2: Grass - Total: 50020, Valid: 50020
Class 3: Flooded vegetation - Total: 14717, Valid: 14717
Class 4: Crops - Total: 102732, Valid: 102732
Class 5: Shrub and scrub - Total: 159439, Valid: 159439
Class 6: Built area - Total: 481007, Valid: 481007
Class 7: Bare ground - Total: 31048, Valid: 31048
Class 8: Snow and ice - Total: 511, Valid: 511

Detailed analysis exported to plots/landcover_analysis1.xlsx


'plots/landcover_analysis1.xlsx'