CNN Classifier for SAT-6
===
Todo
---
- [ ] Log learning rate, loss, and accuracy to `TensorBoardX` along with standard classification stats (recall, precision, f1, confusion matrix etc.)
- [ ] Implement stochastic gradient descent with warm restarts
- [ ] Implement test time augmentation
- [ ] Implement early stopping ans save best weights
- [ ] Achieve overall accuracy of 98.5%

In [2]:
import os
import pandas as pd
import numpy as np
import torch as t
from torch.utils.data import Dataset, DataLoader
import torchvision as tv
import scipy.io
from tensorboardX import SummaryWriter
from tqdm import tqdm

In [3]:
data_dir = '/Users/tyler/Datasets/deepsat-sat6/'

In [4]:
os.listdir(data_dir)

['y_test_sat6.csv',
 'sat-6-full.mat',
 'X_train_sat6.csv',
 'sat6annotations.csv',
 'y_train_sat6.csv',
 'X_test_sat6.csv']

In [5]:
class SatelliteDataset(Dataset):
    def __init__(self, X_csv, y_csv, sample_size=1000):
        if sample_size:
            self.instances = pd.read_csv(X_csv, nrows=sample_size)
            self.labels = pd.read_csv(y_csv, nrows=sample_size)
        else:
            self.instances = pd.read_csv(X_csv)
            self.labels = pd.read_csv(y_csv)

    def __len__(self): return len(self.labels)

    def __getitem__(self, idx):
        img = self.instances.iloc[idx].values.reshape(-1, 4, 28, 28).clip(0, 255).astype(np.uint8).squeeze(axis=0)
        label = self._get_labels(self.labels.iloc[idx].values)
        return {'chip': img, 'label': label}

    def _get_labels(self, row_values):
        annotations = {'building': 0, 'barren_land': 1,'trees': 2, 'grassland': 3, 'road': 4, 'water': 5}
        labels = [list(annotations.values())[i] for i, x in enumerate(row_values) if x == 1]
        return labels[0]

In [6]:
training_set = SatelliteDataset(X_csv=data_dir+'X_train_sat6.csv', y_csv=data_dir+'y_train_sat6.csv')
validation_set = SatelliteDataset(X_csv=data_dir+'X_test_sat6.csv', y_csv=data_dir+'y_test_sat6.csv')

In [7]:
tfms = tv.transforms.Compose([
    tv.transforms.RandomHorizontalFlip(),
    tv.transforms.RandomVerticalFlip(),
    tv.transforms.RandomRotation(90),
    tv.transforms.ToTensor()
])

In [8]:
train_loader = DataLoader(
    training_set,
    batch_size=64,
    shuffle=True,
    num_workers=4
)

val_loader = DataLoader(
    validation_set,
    batch_size=64,
    shuffle=True,
    num_workers=4
)

In [9]:
class SmallCNN(t.nn.Module):
    def __init__(self):
        super(SmallCNN, self).__init__()
        self.conv1 = t.nn.Sequential(
            t.nn.Conv2d(
                in_channels=4,
                out_channels=32,
                kernel_size=3
            ),
            t.nn.BatchNorm2d(32),
            t.nn.ReLU(),
            t.nn.Conv2d(
                in_channels=32,
                out_channels=32,
                kernel_size=3
            ),
            t.nn.BatchNorm2d(32),
            t.nn.ReLU(),
        )
        self.conv2 = t.nn.Sequential(
            t.nn.Conv2d(
                in_channels=32,
                out_channels=64,
                kernel_size=3
            ),
            t.nn.BatchNorm2d(64),
            t.nn.ReLU(),
            t.nn.Conv2d(
                in_channels=64,
                out_channels=64,
                kernel_size=3
            ),
            t.nn.BatchNorm2d(64),
            t.nn.ReLU(),
        )
        self.pool = t.nn.MaxPool2d(2)
        self.fc1 = t.nn.Linear(1024, 512)
        self.bn = t.nn.BatchNorm1d(512)
        self.drop_out = t.nn.Dropout2d(0.2)
        self.fc2 = t.nn.Linear(512, 6)

    def forward(self, x):
        x = self.conv1(x)
        x = self.pool(x)
        x = self.conv2(x)
        x = self.pool(x)
        x = x.view(x.size(0), -1)  # flatten
        x = self.fc1(x)
        x = self.bn(x)
        x = t.nn.functional.relu(x)
        x = self.drop_out(x)
        output = self.fc2(x)
        return output

In [19]:
writer = SummaryWriter(log_dir='logs')

In [15]:
model = SmallCNN()
optim = t.optim.Adam(model.parameters())
crit = t.nn.CrossEntropyLoss()

In [16]:
def train(epochs, model_name):
    for e in tqdm(range(epochs)):
        # Training loop
        model.train()
        train_correct = 0
        train_loss = 0
        for s, inst in enumerate(train_loader):
            chip = inst['chip'].float()
            output = model(chip)
            loss = crit(output, inst['label'])
            
            optim.zero_grad()
            loss.backward()
            optim.step()
            
            train_loss += loss
            _, predicted = t.max(output.data, 1)
            train_correct += (predicted == inst['label']).sum().item()
    
        writer.add_scalars('loss', {'training_loss': loss / len(training_set)}, e)
        writer.add_scalars('acc', {'train_acc': train_correct / len(training_set)}, e)
            
        # Validation loop
        model.eval()
        val_correct = 0
        val_loss = 0
        for s, inst in enumerate(val_loader):
            chip = inst['chip'].float()
            output = model(chip)
            loss = crit(output, inst['label'])
            
            val_loss += loss
            _, predicted = t.max(output.data, 1)
            val_correct = (predicted == inst['label']).sum().item()
            
        writer.add_scalars('loss', {'validation_loss': val_loss / len(validation_set)}, e)
        writer.add_scalars('acc', {'val_acc': val_correct / len(validation_set)})
        
    t.save(model.state_dict(), '{}.pt'.format(model_name))

In [17]:
train(3, 'test')

100%|██████████| 3/3 [00:13<00:00,  4.44s/it]
