## Final Project: Predicting 6 Vital Plant Traits

Stephen Hwang (#20889701)\
CS 480 Spring 2024\
Due August 12, 2024

#### Imports and Setup

In [None]:
import torch
import torchvision.transforms as transforms
import torchvision.models as models
from torch.utils.data import DataLoader, Dataset
from torch.utils.tensorboard import SummaryWriter
from torch import nn
from PIL import Image
import os
import pandas as pd
import numpy as np
import csv
import timm
from sklearn.metrics import r2_score

# Run on the first CUDA GPU when one is visible to PyTorch, otherwise on the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f'device: {device}')

#### Settable Fields for Training

In [36]:
# Upper bound on the number of epochs run by the training-validation loop.
NUM_EPOCHS = 50
# Early-stop threshold: the training loop breaks as soon as any single target's
# validation R2 score is strictly greater than this value.
# NOTE(review): R2 cannot exceed 1, so the value 1 presumably disables early
# stopping on purpose; lower it to enable.
EARLY_STOP_R2 = 1

#### Plant Dataset

In [37]:
class PlantDataset(Dataset):
    """Plant images paired with tabular attributes and regression targets.

    Every sample is the tuple ``(image, norm_attrs, norm_targets, id)`` where the
    attributes and targets are min-max scaled with the values read from
    ``norm_file``. Images and CSV rows are matched by position after both are
    sorted by the string form of the sample id, so each image file name (minus
    its extension) must equal the id in the first CSV column.
    """

    # Column layout of the trait CSV: id, then NUM_ATTRS attributes, then targets.
    NUM_ATTRS = 163
    NUM_TARGETS = 6

    def __init__(self, img_dir, csv_file, norm_file, img_transform, train=True):
        """Index the images, load the trait table and validate their pairing.

        Args:
            img_dir: Directory holding one image file per sample.
            csv_file: CSV whose first column is the sample id, followed by the
                attribute columns and (for training data) the target columns.
            norm_file: CSV whose first data row holds per-column maxima and
                whose second data row holds per-column minima, attributes
                first and targets after them.
            img_transform: Callable applied to the opened PIL image.
            train: When True, read targets from the CSV and drop every sample
                that has a target more than 3 standard deviations from that
                target's mean. When False, targets are all-zero placeholders.

        Raises:
            ValueError: If images and CSV rows do not line up one-to-one, or
                the normalisation file width does not match the data.
        """
        # images
        self.img_dir = img_dir
        self.img_transform = img_transform
        # os.listdir() makes no ordering promise, so sort by file stem to line
        # the images up with the string-sorted CSV rows below.
        self.img_names = sorted(
            os.listdir(self.img_dir),
            key=lambda name: os.path.splitext(name)[0],
        )
        self.img_paths = [os.path.join(self.img_dir, name) for name in self.img_names]

        # additional attributes
        self.trait_data = pd.read_csv(csv_file)
        self.trait_data = self.trait_data.sort_values(
            by=self.trait_data.columns[0],
            key=lambda col: col.astype(str),
            ascending=True,
        )
        first_target_col = 1 + self.NUM_ATTRS
        self.ids = np.array(self.trait_data.iloc[:, 0].values)
        self.attrs = np.array(self.trait_data.iloc[:, 1:first_target_col].values)
        if train:
            self.targets = np.array(self.trait_data.iloc[:, first_target_col:].values)
        else:
            # no ground truth at inference time: keep zero placeholders
            self.targets = np.zeros((len(self.ids), self.NUM_TARGETS))

        # information for attribute normalization
        self.max_min_data = pd.read_csv(norm_file)
        self.attr_max = self.max_min_data.iloc[0, 0:self.NUM_ATTRS].values
        self.attr_min = self.max_min_data.iloc[1, 0:self.NUM_ATTRS].values
        self.target_max = self.max_min_data.iloc[0, self.NUM_ATTRS:].values
        self.target_min = self.max_min_data.iloc[1, self.NUM_ATTRS:].values

        # The outlier filter below drops images and rows by position, so the two
        # listings must already have the same length.
        self._require(
            len(self.img_names) == len(self.ids),
            f"found {len(self.img_names)} images but {len(self.ids)} CSV rows",
        )

        # target manipulation
        if train:
            # keep only rows whose every target lies within mean +/- 3 std
            target_means = np.mean(self.targets, axis=0)
            target_std_devs = np.std(self.targets, axis=0)
            target_lower = target_means - (3 * target_std_devs)
            target_upper = target_means + (3 * target_std_devs)
            is_outlier = ((self.targets < target_lower) | (self.targets > target_upper)).any(axis=1)
            keep = ~is_outlier

            self.img_names = [name for name, ok in zip(self.img_names, keep) if ok]
            self.img_paths = [path for path, ok in zip(self.img_paths, keep) if ok]
            self.ids = self.ids[keep]
            self.attrs = self.attrs[keep]
            self.targets = self.targets[keep]

        # correlation between image and attribute data
        for name, sample_id in zip(self.img_names, self.ids):
            stem, _ = os.path.splitext(name)
            self._require(
                str(sample_id) == stem,
                f"image {name!r} does not match CSV id {sample_id!r}",
            )

        # correlation between attribute and normalization data
        self._require(len(self.attr_max) == self.attrs.shape[1], "attribute maxima width mismatch")
        self._require(len(self.attr_min) == self.attrs.shape[1], "attribute minima width mismatch")
        self._require(len(self.target_max) == self.targets.shape[1], "target maxima width mismatch")
        self._require(len(self.target_min) == self.targets.shape[1], "target minima width mismatch")

        # Build the min/range tensors once instead of on every __getitem__ call.
        self._attr_min_t, self._attr_range_t = self._min_and_range(self.attr_min, self.attr_max)
        self._target_min_t, self._target_range_t = self._min_and_range(self.target_min, self.target_max)

    @staticmethod
    def _require(condition, message):
        """Raise ValueError(message) unless condition holds."""
        if not condition:
            raise ValueError(message)

    @staticmethod
    def _min_and_range(min_values, max_values):
        """Return float32 (min, max - min) tensors, with zero ranges replaced by 1."""
        low = torch.tensor(np.asarray(min_values, dtype=np.float64), dtype=torch.float32)
        high = torch.tensor(np.asarray(max_values, dtype=np.float64), dtype=torch.float32)
        span = high - low
        # constant columns would otherwise divide by zero
        span = torch.where(span == 0, torch.tensor(1.0), span)
        return low, span

    def __len__(self):
        """Return the number of samples left after optional outlier removal."""
        return len(self.ids)

    def __getitem__(self, idx):
        """Return ``(image, norm_attrs, norm_targets, id)`` for sample ``idx``."""
        # load item
        image = self.img_transform(Image.open(self.img_paths[idx]))
        attrs = torch.tensor(self.attrs[idx], dtype=torch.float32)
        targets = torch.tensor(self.targets[idx], dtype=torch.float32)
        sample_id = self.ids[idx]

        # min-max normalize attributes and targets
        norm_attrs = (attrs - self._attr_min_t) / self._attr_range_t
        norm_targets = (targets - self._target_min_t) / self._target_range_t

        return image, norm_attrs, norm_targets, sample_id

    def get_image_mean_std(self):
        """Return the per-channel mean and std of all pixel values divided by 255.

        Sums are accumulated one image at a time so that only a single image is
        held in memory.

        Raises:
            ValueError: If there are no images or the images do not all have
                the same number of channels.
        """
        if not self.img_paths:
            raise ValueError("cannot compute image statistics of an empty dataset")

        pixel_count = 0
        value_sum = None
        square_sum = None
        for path in self.img_paths:
            with Image.open(path) as img:
                pixels = np.asarray(img, dtype=np.float64) / 255.
            # flatten the spatial dimensions and keep the channel axis, if any
            if pixels.ndim == 3:
                pixels = pixels.reshape(-1, pixels.shape[-1])
            else:
                pixels = pixels.reshape(-1)

            if value_sum is None:
                value_sum = np.zeros(pixels.shape[1:], dtype=np.float64)
                square_sum = np.zeros(pixels.shape[1:], dtype=np.float64)
            elif pixels.shape[1:] != value_sum.shape:
                raise ValueError(f"image {path!r} has a different channel layout")

            value_sum = value_sum + pixels.sum(axis=0)
            square_sum = square_sum + np.square(pixels).sum(axis=0)
            pixel_count += pixels.shape[0]

        mean = value_sum / pixel_count
        # population variance E[x^2] - E[x]^2, clamped against rounding below zero
        variance = np.maximum(square_sum / pixel_count - np.square(mean), 0.0)
        std = np.sqrt(variance)
        return mean, std

    def reverse_normalization(self, normalized_val):
        """Map min-max normalized targets back to their original scale."""
        range_target_value = self.target_max - self.target_min
        # mirror the forward pass: a zero range was treated as 1 when normalizing
        safe_range = np.where(range_target_value == 0, 1, range_target_value)
        val = (normalized_val * safe_range) + self.target_min
        return val

#### Load Training Data

In [38]:
# Training-time preprocessing: resize, light augmentation, then tensor
# conversion and per-channel normalization.
# NOTE(review): RandomResizedCrop is given scale=(1, 1.2), i.e. crops at least
# as large as the whole image - confirm this is the intended augmentation.
transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.ColorJitter(brightness=0.1),
        transforms.RandomResizedCrop(size=(224, 224), scale=(1, 1.2)),
        transforms.ToTensor(),
        transforms.Normalize(
            [0.44636917, 0.45045858, 0.33603736],
            [0.21836502, 0.20886066, 0.21879451],
        ),
    ]
)

train_dataset = PlantDataset(
    img_dir='./data/train_images',
    csv_file='./data/train.csv',
    norm_file='./data/max_mins.csv',
    img_transform=transform,
)

# Hold out 4000 randomly chosen samples for validation.
# NOTE(review): both subsets wrap the same dataset object, so the validation
# images go through the random augmentations above as well.
train_set, val_set = torch.utils.data.random_split(
    train_dataset,
    [len(train_dataset) - 4000, 4000],
)

train_loader = DataLoader(train_set, batch_size=32, shuffle=True)
val_loader = DataLoader(val_set, batch_size=32, shuffle=False)

In [39]:
class ResidualBlock(nn.Module):
    """Fully connected residual block: Linear-BN-ReLU-Linear-BN plus a skip connection.

    The block widens the input to ``hidden_features`` and projects it back to
    ``in_features`` so the result can be added to the unchanged input.
    """

    def __init__(self, in_features, hidden_features):
        super().__init__()
        # expansion half
        self.fc1 = nn.Linear(in_features, hidden_features)
        self.bn1 = nn.BatchNorm1d(hidden_features)
        # projection half, back to the input width
        self.fc2 = nn.Linear(hidden_features, in_features)
        self.bn2 = nn.BatchNorm1d(in_features)
        self.relu = nn.ReLU(inplace=True)
        # registered on the module, but forward() below never applies it
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        """Return ``relu(bn2(fc2(relu(bn1(fc1(x))))) + x)``."""
        hidden = self.relu(self.bn1(self.fc1(x)))
        projected = self.bn2(self.fc2(hidden))
        projected += x  # skip connection
        return self.relu(projected)

#### Image CNN

In [50]:
class ImageCNN(nn.Module):
    """Image branch: a torchvision ResNet-101 followed by a small MLP head.

    The head takes the backbone's 1000-wide output and reduces it to a
    128-dimensional feature vector.
    """

    def __init__(self):
        super().__init__()
        # Backbone loaded with torchvision's 'DEFAULT' weights. The original
        # cell also kept mobilenet_v2 and timm's inception_resnet_v2 as
        # commented-out alternatives for this line.
        self.cnn = models.resnet101(weights='DEFAULT')
        head_layers = [
            nn.Linear(1000, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, 128),
        ]
        self.final_fc = nn.Sequential(*head_layers)

    def forward(self, img):
        """Run the backbone, then the head, on a batch of images."""
        return self.final_fc(self.cnn(img))

#### Attribute NN

In [51]:
class AttributeNN(nn.Module):
    """Tabular branch: three residual blocks over the 163 attributes, then a
    dropout and a linear projection to a 128-dimensional feature vector."""

    def __init__(self):
        super().__init__()
        # every block keeps the 163-wide input; only the hidden width differs
        self.block1 = ResidualBlock(163, 256)
        self.block2 = ResidualBlock(163, 256)
        self.block3 = ResidualBlock(163, 128)
        self.fc_out = nn.Linear(163, 128)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Apply the residual blocks in order, then dropout and the projection."""
        for block in (self.block1, self.block2, self.block3):
            x = block(x)
        return self.fc_out(self.dropout(x))

#### Combined NN

In [52]:
class CombinedNN(nn.Module):
    """Two-branch regressor: image and attribute features are concatenated and
    passed through an MLP that outputs 6 values per sample."""

    def __init__(self):
        super().__init__()
        self.image_cnn = ImageCNN()
        self.attr_nn = AttributeNN()
        # fusion head over the 128 image features + 128 attribute features
        fusion_layers = [
            nn.Linear(128 + 128, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Linear(64, 6),
        ]
        self.fc = nn.Sequential(*fusion_layers)

    def forward(self, img, attrs):
        """Encode both inputs, join them along dim 1 and regress the targets."""
        branch_features = (self.image_cnn(img), self.attr_nn(attrs))
        return self.fc(torch.cat(branch_features, dim=1))

In [53]:
# Model, loss, optimizer and TensorBoard logger shared by the cells below.
net = CombinedNN().to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(
    net.parameters(),
    lr=5e-4,
    weight_decay=1e-5,
)
writer = SummaryWriter()

#### Training-Validation Loop (i.e. Train New Model)

In [None]:
for epoch in range(1, NUM_EPOCHS + 1):
    # ---- training pass ----
    net.train()
    train_loss = 0
    train_inputs = 0
    print('-')

    for batch in train_loader:
        images, attrs, targets = (batch[k].float().to(device) for k in range(3))

        optimizer.zero_grad()
        outputs = net(images, attrs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        # weight the batch-mean loss by the batch size so the epoch average
        # is per sample
        batch_size = targets.size(0)
        train_loss += loss.item() * batch_size
        train_inputs += batch_size

    writer.add_scalar("Average Training Loss", train_loss / train_inputs, epoch)
    print(f'Epoch {epoch} complete, average loss: {train_loss / train_inputs}')

    # ---- validation pass: loss and per-target R2 ----
    net.eval()
    target_preds = []
    target_true = []
    val_loss = 0
    val_inputs = 0

    with torch.no_grad():
        for batch in val_loader:
            images, attrs, targets = (batch[k].float().to(device) for k in range(3))

            outputs = net(images, attrs)
            loss = criterion(outputs, targets)

            batch_size = targets.size(0)
            val_loss += loss.item() * batch_size
            val_inputs += batch_size
            target_preds.append(outputs.cpu().numpy())
            target_true.append(targets.cpu().numpy())

    target_preds = np.concatenate(target_preds)
    target_true = np.concatenate(target_true)

    # one R2 score per target column
    r2 = r2_score(target_true, target_preds, multioutput='raw_values')
    print(f'Epoch {epoch} complete, average val loss: {val_loss / val_inputs}')
    print(f'R2 score for val set epoch {epoch}: {r2}')

    writer.add_scalar("Average Validation Loss", val_loss / val_inputs, epoch)
    writer.add_scalar("Validation R2 Score", np.mean(r2), epoch)

    # stop early once any single target beats the configured R2 threshold
    if (r2 > EARLY_STOP_R2).any():
        break

writer.flush()
writer.close()

print('-')
print('Training complete')

#### Test Model and Generate CSV

In [34]:
# Deterministic preprocessing for inference: the same resize and normalization
# as training but without the random flip / colour jitter / random crop, so
# that repeated runs produce identical predictions.
eval_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.44636917, 0.45045858, 0.33603736], [0.21836502, 0.20886066, 0.21879451])
])

test_dataset = PlantDataset(img_dir='./data/test_images', csv_file='./data/test.csv', norm_file='./data/max_mins.csv', img_transform=eval_transform, train=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

test_ids = []
test_preds = []

# Make sure dropout and batch norm are in inference mode even when this cell is
# run without the training loop having just finished.
net.eval()
with torch.no_grad():
    for data in test_loader:
        images, attrs, ids = data[0].float().to(device), data[1].float().to(device), data[3]
        outputs = net(images, attrs)
        test_preds.extend(outputs.cpu().numpy())
        test_ids.extend(ids.numpy())

# The test dataset reads the same normalization file as the training dataset,
# so de-normalizing through it keeps this cell independent of the training cells.
normal_test_preds = [test_dataset.reverse_normalization(pred) for pred in test_preds]

os.makedirs('./outputs', exist_ok=True)
filename = './outputs/test.csv'
with open(filename, mode='w', newline='') as file:
    # named csv_writer so the TensorBoard `writer` global is not overwritten
    csv_writer = csv.writer(file)
    header = ['id', 'X4', 'X11', 'X18', 'X26', 'X50', 'X3112']
    csv_writer.writerow(header)
    for sample_id, predictions in zip(test_ids, normal_test_preds):
        csv_writer.writerow([sample_id] + list(predictions))