# Q2 Weather Recognition

## Dependencies

In [1]:
from PIL import Image as PILImage
from torch import nn
from torch.autograd import Variable
from torchvision import models, transforms
from torch.utils.data import DataLoader, TensorDataset
import torch
import os

## Construct Dataset

In [2]:
# Get the path of all JPG files in the folder
folder_path = '../Q2_data/train_data'
file_list = os.listdir(folder_path)
jpg_files = [file for file in file_list if file.endswith('.jpg')]
print(len(jpg_files))

250


In [3]:
# Define a global transformer to appropriately scale images and subsequently convert them to a Tensor
img_size = 224
loader = transforms.Compose([
  transforms.Resize(img_size),
  transforms.CenterCrop(img_size),
  transforms.ToTensor(),
])
def load_image(filename):
    image = PILImage.open(filename).convert('RGB')
    image_tensor = loader(image).float()
    image_var = Variable(image_tensor).unsqueeze(0)
    return image_var

In [4]:
# Convert the images in the folder to a tensor of 3 * 224 * 224
images_list = [load_image(folder_path+'/'+file) for file in jpg_files]
all_img = torch.cat(images_list, dim=0)
print(all_img.shape)

torch.Size([250, 3, 224, 224])


In [5]:
# Convert weather labels into the form of one-hot encoding
label_list = []
weather_list = ['Cloudy', 'Foggy', 'Rainy', 'Snowy', 'Sunny']
for file in jpg_files:
    label = []
    for weather in weather_list:
        if weather in file:
           label.append(1) 
        else:
            label.append(0)
    label_list.append(label)
    
all_labels = torch.Tensor(label_list)
print(all_labels.shape)

torch.Size([250, 5])


## Training model

In [13]:
# Construct a class for the model
class CNNModel(nn.Module):
    def __init__(self, num_classes=5):
        super(CNNModel, self).__init__()

        self.conv_layers = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False),

            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False),

            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False),

            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False),

            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False),

            )


        self.fc_layers = nn.Sequential(
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(0.1),

            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(0.1),
            nn.Linear(4096, 1000),
            nn.Linear(1000, 5),

            )

        self.sigmoid = nn.Sigmoid()

    def forward(self,x):
        x = self.conv_layers(x)
        x = x.view(x.size(0), -1)
        x = self.fc_layers(x)
        # x = self.sigmoid(x)
        return x

In [16]:
# Send model into GPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = CNNModel().to(device)

# Define a training function
def train(model, learning_rate=0.0005, batch_size=25, epochs=100):
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    
    # Encapsulate the data
    train_dataset = TensorDataset(all_img, all_labels)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)


    for epoch in range(epochs):
        for i, (images, labels) in enumerate(train_loader):
            optimizer.zero_grad()
            images=images.to(device)
            labels=labels.to(device)
            outputs = model(images)


            loss = criterion(outputs, labels.float()).to(device)

            loss.backward()
            optimizer.step()

            if (i + 1) % 5 == 0:
                print(f'Epoch [{epoch + 1}/{epochs}], Step [{i + 1}], Loss: {loss.item()}')

# Finally train the model
train(model)

Epoch [1/100], Step [5], Loss: 2.5191080570220947
Epoch [1/100], Step [10], Loss: 3.668705463409424
Epoch [2/100], Step [5], Loss: 1.542222261428833
Epoch [2/100], Step [10], Loss: 1.3412532806396484
Epoch [3/100], Step [5], Loss: 1.4309747219085693
Epoch [3/100], Step [10], Loss: 1.49448823928833
Epoch [4/100], Step [5], Loss: 1.250004768371582
Epoch [4/100], Step [10], Loss: 1.3834022283554077
Epoch [5/100], Step [5], Loss: 1.4812248945236206
Epoch [5/100], Step [10], Loss: 1.3480709791183472
Epoch [6/100], Step [5], Loss: 1.3229665756225586
Epoch [6/100], Step [10], Loss: 1.2914235591888428
Epoch [7/100], Step [5], Loss: 1.5125315189361572
Epoch [7/100], Step [10], Loss: 1.394298791885376
Epoch [8/100], Step [5], Loss: 1.328277587890625
Epoch [8/100], Step [10], Loss: 1.6516457796096802
Epoch [9/100], Step [5], Loss: 1.4532438516616821
Epoch [9/100], Step [10], Loss: 1.2924716472625732
Epoch [10/100], Step [5], Loss: 1.502036213874817
Epoch [10/100], Step [10], Loss: 1.3767292499542

## Test the model

In [17]:
# Test

# Set the model to evaluation mode
model.eval()
correct = 0
with torch.no_grad():
    train_dataset = TensorDataset(all_img, all_labels)
    train_loader = DataLoader(train_dataset)
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        inputs = inputs.double()
        outputs = model(inputs.float())
        predicted = torch.max(outputs, 1)[1]
        label = torch.max(labels, 1)[1]
        correct += (predicted == label).sum()

# Calculate the accuracy 
print('acc: %.2f %%' % (100 * correct / 250))

acc: 96.40 %
