In [10]:
import torch 
import torch.nn as nn
from torch.utils.data import  DataLoader
from torchvision import datasets, transforms
from torch.utils.data import Dataset
import os
from typing import Tuple, Dict, List
from PIL import Image
# https://medium.com/@karuneshu21/resnet-paper-walkthrough-b7f3bdba55f0
# https://arxiv.org/pdf/1512.03385.pdf
# https://medium.com/@karuneshu21/how-to-resnet-in-pytorch-9acb01f36cf5

In [11]:
class ImageFolderCustom(Dataset):
    
    # 2. Initialize with a targ_dir and transform (optional) parameter
    def __init__(self, targ_dir: str, transform=None) -> None:
        
        # 3. Create class attributes
        # Get all image paths
        self.paths = list(Path(targ_dir).glob("*/*.jpg")) # note: you'd have to update this if you've got .png's or .jpeg's
        # Setup transforms
        self.transform = transform
        # Create classes and class_to_idx attributes
        # print(self.paths)
        self.classes, self.class_to_idx = self.find_classes(targ_dir)

    # 4. Make function to load images
    def load_image(self, index: int) -> Image.Image:
        "Opens an image via a path and returns it."
        image_path = self.paths[index]
        return Image.open(image_path) 
    
    # 5. Overwrite the __len__() method (optional but recommended for subclasses of torch.utils.data.Dataset)
    def __len__(self) -> int:
        "Returns the total number of samples."
        return len(self.paths)
    
    # 6. Overwrite the __getitem__() method (required for subclasses of torch.utils.data.Dataset)
    def __getitem__(self, index: int) -> Tuple[torch.Tensor, int]:
        "Returns one sample of data, data and label (X, y)."
        img = self.load_image(index)
        class_name  = self.paths[index].parent.name # expects path in data_folder/class_name/image.jpeg
        class_idx = self.class_to_idx[class_name]

        # Transform if necessary
        if self.transform:
            return self.transform(img), class_idx # return data, label (X, y)
        else:
            return img, class_idx # return data, label (X, y)
        
    def find_classes(self,directory:str)->Tuple[list[str],dict[str,int]]:
        classes=sorted(entry.name for entry in os.scandir(directory) if entry.is_dir())

        if not classes:
            raise FileNotFoundError(f"Couldn't find any classes in {directory}.")
        
        class_to_idx={cls_name: i for i, cls_name in enumerate(classes)}
        
        print(classes,class_to_idx)
        return classes,class_to_idx


In [12]:
class CustomDataTest():
    def __init__(self) -> None:

        self.train_dir = "C:\\Users\\Jakub Machura\\source\\repos\\UnderstandingDeepLearning\\data\\pizza_steak_sushi\\train"
        self.test_dir = "C:\\Users\\Jakub Machura\\source\\repos\\UnderstandingDeepLearning\\data\\pizza_steak_sushi\\test"
        """
            temp placement for of transform
        """
        self.train_transform=transforms.Compose([
            transforms.Resize(size=(224,224)),
            # transforms.TrivialAugmentWide(num_magnitude_bins=31),
            transforms.ToTensor()
        ])
    
        self.test_transforms = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor()
        ])
    
        self.train_data_custom =ImageFolderCustom(self.train_dir,transform=self.train_transform)
        self.test_data_custom=ImageFolderCustom(self.test_dir,transform=self.test_transforms)
    
        """temp place for testing data"""    
        # Check for equality amongst our custom Dataset and ImageFolder Dataset
        # print((len(self.train_data_custom) == len(data.train_data)) & (len(self.test_data_custom) == len(data.test_data)))
        # print(self.train_data_custom.classes == data.train_data.classes)
        # print(self.train_data_custom.class_to_idx == data.train_data.class_to_idx)

        self.IntoDataLoaders()

    def IntoDataLoaders(self):
        BATCH_SIZE=32
        # NUM_WORKERS = os.cpu_count()
        NUM_WORKERS = 1

        # print(f"number of workers avalible {NUM_WORKERS}")
        self.train_dataloader= DataLoader(dataset=self.train_data_custom, # use custom created train Dataset
                                     batch_size=BATCH_SIZE, # how many samples per batch?
                                    #  num_workers=NUM_WORKERS, # how many subprocesses to use for data loading? (higher = more)
                                     shuffle=True) # shuffle the data?

        self.test_dataloader = DataLoader(dataset=self.test_data_custom, # use custom created test Dataset
                                    batch_size=BATCH_SIZE, 
                                    # num_workers=NUM_WORKERS, 
                                    shuffle=False) # don't usually need to shuffle testing data

        img,label=next(iter(self.test_dataloader))
        # print(f"shape of custome dataloader img {img.shape}")

In [41]:
class ResNet(nn.Module):
    def __init__(self,in_channels,num_classes) -> None:
        super().__init__()

        self.conv1=nn.Conv2d(in_channels,64,kernel_size=7,stride=2,padding=3)
        self.bn1=nn.BatchNorm2d(64)
        self.relu=nn.ReLU()
        self.maxpool=nn.MaxPool2d(kernel_size=3,stride=2,padding=1)

        self.avgpool=nn.AdaptiveAvgPool2d((1,1))
        self.fc=nn.Linear(512,num_classes)

        self.in_channels=64
        self.out_channels=64

        # self.conv2=[]
        # for i in range(layers[0]):
        #     self.conv2.append(nn.Conv2d(self.in_channels,self.out_channels,kernel_size=3,padding=2,stride=1))
        # self.bn2=nn.BatchNorm2d(self.out_channels)

        self.conv2_1=nn.Conv2d(self.in_channels,self.out_channels,kernel_size=3,padding=1,stride=1)
        self.bn2_1=nn.BatchNorm2d(self.out_channels)

        self.out_channels=self.out_channels*2
        self.conv3_1=nn.Conv2d(self.in_channels,self.out_channels,kernel_size=3,padding=1,stride=2)
        self.bn3_1=nn.BatchNorm2d(self.out_channels)

        self.in_channels=self.out_channels
        self.out_channels=self.out_channels*2
        self.conv4_1=nn.Conv2d(self.in_channels,self.out_channels,kernel_size=3,padding=1,stride=2)
        self.bn4_1=nn.BatchNorm2d(self.out_channels)


        self.in_channels=self.out_channels
        self.out_channels=self.out_channels*2
        self.conv5_1=nn.Conv2d(self.in_channels,self.out_channels,kernel_size=3,padding=1,stride=2)
        self.bn5_1=nn.BatchNorm2d(self.out_channels)

    def forward(self,x):
       

        x=self.conv1(x)
        print(f"after conv1 channels: 64 kernel_size: 7 stride: 2 padding: 3 {x.shape}")
        x=self.bn1(x)
        x=self.relu(x)
        x=self.maxpool(x)
        print(f"after maxpool kernel_size: 3 stride: 2 padding: 1 {x.shape}")
        identity=x
        x=self.conv2_1(x)
        x=self.bn2_1(x)
        x=self.relu(x)
        x+=identity
        print(f"identity shape {identity.shape}")
        print(f"after conv2 channels: 64 kernel_size: 3 stride: 1 padding: 1 {x.shape}")


        identity=x
        x=self.conv3_1(x)
        x=self.bn3_1(x)
        x=self.relu(x)
        identity_downsample=nn.Sequential(
            nn.Conv2d(in_channels=64,out_channels=128,kernel_size=1,stride=2,padding=0),
            nn.BatchNorm2d(num_features=128)
        )
        x+=identity_downsample(identity)
        print(f"identity shape {identity.shape}")
        print(f"after conv3 channels: 128 kernel_size: 3 stride: 2 padding: 1 {x.shape}")

        
        
        identity=x
        x=self.conv4_1(x)
        x=self.bn4_1(x)
        x=self.relu(x)
        identity_downsample=nn.Sequential(
            nn.Conv2d(in_channels=128,out_channels=256,kernel_size=1,stride=2,padding=0),
            nn.BatchNorm2d(num_features=256)
        )
        x+=identity_downsample(identity)
        print(f"identity shape {identity.shape}")
        print(f"after conv4 channels: 256 kernel_size: 3 stride: 2 padding: 1 {x.shape}")
        

        identity=x
        x=self.conv5_1(x)
        x=self.bn5_1(x)
        x=self.relu(x)
        identity_downsample=nn.Sequential(
            nn.Conv2d(in_channels=256,out_channels=512,kernel_size=1,stride=2,padding=0),
            nn.BatchNorm2d(num_features=512)
        )
        x+=identity_downsample(identity)
        print(f"identity shape {identity.shape}")
        print(f"after conv4 channels: 256 kernel_size: 3 stride: 2 padding: 1 {x.shape}")
        
        x=self.avgpool(x)
        print(f"after avgpool {x.shape}")
        x=x.reshape(x.shape[0],-1)
        print(f"after reshape {x.shape}")

        x=self.fc(x)
        
        return x

    # def __make_layers(self,num_residual_block,out_channels,stride):
        
    #     layers=[]
    #     for i in range(num_residual_block):
    #         layers.append(nn.Conv2d(self.in_channels,out_channels,kernel_size=3,padding=2,stride=stride))
        
    #     return(nn.Sequential(*layers))

In [43]:
net=ResNet(3,4)
x=torch.randn(2,3,224,224)
y=net(x)
print(f"out shape {y.shape}")


after conv1 channels: 64 kernel_size: 7 stride: 2 padding: 3 torch.Size([2, 64, 112, 112])
after maxpool kernel_size: 3 stride: 2 padding: 1 torch.Size([2, 64, 56, 56])
identity shape torch.Size([2, 64, 56, 56])
after conv2 channels: 64 kernel_size: 3 stride: 1 padding: 1 torch.Size([2, 64, 56, 56])
identity shape torch.Size([2, 64, 56, 56])
after conv3 channels: 128 kernel_size: 3 stride: 2 padding: 1 torch.Size([2, 128, 28, 28])
identity shape torch.Size([2, 128, 28, 28])
after conv4 channels: 256 kernel_size: 3 stride: 2 padding: 1 torch.Size([2, 256, 14, 14])
identity shape torch.Size([2, 256, 14, 14])
after conv4 channels: 256 kernel_size: 3 stride: 2 padding: 1 torch.Size([2, 512, 7, 7])
after avgpool torch.Size([2, 512, 1, 1])
after reshape torch.Size([2, 512])
out shape torch.Size([2, 4])
