In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os

# Lift the pandas display limits so DataFrame previews show every column and
# the complete contents of each cell (long file paths are not truncated).
for _display_option in ('display.max_columns', 'display.max_colwidth'):
    pd.set_option(_display_option, None)

Load and prepare the dataset (csv and images)

In [None]:
# Read the training metadata CSV and preview its first rows
train_csv_path = 'TrainDataset.csv'
train_dataset = pd.read_csv(train_csv_path)
train_dataset.head(n=5)

In [None]:
# Derive the image filename from the last '/'-separated component of 'tifPath',
# discard rows with any missing value, and order the rows by filename so that
# they follow the same order as a sorted listing of the image directory.
train_dataset = (
    train_dataset
    .assign(tif_name=lambda frame: frame['tifPath'].str.split('/').str[-1])
    .dropna()
    .sort_values(by='tif_name')
)

In [None]:
# Define the directory containing the training images >>>REPLACE WITH YOUR OWN
image_dir ='train'

# Get a sorted list of the image filenames in the directory.
# Hidden entries (e.g. '.ipynb_checkpoints', '.DS_Store') and sub-directories
# are skipped: the listing is later paired with the DataFrame rows by position,
# so a single stray entry would shift every image onto the wrong row.
files = sorted(
    f for f in os.listdir(image_dir)
    if not f.startswith('.') and os.path.isfile(os.path.join(image_dir, f))
)
# Create full file paths by joining the directory path with each filename
file_paths = [os.path.join(image_dir, f) for f in files]

In [None]:
# The DataFrame rows and the directory listing are both sorted by filename, so
# row i is expected to describe file i. As a first sanity check, the number of
# image paths must equal the number of DataFrame rows.
n_image_files = len(file_paths)
n_dataframe_rows = len(train_dataset)
print("Number of image files:", n_image_files)
print("Number of DataFrame entries:", n_dataframe_rows)

In [None]:
# The assignment below pairs row i with file i purely by position. Verify that
# the sorted filenames on disk are exactly the sorted 'tif_name' values first;
# otherwise labels would be silently attached to the wrong images.
listed_names = [os.path.basename(p) for p in file_paths]
expected_names = train_dataset['tif_name'].tolist()
if listed_names != expected_names:
    files_without_row = sorted(set(listed_names) - set(expected_names))[:5]
    rows_without_file = sorted(set(expected_names) - set(listed_names))[:5]
    raise ValueError(
        f"Image files do not line up with the DataFrame rows "
        f"({len(listed_names)} files vs {len(expected_names)} rows); "
        f"files without a row: {files_without_row}, "
        f"rows without a file: {rows_without_file}"
    )

# Assign the full image file paths to the column 'tifPath' in the dataframe
train_dataset['tifPath'] = file_paths
# Verify the column has been updated.
train_dataset.head()

In [None]:
# Shift the 'class' ids down by one to obtain the zero-indexed 'label' column
# that the one-hot encoding in the data generator works with.
# NOTE(review): presumes 'class' starts at 1 in the CSV — confirm.
train_dataset['label'] = train_dataset['class'].sub(1)

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the rows for validation. Stratifying on 'Target' keeps the
# class proportions the same in both subsets, and the fixed seed makes the
# split repeatable.
# NOTE(review): the split is per row; if several rows (e.g. monthly images)
# belong to the same field, images of one field may land in both subsets —
# confirm whether a grouped split is needed.
split_options = dict(
    test_size=0.2,
    stratify=train_dataset['Target'],
    random_state=2,
)
train_df, val_df = train_test_split(train_dataset, **split_options)

In [None]:
import tensorflow as tf
from tensorflow import keras
from keras import layers, models
from keras.optimizers import SGD, Adam
from imagecodecs import imread

In [None]:
from keras.utils import Sequence, to_categorical

class SatelliteImageGenerator(Sequence):
    """
    Keras Sequence that serves batches of satellite images with one-hot labels.

    For every selected row of the DataFrame the file at 'tifPath' is read with
    ``imread``, cast to float32, resized to ``image_size`` and divided by
    15000; the integer in 'label' is one-hot encoded over ``num_classes``.

    Args:
        df: DataFrame providing the 'tifPath' and 'label' columns.
        batch_size: Number of samples per batch (the last batch may be smaller).
        image_size: Target size (H, W) passed to ``tf.image.resize``.
        shuffle: Whether to shuffle the sample order at construction time and
            again in ``on_epoch_end``.
        num_classes: Width of the one-hot label vectors.
        **kwargs: Forwarded unchanged to the ``Sequence`` base constructor.
    """
    def __init__(self, df, batch_size=32, image_size=(224, 224), shuffle=True, num_classes=3, **kwargs):
        # Initialise the Keras base class; recent Keras versions expect this
        # call and warn when a Sequence subclass omits it.
        super().__init__(**kwargs)
        self.df = df.reset_index(drop=True) # 0..n-1 index so .loc works with positions
        self.batch_size = batch_size # Number of samples per batch
        self.image_size = image_size # Target image size (H, W)
        self.shuffle = shuffle # Whether to shuffle data after each epoch
        self.indices = np.arange(len(df)) # Row order used to build the batches
        self.num_classes = num_classes # Number of target classes
        self.on_epoch_end() # Initial shuffle if enabled

    def __len__(self):
        """Return the number of batches per epoch (a partial last batch counts)."""
        return int(np.ceil(len(self.df) / self.batch_size))

    def __getitem__(self, index):
        """Build batch number ``index`` and return ``(images, one_hot_labels)``."""
        # Row positions belonging to this batch
        batch_indices = self.indices[index * self.batch_size:(index + 1) * self.batch_size]
        # File paths and integer labels for those rows
        batch_paths = self.df.loc[batch_indices, 'tifPath'].values
        batch_labels = self.df.loc[batch_indices, 'label'].values

        # Load and preprocess images
        batch_images = []
        for path in batch_paths:
            img = imread(path).astype('float32') # Read image as float32
            img = tf.image.resize(img, self.image_size) # Resize to target size
            img = img / 15000.0  # Normalize based on sensor range. Range is (0-15000). Normalize to (0-1)
            batch_images.append(img)

        # Stack images into a single tensor and one-hot encode labels
        batch_images = tf.stack(batch_images)
        batch_labels = to_categorical(batch_labels, num_classes=self.num_classes)

        return batch_images, batch_labels

    def on_epoch_end(self):
        """Reshuffle the row order in place when shuffling is enabled."""
        if self.shuffle:
            np.random.shuffle(self.indices)

In [None]:
# Build the two generators from the same batch and image settings. The training
# generator keeps its default shuffling; the validation generator is created
# with shuffling turned off so evaluation always sees the rows in the same order.
shared_gen_options = dict(batch_size=32, image_size=(224, 224))
train_gen = SatelliteImageGenerator(train_df, **shared_gen_options)
val_gen = SatelliteImageGenerator(val_df, shuffle=False, **shared_gen_options)

In [None]:
def _conv_bn_relu(x, filters):
    """Apply one 3x3 'same'-padded convolution followed by BatchNorm and ReLU."""
    x = layers.Conv2D(filters, (3, 3), padding='same')(x)
    x = layers.BatchNormalization()(x)
    return layers.Activation('relu')(x)


def CNN_builder(input_shape=(224, 224, 12), num_classes=3):
    """
    Builds a VGG16-inspired Convolutional Neural Network model adapted for multi-spectral input.

    The network consists of five convolutional blocks (64, 128, 256, 512 and
    512 filters with 2, 2, 3, 3 and 3 conv layers respectively), each closed by
    a 2x2 max-pooling layer, followed by two 4096-unit dense layers and a
    softmax classification layer.

    Args:
        input_shape: Shape of one input sample, passed to ``keras.Input``.
        num_classes: Number of units of the final softmax layer.

    Returns:
        keras.Model: The model named "model1". It is NOT compiled; the caller
        must call ``compile`` before training.
    """
    inputs = keras.Input(shape=input_shape)

    # (number of filters, number of conv layers) for each block. Layers are
    # created in the same order as a fully written-out definition would.
    block_specs = ((64, 2), (128, 2), (256, 3), (512, 3), (512, 3))

    x = inputs
    for filters, n_conv_layers in block_specs:
        for _ in range(n_conv_layers):
            x = _conv_bn_relu(x, filters)
        # Each block ends by halving the spatial resolution
        x = layers.MaxPooling2D((2, 2), strides=(2, 2))(x)

    # Fully connected head: two dense layers with 4096 units, BatchNorm and ReLU
    x = layers.Flatten()(x)
    for _ in range(2):
        x = layers.Dense(4096)(x)
        x = layers.BatchNormalization()(x)
        x = layers.Activation('relu')(x)

    # Final classification layer
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    model = keras.Model(inputs=inputs, outputs=outputs, name="model1")
    return model

In [None]:
# Instantiate the network with its default input shape and number of classes
model = CNN_builder()

# Stochastic Gradient Descent optimizer with a learning rate of 0.001
optimizer = SGD(learning_rate=0.001)

# Attach loss, optimizer and the accuracy metric, then print the layer overview.
# Categorical crossentropy matches the one-hot labels produced by the generator.
model.compile(
    loss='categorical_crossentropy',
    optimizer=optimizer,
    metrics=['accuracy'],
)
model.summary()

In [None]:
from keras.callbacks import  ModelCheckpoint, ReduceLROnPlateau

# Halve the learning rate whenever 'val_loss' has not improved for 5 epochs,
# never going below 1e-6; verbose=1 prints a message on every reduction.
lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6, verbose=1)

# Write the model to 'model1.keras', keeping only the best one according to 'val_loss'
modelcheckpoint = ModelCheckpoint(
    filepath="model1.keras",
    monitor="val_loss",
    save_best_only=True,
)

# Callbacks handed to model.fit
callback_list = [lr_scheduler, modelcheckpoint]

In [None]:
# Fit for 70 epochs, validating on val_gen after each epoch; the callbacks
# adjust the learning rate and checkpoint the best model along the way.
history = model.fit(
    train_gen,
    epochs=70,
    validation_data=val_gen,
    callbacks=callback_list,
)

In [None]:
# Tabulate the per-epoch metrics recorded by fit (one column per metric)
metrics = pd.DataFrame.from_dict(history.history)

In [None]:
# Plot training and validation loss per epoch.
# Title and axis labels are set so the figure can be read on its own.
ax = metrics[['loss','val_loss']].plot()
ax.set(title='Training vs. validation loss', xlabel='Epoch', ylabel='Loss')
plt.show()

In [None]:
# Plot training and validation accuracy per epoch.
# Title and axis labels are set so the figure can be read on its own.
ax = metrics[['accuracy','val_accuracy']].plot()
ax.set(title='Training vs. validation accuracy', xlabel='Epoch', ylabel='Accuracy')
plt.show()

In [None]:
# Replace the in-memory model with the one stored in 'model1.keras' and score
# it on the validation generator; evaluate returns the loss followed by the
# compiled metric (accuracy).
model = keras.models.load_model("model1.keras")
val_scores = model.evaluate(val_gen)
val_loss, val_accuracy = val_scores
print(f"\nValidation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}")

Load and prepare the test set. Very similar to how the training set has been prepared.

In [None]:
# Read the test metadata and derive the image filename from the last
# '/'-separated component of 'tifPath'
test_df = pd.read_csv('TestDataset.csv')
test_df = test_df.assign(tif_name=test_df['tifPath'].str.split('/').str[-1])
# Keep an untouched copy (taken before any rows are dropped) for the final merge
test = test_df.copy()

In [None]:
# Discard rows with any missing value and order the remaining rows by image
# filename so they follow the same order as a sorted directory listing.
test_df = test_df.dropna().sort_values(by='tif_name')

In [None]:
# Directory containing the test images (a path relative to the working
# directory) >>>REPLACE WITH YOUR OWN
test_image_dir = 'test'

In [None]:
# Get a sorted list of the image filenames in the test image directory.
# Hidden entries (e.g. '.ipynb_checkpoints', '.DS_Store') and sub-directories
# are skipped because the listing is paired with the DataFrame rows by position.
test_files = sorted(
    f for f in os.listdir(test_image_dir)
    if not f.startswith('.') and os.path.isfile(os.path.join(test_image_dir, f))
)

# Create full file paths by joining the directory path with each filename
test_file_paths = [os.path.join(test_image_dir, f) for f in test_files]

# The assignment below pairs row i with file i purely by position. Verify that
# the sorted filenames on disk are exactly the sorted 'tif_name' values first;
# otherwise predictions would be silently attributed to the wrong IDs.
expected_test_names = test_df['tif_name'].tolist()
if test_files != expected_test_names:
    files_without_row = sorted(set(test_files) - set(expected_test_names))[:5]
    rows_without_file = sorted(set(expected_test_names) - set(test_files))[:5]
    raise ValueError(
        f"Image files in '{test_image_dir}' do not line up with the DataFrame rows "
        f"({len(test_files)} files vs {len(expected_test_names)} rows); "
        f"files without a row: {files_without_row}, "
        f"rows without a file: {rows_without_file}"
    )

# Assign the full image file paths to the column 'tifPath' in the dataframe
test_df['tifPath'] = test_file_paths

In [None]:
class SatellitePredictionGenerator(Sequence):
    """
    Keras Sequence that serves image-only batches for inference.

    For every selected row of the DataFrame the file at 'tifPath' is read with
    ``imread``, cast to float32, resized to ``image_size`` and divided by
    15000. No labels are produced.

    Args:
        df: DataFrame providing the 'tifPath' column.
        batch_size: Number of samples per batch (the last batch may be smaller).
        image_size: Target size passed to ``tf.image.resize``.
        shuffle: Whether to shuffle the sample order. Defaults to False, which
            keeps the batches in DataFrame row order.
        **kwargs: Forwarded unchanged to the ``Sequence`` base constructor.
    """
    def __init__(self, df, batch_size=32, image_size=(224, 224), shuffle=False, **kwargs):
        # Initialise the Keras base class; recent Keras versions expect this
        # call and warn when a Sequence subclass omits it.
        super().__init__(**kwargs)
        self.df = df.reset_index(drop=True)  # 0..n-1 index so .loc works with positions
        self.batch_size = batch_size
        self.image_size = image_size
        self.shuffle = shuffle
        self.indices = np.arange(len(df))  # Row order used to build the batches
        self.on_epoch_end()

    def __len__(self):
        """Return the number of batches (a partial last batch counts)."""
        return int(np.ceil(len(self.df) / self.batch_size))

    def __getitem__(self, index):
        """Build batch number ``index`` and return it as a 1-tuple ``(images,)``."""
        # Get batch indices and corresponding file paths
        batch_indices = self.indices[index * self.batch_size:(index + 1) * self.batch_size]
        batch_paths = self.df.loc[batch_indices, 'tifPath'].values

        batch_images = []
        for path in batch_paths:
            img = imread(path).astype('float32') # Load image
            img = tf.image.resize(img, self.image_size) # Resize
            img = img / 15000.0  # Normalize to (0-1)
            batch_images.append(img)

        return (tf.stack(batch_images),) # Return as tuple for Keras compatibility

    def on_epoch_end(self):
        """Reshuffle the row order in place when shuffling is enabled."""
        if self.shuffle:
            np.random.shuffle(self.indices)

In [None]:
# Serve the test images in DataFrame row order (the generator does not shuffle
# by default), so prediction i belongs to row i of test_df
pred_gen = SatellitePredictionGenerator(test_df, image_size=(224, 224), batch_size=32)

# One row of class probabilities per image
predictions = model.predict(pred_gen)

# Index of the highest probability in each row = predicted class index
pred_classes = predictions.argmax(axis=1)

In [None]:
# Store the predicted class index of every row in a 'label' column and preview
test_df = test_df.assign(label=pred_classes)
test_df.head()

In [None]:
# Crop names listed in class-index order, turned into an {index: name} lookup
# NOTE(review): presumes training class ids 1, 2, 3 mean Cocoa, Palm, Rubber — confirm.
class_to_label = dict(enumerate(('Cocoa', 'Palm', 'Rubber')))

# Translate each predicted class index into its crop name
test_df['Target'] = test_df['label'].map(class_to_label)

In [None]:
# Merge the predictions with the original test DataFrame.
# A 'Target' column left over from a previous run of this cell is dropped
# first; otherwise the merge would produce 'Target_x'/'Target_y' and the later
# groupby on 'Target' would fail. Rows without a prediction keep a missing
# 'Target' because of the left join.
test = (
    test
    .drop(columns=['Target'], errors='ignore')
    .merge(test_df[['ID','Target']], on='ID', how='left')
)

There are up to 12 images per ID (one for each month of the year), but only one prediction is submitted per ID.

In [None]:
# Split each 'ID' once at its last underscore and keep the part before it
id_parts = test['ID'].str.rsplit('_', n=1)
test['prefix'] = id_parts.str[0]

In [None]:
# Preview the first rows to check the merged 'Target' and derived 'prefix' columns
test.head()

In [None]:
def _most_frequent(values):
    """Return the most frequent non-null value, or NaN when all values are missing."""
    modes = values.mode()
    # mode() ignores missing values, so it is empty for an all-NaN group
    # (rows dropped before prediction have no 'Target' after the left merge).
    return modes.iloc[0] if not modes.empty else np.nan


# Group by prefix and assign the most frequent (mode) prediction as the group label
grouped = test.groupby('prefix')['Target'].agg(_most_frequent).reset_index()
# Rename 'prefix' back to 'ID' to match the expected submission format
grouped = grouped.rename(columns={'prefix': 'ID'})
grouped.head()

In [None]:
# Write the 'ID' and 'Target' columns (in that order, without the row index)
# to the submission file
grouped.to_csv('submission.csv', columns=['ID','Target'], index=False)