In [4]:
!gdown --id 1Tmo5_FY9io63lvHA-hVb28sl3KEu07Qx

Downloading...
From: https://drive.google.com/uc?id=1Tmo5_FY9io63lvHA-hVb28sl3KEu07Qx
To: /content/img_cls_weather_dataset.zip
100% 613M/613M [00:04<00:00, 131MB/s] 


In [5]:
!unzip -q /content/img_cls_weather_dataset.zip

In [6]:
!mv /content/weather-dataset /content/data

In [7]:
import torch
import torch .nn as nn
import os
import numpy as np
import matplotlib . pyplot as plt
from PIL import Image
from torch.utils . data import Dataset , DataLoader
from sklearn.model_selection import train_test_split

In [8]:
root_dir = '/content/data/dataset'

# get Class Names
classes = {
    label_idx : class_name for label_idx, class_name in enumerate(sorted(os.listdir(root_dir)))
}

In [9]:
img_paths = []
labels = []

# get Image Paths and Labels
for label_idx, class_name in classes.items():
    for img_name in os.listdir(os.path.join(root_dir, class_name)):
        img_paths.append(os.path.join(root_dir,class_name,img_name))
        labels.append(label_idx)

In [10]:
seed = 0
val_size = 0.2
test_size = 0.125
is_shuffle = True

# split train, validation, test set
X_train , X_val , y_train , y_val = train_test_split(
    img_paths,
    labels,
    test_size=test_size,
    random_state=seed,
    shuffle=is_shuffle
)

X_train , X_test , y_train , y_test = train_test_split(
    X_train,
    y_train,
    test_size=val_size,
    random_state=seed,
    shuffle=is_shuffle
)

In [11]:
# define Dataset
class WeatherDataset(Dataset):
    def __init__(self, img_paths, labels, transform=None):
        self.img_paths = img_paths
        self.labels = labels
        self.transform = transform

    def __getitem__(self, index):
        img_path = self.img_paths[index]
        label = self.labels[index]
        img = Image.open(img_path)
        img = img.convert("RGB")
        if self.transform is not None:
            img = self.transform(img)
        return img, label

    def __len__(self):
        return len(self.img_paths)

In [12]:
# define Preprocess Images: Transform
def transform_image(image, image_size=(224,224)):
    img = image.resize(image_size)
    img = np.array(img)[...,:3] # in data, some image has 4 channels, but we need only 3 channels
    img = torch.tensor(img).permute(2,0,1).float() # permute: (H,W,C) -> (C,H,W); C is number of channels
    norm_img = img / 255.0
    return norm_img

In [13]:
train_dataset = WeatherDataset(X_train, y_train, transform = transform_image)
val_dataset = WeatherDataset(X_val, y_val, transform = transform_image)
test_dataset = WeatherDataset(X_test, y_test, transform = transform_image)

In [14]:
# Define DataLoader
train_batch_size = 64
test_batch_size = 8

train_loader = DataLoader(train_dataset, batch_size=train_batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=test_batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=test_batch_size, shuffle=False)

In [15]:
# Define Model: ResNet
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.downsample = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels)
            )

        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        shortcut = x.clone()
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.downsample(shortcut)
        out = self.relu(out)
        return out

class ResNet(nn.Module):
    def __init__(self, residual_block, layers, num_classes=4):
        super().__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3) # 3 is number of channels
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self.make_layer(residual_block, 64, layers[0], stride=1)
        self.layer2 = self.make_layer(residual_block, 128, layers[1], stride=2)
        self.layer3 = self.make_layer(residual_block, 256, layers[2], stride=2)
        self.layer4 = self.make_layer(residual_block, 512, layers[3], stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        self.flatten = nn.Flatten()
        self.fc = nn.Linear(512, num_classes)

    def make_layer(self, residual_block, out_channels, blocks, stride=1):
        layers = []
        layers.append(residual_block(self.in_channels, out_channels, stride))
        self.in_channels = out_channels
        for _ in range(1, blocks):
            layers.append(residual_block(self.in_channels, out_channels, stride=1))
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.maxpool(out)

        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)

        out = self.avgpool(out) # (B, 512, 1, 1)
        out = self.flatten(out) # (B, 512)
        out = self.fc(out) # (B, 4)
        return out

In [16]:
n_classes = len(list(classes.keys()))
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = ResNet(ResidualBlock, [2,2,2,2], num_classes=n_classes).to(device)

In [17]:
# Define Evaluation
def evaluate(model, data_loader, criterion, device):
    model.eval()
    total = 0
    correct = 0
    losses = []

    with torch.no_grad():
        for data in data_loader:
            inputs, labels = data
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            _, predicted = torch.max(outputs, dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            losses.append(loss.item())

    loss = sum(losses) / len(losses)
    acc = correct / total

    return loss, acc

In [18]:
# define Training
def fit(model, train_loader, val_loader, criterion, optimizer, n_epochs, device):
    train_losses = []
    val_losses = []

    for epoch in range(n_epochs):
        batch_train_losses = []
        model.train()

        for idx, data in enumerate(train_loader):
            inputs, labels = data
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            batch_train_losses.append(loss.item())

        train_loss = sum(batch_train_losses) / len(batch_train_losses)
        val_loss, val_acc = evaluate(model, val_loader, criterion, device)

        train_losses.append(train_loss)
        val_losses.append(val_loss)

        print(f"Epoch: {epoch+1}/{n_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

    return train_losses, val_losses

In [19]:
# Define Training Parameters
lr = 1e-3
n_epochs = 50

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=lr)

In [None]:
# Training
train_losses, val_losses = fit(model, train_loader, val_loader, criterion, optimizer, n_epochs, device)

Epoch: 1/50, Train Loss: 1.9221, Val Loss: 1.5978, Val Acc: 0.4697


In [None]:
# Plot Losses
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Val Loss')
plt.legend()
plt.show()

In [None]:
# Define Test
val_loss , val_acc = evaluate (model, val_loader, criterion, device)
test_loss , test_acc = evaluate (model, test_loader, criterion, device )

print ('Evaluation on val / test dataset')
print ('Val accuracy :', val_acc )
print ('Test accuracy :', test_acc)