In [1]:
import os
import re
import rasterio
import numpy as np
from sklearn.preprocessing import LabelEncoder
import torch
from torch.utils.data import Dataset, DataLoader 
from torchvision import transforms # For standard image transforms 
import torch.nn as nn
from transformers import ViTForImageClassification, ViTFeatureExtractor # For ViT model
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report 
import pandas as pd 

In [2]:
# --- Configuration ---
TRAIN_FOLDER = './train'
TEST_FOLDER = './test' # Assuming you have a separate test folder
IMAGE_SIZE = (224, 224) # Standard input size for many ViTs/CNNs
BATCH_SIZE = 32
NUM_EPOCHS = 10 

# Regex pattern for filenames of the form
#   s2_<CropType>_ID_<UniqueID>_<YYYY>_<MM>.tif
# The pattern itself carries no end anchor, so collect_image_info applies it
# with fullmatch() to reject names that merely start like an image file.
PATTERN = re.compile(r"s2_([a-zA-Z]+)_ID_([a-zA-Z0-9]+)_(\d{4})_(\d{2})\.tif")

# Column layout of the frame returned by collect_image_info; declared once so
# that an empty folder still yields a frame with the expected columns.
_IMAGE_INFO_COLUMNS = ['filename', 'crop_type', 'location_id', 'year', 'month']

# --- Step 1: Data Collection & Label Encoding ---
def collect_image_info(folder_path, is_train=True):
    """Index the image files in ``folder_path`` by parsing their filenames.

    Args:
        folder_path: Directory to scan (non-recursive).
        is_train: When True, files whose crop type is "Unknown" are skipped.

    Returns:
        pandas.DataFrame with one row per accepted file and the columns
        ``filename`` (folder_path joined with the file name), ``crop_type``,
        ``location_id``, ``year`` (int) and ``month`` (int). The columns are
        present even when no file matches.

    Raises:
        OSError: propagated from ``os.listdir`` if the folder cannot be read.
    """
    records = []
    # Sort the listing: os.listdir order is arbitrary, and the row order feeds
    # the seeded train/validation split further down the notebook.
    for filename in sorted(os.listdir(folder_path)):
        # fullmatch (not match) so sidecar files such as "*.tif.aux.xml" or
        # "*.tiff" are not mistaken for images.
        match = PATTERN.fullmatch(filename)
        if match is None:
            continue

        crop_type, loc_id, year, month = match.groups()

        # For training data, skip "Unknown" labels
        if is_train and crop_type == "Unknown":
            continue

        records.append({
            'filename': os.path.join(folder_path, filename),
            'crop_type': crop_type,
            'location_id': loc_id,
            'year': int(year),
            'month': int(month)
        })
    return pd.DataFrame(records, columns=_IMAGE_INFO_COLUMNS)

In [3]:
# Index both folders from their filenames; with is_train=True the "Unknown"
# crop type is dropped inside collect_image_info.
train_df_raw = collect_image_info(TRAIN_FOLDER, is_train=True)
test_df_raw = collect_image_info(TEST_FOLDER, is_train=False)

# Learn the crop-name -> integer id mapping from the training labels only.
label_encoder = LabelEncoder()
label_encoder.fit(train_df_raw['crop_type'])
num_classes = len(label_encoder.classes_)
print(f"Detected Crop Classes: {label_encoder.classes_}")
print(f"Number of Classes: {num_classes}")

# test_df_raw is left without an encoded column: its 'crop_type' may hold
# 'Unknown', which the encoder has never seen.
train_df_raw['encoded_label'] = label_encoder.transform(train_df_raw['crop_type'])

# Split by location id rather than by image, so that every image of one
# location lands on the same side of the train/validation boundary.
unique_train_locs = train_df_raw['location_id'].unique()
train_locs, val_locs = train_test_split(
    unique_train_locs,
    test_size=0.2,
    random_state=42,
)

train_df = train_df_raw.loc[train_df_raw['location_id'].isin(train_locs)].reset_index(drop=True)
val_df = train_df_raw.loc[train_df_raw['location_id'].isin(val_locs)].reset_index(drop=True)
# The test folder is used as-is; only the index is renumbered.
test_df = test_df_raw.reset_index(drop=True)

print(f"\nTrain samples: {len(train_df)}")
print(f"Validation samples: {len(val_df)}")
print(f"Test samples: {len(test_df)}")


Detected Crop Classes: ['Cocoa' 'Palm' 'Rubber']
Number of Classes: 3

Train samples: 5941
Validation samples: 1492
Test samples: 2201


In [4]:
# Preview the first rows of the training split to sanity-check the columns
# parsed from the filenames and the encoded label.
train_df.head()

Unnamed: 0,filename,crop_type,location_id,year,month,encoded_label
0,./train\s2_Cocoa_ID_0bCYpY_2024_01.tif,Cocoa,0bCYpY,2024,1,0
1,./train\s2_Cocoa_ID_0bCYpY_2024_02.tif,Cocoa,0bCYpY,2024,2,0
2,./train\s2_Cocoa_ID_0bCYpY_2024_03.tif,Cocoa,0bCYpY,2024,3,0
3,./train\s2_Cocoa_ID_0bCYpY_2024_04.tif,Cocoa,0bCYpY,2024,4,0
4,./train\s2_Cocoa_ID_0bCYpY_2024_07.tif,Cocoa,0bCYpY,2024,7,0


In [4]:
class MultiBandImageDataset(Dataset):
    """Dataset that reads multi-band raster images listed in a DataFrame.

    The DataFrame must provide the columns ``filename``, ``crop_type`` and
    ``location_id``. Each item is the tuple
    ``(image_tensor, label, file_path, location_id)``.

    Args:
        dataframe: Table with one row per image.
        transform: Optional callable applied to the ``(C, H, W)`` float32
            tensor read from disk.
        is_train: When True the label is encoded with ``label_encoder``;
            otherwise the raw ``crop_type`` value is returned unchanged.
        label_encoder: Fitted encoder exposing ``transform``; required when
            ``is_train`` is True.

    Raises:
        ValueError: if ``is_train`` is True and no ``label_encoder`` is given.
    """

    def __init__(self, dataframe, transform=None, is_train=True, label_encoder=None):
        # Fail at construction time instead of with an AttributeError on the
        # first item, which may surface inside a DataLoader worker process.
        if is_train and label_encoder is None:
            raise ValueError("label_encoder is required when is_train=True")
        self.dataframe = dataframe
        self.transform = transform
        self.is_train = is_train
        self.label_encoder = label_encoder

    def __len__(self):
        """Return the number of images (rows of the DataFrame)."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Load the image at positional index ``idx`` and return it with its metadata."""
        # Fetch the row once instead of once per column.
        row = self.dataframe.iloc[idx]
        file_path = row['filename']
        raw_label = row['crop_type']
        location_id = row['location_id']

        # Only the read needs the open handle; close it before transforming.
        with rasterio.open(file_path) as src:
            image_data = src.read().astype(np.float32)
            # Kept for callers that read this attribute. NOTE(review): when the
            # DataLoader uses worker processes this is set on the workers'
            # copies of the dataset, not on the object in the main process.
            self.num_channels = src.count

        image_tensor = torch.from_numpy(image_data)  # Shape: (C, H, W)

        if self.transform:
            image_tensor = self.transform(image_tensor)

        # Handle labels
        if self.is_train:
            label = self.label_encoder.transform([raw_label])[0]  # Encode to numerical ID
        else:
            label = raw_label

        return image_tensor, label, file_path, location_id

In [5]:
# Read the band count from the first training image. The context manager closes
# the raster handle, which a bare rasterio.open(...).count would leave open.
with rasterio.open(train_df.iloc[0]['filename']) as example_src:
    example_num_bands = example_src.count

# Placeholder per-band normalisation statistics (one entry per band).
DUMMY_MEAN = [0.5] * example_num_bands # Replace with actual mean for each band
DUMMY_STD = [0.5] * example_num_bands  # Replace with actual std for each band

In [6]:
class MultiBandToTensorAndNormalize:
    """Resize a multi-band image tensor and then normalise it per channel.

    Args:
        target_size: Size handed to ``transforms.Resize``.
        mean: Per-channel means handed to ``transforms.Normalize``.
        std: Per-channel standard deviations handed to ``transforms.Normalize``.
    """

    def __init__(self, target_size, mean, std):
        self.resize = transforms.Resize(target_size)
        self.normalize = transforms.Normalize(mean=mean, std=std)

    def __call__(self, img_tensor):
        """Return ``img_tensor`` after resizing and normalisation, in that order.

        The input is expected to be a (C, H, W) float32 tensor.
        """
        resized = self.resize(img_tensor)
        return self.normalize(resized)

In [None]:
# Training images get random flips on top of resize + normalisation;
# validation and test images are only resized and normalised.
train_transform = transforms.Compose([
    MultiBandToTensorAndNormalize(IMAGE_SIZE, DUMMY_MEAN, DUMMY_STD),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
])

val_test_transform = transforms.Compose([
    MultiBandToTensorAndNormalize(IMAGE_SIZE, DUMMY_MEAN, DUMMY_STD),
])

# Create Dataset and DataLoader instances
train_dataset = MultiBandImageDataset(train_df, transform=train_transform, is_train=True, label_encoder=label_encoder)
val_dataset = MultiBandImageDataset(val_df, transform=val_test_transform, is_train=True, label_encoder=label_encoder)
test_dataset = MultiBandImageDataset(test_df, transform=val_test_transform, is_train=False)

# Cap the worker count at 4. os.cpu_count() may return None, hence the fallback.
# NOTE(review): the paths in train_df look like Windows paths; worker processes
# with a Dataset class defined inside a notebook are known to be troublesome
# there - set num_workers = 0 if batch loading hangs or fails to unpickle.
num_workers = min(4, os.cpu_count() or 1)

# Settings shared by all three loaders.
loader_kwargs = dict(
    batch_size=BATCH_SIZE,
    num_workers=num_workers,
    # Pinned host memory only helps when batches are copied to a CUDA device.
    pin_memory=torch.cuda.is_available(),
    # DataLoader rejects persistent_workers=True when num_workers == 0.
    persistent_workers=num_workers > 0,
)
train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_kwargs)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_kwargs)

# Verify a batch. The dataset yields four fields per item
# (image, label, file path, location id), so four names are unpacked here.
try:
    images, labels, paths, loc_ids = next(iter(train_loader))
except Exception as e:
    print(f"Error loading batch: {e}")
    print("Check your MultiBandImageDataset __getitem__ method and transforms.")
    # Re-raise: later cells need first_image_bands, so continuing would only
    # replace this error with a less helpful NameError.
    raise

print(f"\nBatch of images shape: {images.shape}") # (batch_size, channels, height, width)
print(f"Batch of labels shape: {labels.shape}")   # (batch_size)
print(f"First image path in batch: {paths[0]}")
first_image_bands = images.shape[1]

In [None]:
from transformers import AutoModelForImageClassification

# Hub checkpoint used as the starting point for fine-tuning.
model_name = "timm/mobilenetv3_small_100.lamb_in1k"

# num_labels sizes the classification head to the crop classes detected above.
# ignore_mismatched_sizes lets checkpoint weights whose shape differs from the
# instantiated model be re-initialised instead of raising an error; it does not
# by itself change the number of input channels.
model = AutoModelForImageClassification.from_pretrained(model_name, num_labels=num_classes, ignore_mismatched_sizes=True)


In [None]:
def _adapt_input_channels(net, num_bands):
    """Replace the first ``nn.Conv2d`` of ``net`` with one taking ``num_bands`` channels.

    The layer is located by type rather than by a fixed attribute path, so the
    same code works for models that do not expose ``.vit.embeddings`` (the
    checkpoint loaded in this notebook is a MobileNetV3, not a ViT).
    NOTE(review): assumes the first Conv2d in ``named_modules()`` order is the
    input layer - confirm for architectures other than MobileNetV3 / ViT.

    The new layer copies the geometry of the old one and only has a bias if the
    old one did. Its weights start from the mean of the pretrained kernels over
    the input channels, rescaled so that the summed response keeps its
    magnitude.

    Args:
        net: Model (``nn.Module``) to modify in place.
        num_bands: Number of input channels the model should accept.

    Returns:
        Dotted module name of the layer that was replaced.

    Raises:
        ValueError: if ``net`` contains no replaceable ``nn.Conv2d``.
    """
    stem_name, stem = next(
        ((name, module) for name, module in net.named_modules() if isinstance(module, nn.Conv2d)),
        (None, None),
    )
    # An empty name means net itself is the Conv2d; there is no parent to patch.
    if stem is None or stem_name == "":
        raise ValueError("Could not find a replaceable nn.Conv2d input layer in the model.")

    new_stem = nn.Conv2d(
        in_channels=num_bands,
        out_channels=stem.out_channels,
        kernel_size=stem.kernel_size,
        stride=stem.stride,
        padding=stem.padding,
        dilation=stem.dilation,
        bias=stem.bias is not None,  # mirror the original layer instead of forcing a bias
    )

    with torch.no_grad():
        # Grouped convolutions have a different weight layout; keep the random
        # initialisation in that case.
        if stem.groups == 1:
            mean_kernel = stem.weight.mean(dim=1, keepdim=True) * (stem.in_channels / num_bands)
            new_stem.weight.copy_(mean_kernel.expand(-1, num_bands, -1, -1))
        if stem.bias is not None:
            new_stem.bias.copy_(stem.bias)

    parent_name, _, child_name = stem_name.rpartition(".")
    parent = net.get_submodule(parent_name)  # "" resolves to net itself
    setattr(parent, child_name, new_stem)

    # Some models store the expected channel count next to the layer and/or in
    # their config and validate inputs against it (presumably the case for
    # Hugging Face ViT patch embeddings - verify); keep those in sync if present.
    if hasattr(parent, "num_channels"):
        parent.num_channels = num_bands
    config = getattr(net, "config", None)
    if config is not None and hasattr(config, "num_channels"):
        config.num_channels = num_bands

    return stem_name


if first_image_bands != 3: # If your TIFFs are not standard RGB (3 channels)
    replaced_layer = _adapt_input_channels(model, first_image_bands)
    print(f"Adjusted model input channels from 3 to {first_image_bands} (layer: {replaced_layer})")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Module.to moves the parameters in place and returns the module; assigning the
# result keeps the cell from printing the full model repr.
model = model.to(device)

In [None]:
# Put the model in evaluation mode before predicting on the test set.
model.eval()

# Per-image accumulators, filled batch by batch in loader order.
test_predictions, test_actual_labels = [], []  # decoded predictions / labels as stored in test_df
test_image_paths, test_location_ids = [], []   # file path and location id of each image

with torch.no_grad():
    for images, labels, paths, loc_ids in test_loader:
        images = images.to(device)
        outputs = model(pixel_values=images)
        logits = outputs.logits
        # Highest-scoring class per image, as encoded ids on the CPU.
        predicted_classes_encoded = logits.argmax(dim=-1).cpu().numpy()

        # Turn the ids back into crop names and append every field of the batch.
        decoded_batch = label_encoder.inverse_transform(predicted_classes_encoded)
        for sink, batch_values in (
            (test_predictions, decoded_batch),
            (test_actual_labels, labels),
            (test_image_paths, paths),
            (test_location_ids, loc_ids),
        ):
            sink.extend(batch_values)


In [None]:
# One row per test image: its location id and the predicted crop name.
# The collected image paths and actual labels (which may be 'Unknown' for a
# blind test set) are intentionally left out of this table.
results_df = pd.DataFrame(
    dict(
        location_id=test_location_ids,
        predicted_label=test_predictions,
    )
)
print("\nSample Test Predictions:")
print(results_df.head())