In [25]:
from torch.nn import Module, ReLU, Conv2d, Linear, MaxPool2d, LogSoftmax, NLLLoss
from torchvision import transforms, datasets
from torch.utils.data import DataLoader
from torch import flatten, float, no_grad
from torch.optim import Adam
import torch
import math

In [34]:
PARAMETERS = {
    'data_augmentation': False,
    'batch_normalization': False,
    'filters': 32, # no. of filters in first layer
    'filter_org': 'same', # 'half', 'double'
    'dropout': 0,
    'activation': 'relu',
    'train_data_dir': "./data/train",
    'test_data_dir': "./data/val",
    'batch_size': 64,
    'learning_rate': 0.001,
    'epochs': 5
}

In [35]:
CLASSES = {
    0:'Amphibia',
    1:'Animalia',
    2:'Arachnida',
    3:'Aves',
    4:'Fungi',
    5:'Insecta',
    6:'Mammalia',
    7:'Mollusca',
    8:'Plantae',
    9:'Reptilia'
}

In [36]:
def get_data(param, type):
    transform = transforms.Compose([
        transforms.Resize((256,256)),
        transforms.ToTensor(),  # Convert image to PyTorch tensor
        transforms.Normalize(mean = [0.485, 0.456, 0.406], std = [0.229, 0.224, 0.225])  # Normalize pixel values
    ])

    if(type=='train'):
        tdataset = datasets.ImageFolder(root=param['train_data_dir'], transform=transform)
        total = len(tdataset)
        train_sample = math.ceil(total*(0.8))
        val_sample = total-train_sample
        # print(total, train_sample, val_sample)
        train_dataset, validation_dataset = torch.utils.data.random_split(tdataset, [train_sample, val_sample])
        train_dataloader = DataLoader(train_dataset, batch_size=param['batch_size'], shuffle=True)
        validation_dataloader = DataLoader(validation_dataset, batch_size=param['batch_size'], shuffle=False)
        return train_dataloader, validation_dataloader
    
    # for images, labels in dataloader:
    #     # process_image_batch(images)
    #     # process_label_batch(labels)
  
    #     print(f"Image batch shape: {images.shape}")
    #     print(f"Sample label: {labels}") 

In [37]:
# get_data(PARAMETERS, 'train')

In [38]:
# print_labels(PARAMETERS)

In [39]:
# class CNN(Module):
#     def __self__(self, param):
#         self.data_augmentation = param['data_augmentation']
#         self.batch_normalization = param['batch_normalization']
#         self.filters = param['filters']
#         self.filter_org = param['filter_org']
#         self.dropout = param['dropout']
#         self.activation = param['activation']

In [40]:
class CNN(Module):
    def __init__(self, param):
        super(CNN, self).__init__()
        self.param=param
        self.data_augmentation = param['data_augmentation']
        self.batch_normalization = param['batch_normalization']
        self.dropout = param['dropout']
        self.activation = param['activation']
        self.filters = self.filter_logic(param['filters'], param['filter_org'])

        self.conv1 = Conv2d(kernel_size=(3,3), in_channels=3, out_channels=self.filters[0])
        self.act1 = ReLU()
        self.pool1 = MaxPool2d(kernel_size=(2, 2), stride=(2, 2))

        self.conv2 = Conv2d(kernel_size=(3,3), in_channels=self.filters[0], out_channels=self.filters[1])
        self.act2 = ReLU()
        self.pool2 = MaxPool2d(kernel_size=(2, 2), stride=(2, 2))

        self.conv3 = Conv2d(kernel_size=(3,3), in_channels=self.filters[1], out_channels=self.filters[2])
        self.act3 = ReLU()
        self.pool3 = MaxPool2d(kernel_size=(2, 2), stride=(2, 2))

        self.conv4 = Conv2d(kernel_size=(3,3), in_channels=self.filters[2], out_channels=self.filters[3])
        self.act4 = ReLU()
        self.pool4 = MaxPool2d(kernel_size=(2, 2), stride=(2, 2))

        self.conv5 = Conv2d(kernel_size=(3,3), in_channels=self.filters[3], out_channels=self.filters[4])
        self.act5 = ReLU()
        self.pool5 = MaxPool2d(kernel_size=(2, 2), stride=(2, 2))

        self.fc1 = Linear(in_features=1152, out_features=500)  # How to calculate dimension of filters at previous level
        self.act6 = ReLU()
        
        self.out = Linear(in_features=500, out_features=10)
        self.act7 = LogSoftmax(dim=1)

    
    def filter_logic(self, filter, org):
        level = []
        org = org.lower()
        if org == 'same':
            level = [filter for i in range(5)]
        elif org == 'double':
            level = [filter*pow(2,i) for i in range(5)]
        elif org == 'half':
            level = [max(filter//pow(2,i),1) for i in range(5)]
        return level

    

    def forward(self, r):

        r=self.conv1(r)
        r=self.act1(r)
        r=self.pool1(r)

        r=self.conv2(r)
        r=self.act2(r)
        r=self.pool2(r)

        r=self.conv3(r)
        r=self.act3(r)
        r=self.pool3(r)

        r=self.conv4(r)
        r=self.act4(r)
        r=self.pool4(r)

        r=self.conv5(r)
        r=self.act5(r)
        r=self.pool5(r)

        r=flatten(r,1)
        r=self.fc1(r)
        r=self.act6(r)
        
        r=self.out(r)
        output=self.act7(r)

        return output
        


In [41]:
def train(param):
    device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
    model = CNN(param).to(device)
    optimizer = Adam(model.parameters(), lr=param['learning_rate'])
    loss_function = NLLLoss()
    train_data_loader, validation_data_loader = get_data(param, 'train')
    

    for _ in range(param['epochs']):
        model.train()
        totalTrainLoss = 0
        totalValLoss = 0
        trainCorrect = 0
        valCorrect = 0
        train_counter=0
        validation_counter=0
        for (image, label) in train_data_loader:
            (image, label) = (image.to(device), label.to(device))
            prediction = model(image)
            loss = loss_function(prediction, label)
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            totalTrainLoss += loss
            trainCorrect += (prediction.argmax(1) == label).type(float).sum().item()
            train_counter+=1
            print(train_counter)
        
        with no_grad():
            model.eval()
            for (image, label) in validation_data_loader:
                (image, label) = (image.to(device), label.to(device))
                pred = model(image)
                totalValLoss += loss_function(pred, label)
                valCorrect += (pred.argmax(1) == label).type(float).sum().item()
                validation_counter+=1

        print(f"Train Loss --> {(totalTrainLoss/train_counter).cpu().detach().numpy()}")
        print(f"Train Accuracy --> {trainCorrect/len(train_data_loader.dataset)}")
        print(f"Validation Loss --> {(totalValLoss/validation_counter).cpu().detach().numpy()}")
        print(f"Validation Accuracy --> {valCorrect/len(validation_data_loader.dataset)}")

In [42]:
train(PARAMETERS)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
Train Loss --> 2.249912977218628
Train Accuracy --> 0.1575
Validation Loss --> 2.138025999069214
Validation Accuracy --> 0.22511255627813906
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
Train Loss --> 2.111701488494873
Train Accuracy --> 0.228
Validation Loss -

In [1]:
import torch
if torch.backends.mps.is_available():
    mps_device = torch.device("mps")
    x = torch.ones(1, device=mps_device)
    print (x)
else:
    print ("MPS device not found.")

tensor([1.], device='mps:0')


In [4]:
import torch
device = torch.device('cpu')
x = torch.rand((10000, 10000), dtype=torch.float32)
y = torch.rand((10000, 10000), dtype=torch.float32)
x = x.to(device)
y = y.to(device)

In [5]:
x * y

tensor([[0.1671, 0.0814, 0.5191,  ..., 0.1009, 0.0496, 0.1565],
        [0.2021, 0.0217, 0.2716,  ..., 0.5076, 0.1028, 0.0096],
        [0.6073, 0.1330, 0.1228,  ..., 0.6713, 0.2176, 0.0494],
        ...,
        [0.5357, 0.6622, 0.2862,  ..., 0.0181, 0.4730, 0.3859],
        [0.7609, 0.3169, 0.6223,  ..., 0.2039, 0.1028, 0.6419],
        [0.0249, 0.0267, 0.0817,  ..., 0.5312, 0.1486, 0.2523]])

In [6]:
import torch
device = torch.device('mps')
x = torch.rand((10000, 10000), dtype=torch.float32)
y = torch.rand((10000, 10000), dtype=torch.float32)
x = x.to(device)
y = y.to(device)

In [7]:
x*y

tensor([[1.3178e-01, 8.3787e-02, 4.4314e-01,  ..., 8.8996e-01, 1.4081e-02,
         5.4194e-01],
        [1.3943e-01, 1.5948e-03, 3.1647e-01,  ..., 7.4134e-01, 1.0782e-01,
         7.6612e-02],
        [4.8358e-01, 1.2294e-01, 5.9519e-03,  ..., 1.8317e-01, 2.0040e-02,
         3.5919e-02],
        ...,
        [2.0239e-01, 5.0098e-02, 7.8769e-02,  ..., 1.7920e-02, 8.6077e-04,
         8.5803e-03],
        [3.4244e-01, 1.9598e-01, 1.9167e-01,  ..., 2.6779e-01, 1.8545e-01,
         4.4428e-01],
        [6.3024e-02, 2.0180e-01, 7.0728e-01,  ..., 1.2550e-02, 1.4994e-01,
         7.1662e-02]], device='mps:0')

In [8]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
print(device)

mps
