# Pytorch Data Loader
#####  $\hspace{350pt}$  anvekartejas@gmail.com

## Loading Data with Structured Folders


 $\textbf{Dataset}\\ \hspace{20pt}{|}  \text{class1} --- \text{class1_0.png}\\  \hspace{72pt} \text{class1_1.png}\\
 \\  \hspace{20pt}{|} \text{class2} --- \text{class2_0.png}\\  \hspace{72pt} \text{class2_1.png}$


In [1]:
#imports
import torch
import os
import cv2
import numpy as np
from torch.utils.data import DataLoader,Dataset
from torchvision import transforms

In [2]:
#creating class for custom dataloader

class customloader(Dataset):   ##inherited custom dataset class
    def __init__(self,root,transforms=None):
        super(customloader,self).__init__()   ## overwrite args
        
        self.root = root
        self.transforms = transforms
        
        
        self.folders_path = [os.path.join(self.root,x) for x in os.listdir(self.root)]  ##list comprehension for storing root-->folder path in a list
        self.folders_path.sort()     ## only for linux
        
        
        self.image_path = []
        self.labels = []
        
        for i in range(len(self.folders_path)):     ## loop through all folders
            x = os.listdir(self.folders_path[i])   
            for j in range(len(x)):                 ## loop through all images in each sub folders 
                k = os.path.join(self.folders_path[i],x[j])    #store image path 
                self.image_path.append(k)
                self.labels.append(int(i))                    # store labels
           
        self.image_path.sort()    ## only for linux
        self.labels.sort()        ## only for linux

        
    def __len__(self):
        return len(self.image_path)    ##over write len method to give iterable len of this obj
    
    def __getitem__(self,index):
        assert len(self.image_path) == len(self.labels),"labels size doesnt match with number of images"
        
        image = cv2.imread(self.image_path[index])   ## read img of specific image
        label = np.array(self.labels[index])        ## read label of specific label
        
        image = np.array(image)


        
        if self.transforms is not None:
            image = self.transforms(image)     ## apply custom tranformations
        
        
        return (image,label)   ## return tuble of image and label whenever iterated
        

In [3]:
## unit test block


data = customloader(root="/home/tejas/STD_DATESETS/Cifar-10/Train/Original",transforms = transforms.ToTensor())
print("total images: ",len(data))

loader = DataLoader(data,batch_size=16,shuffle=True,num_workers=6)

for idx ,(x,y) in  enumerate(loader):
    print(x.shape,y.shape,y)
    break


total images:  50000
torch.Size([16, 3, 32, 32]) torch.Size([16]) tensor([7, 5, 9, 4, 2, 1, 9, 0, 5, 0, 8, 9, 0, 1, 2, 1])


## Loading Data with paths and labels in csv file

In [4]:
import torch
import pandas as pd
import cv2
import numpy as np
from torch.utils.data import DataLoader,Dataset
from torchvision import transforms

In [5]:
## customdata loader from csv

class customcsv(Dataset):
    def __init__(self,csv_path,transforms=None):
        super(customcsv,self).__init__()
        
        self.csv = pd.read_csv(csv_path)      ## read csv
        self.transforms = transforms
        
    def __len__(self):
        return len(self.csv)+1
    
    
    def __getitem__(self,index):
        
        image = cv2.imread(self.csv.iloc[index,0])   ## read img of specific image "image path are in column 0"
        label = np.array(self.csv.iloc[index,1])        ## read label of specific label
        
        image = np.array(image)


        
        if self.transforms is not None:
            image = self.transforms(image)     ## apply custom tranformations
        
        
        return (image,label)   ## return tuble of image and label whenever iterated
    
    

In [6]:
## unit test block


data = customcsv(csv_path="/home/tejas/Pytorch_tutorial/data.csv",transforms = transforms.ToTensor())
print("total images: ",len(data))

loader = DataLoader(data,batch_size=16,shuffle=True,num_workers=6)

for idx ,(x,y) in  enumerate(loader):
    print(x.shape,y.shape,y)
    break

total images:  50000
torch.Size([16, 3, 32, 32]) torch.Size([16]) tensor([3, 5, 8, 7, 9, 4, 4, 1, 9, 7, 7, 8, 8, 1, 9, 7])


# Building ANN Model

In [7]:
#imports
import torch
import torch.nn as nn
import torch.nn.functional as F

In [8]:
class ANN(nn.Module):
    def __init__(self,inp,num_classes=10,layers=[1000,500,200,100]):
        super().__init__()
        self.FC = nn.ModuleList()   ##Model as list initialization
        
        for layer in layers:
            self.FC.append(
                nn.Sequential(
                    nn.Linear(inp,layer),    ## linear layer
                    nn.ReLU(inplace=False),               ## Retified Linaer unit activation
                    nn.Dropout(0.2,inplace=False)          ## 1D drop out with 20% prob
                
                )
            )
            
            inp=layer
            
        self.last_embedding = nn.Sequential(nn.Linear(layers[-1],num_classes),nn.Softmax(dim=1))  ## last imbedding layer
        
    def forward(self,x):                  ## function that computes tensor when sent through
        for l in self.FC:
            x = l(x)                     ## unpack module list and start dynamic graph as per args
            
        return self.last_embedding(x)

In [9]:
#unit testing
def testANN():
    inp = torch.randn((1,784)).cuda()
    model = ANN(inp=784,num_classes=5,layers=[2000,1000,500,200]).cuda()
    out = model(inp)
    
    print(model)
    print("\n",inp.shape,out.shape)
    
testANN()

ANN(
  (FC): ModuleList(
    (0): Sequential(
      (0): Linear(in_features=784, out_features=2000, bias=True)
      (1): ReLU()
      (2): Dropout(p=0.2, inplace=False)
    )
    (1): Sequential(
      (0): Linear(in_features=2000, out_features=1000, bias=True)
      (1): ReLU()
      (2): Dropout(p=0.2, inplace=False)
    )
    (2): Sequential(
      (0): Linear(in_features=1000, out_features=500, bias=True)
      (1): ReLU()
      (2): Dropout(p=0.2, inplace=False)
    )
    (3): Sequential(
      (0): Linear(in_features=500, out_features=200, bias=True)
      (1): ReLU()
      (2): Dropout(p=0.2, inplace=False)
    )
  )
  (last_embedding): Sequential(
    (0): Linear(in_features=200, out_features=5, bias=True)
    (1): Softmax(dim=1)
  )
)

 torch.Size([1, 784]) torch.Size([1, 5])


## Loss Function,optimizers and hyperameters

In [10]:
model = ANN(inp=32*32*3,num_classes=10,layers=[2000,1000,500,200]).cuda()
optimizer = torch.optim.Adam(model.parameters(),lr=2e-4,betas=(0.5,0.99))

## Data Loader 

In [11]:

train_data = customloader(root="/home/tejas/Pytorch_tutorial/cifar10_example/train",transforms = transforms.ToTensor())
test_data = customloader(root="/home/tejas/Pytorch_tutorial/cifar10_example/test",transforms = transforms.ToTensor())







train_loader = DataLoader(train_data,shuffle=True,batch_size=8,num_workers=2)
test_loader = DataLoader(test_data,shuffle=False,batch_size=10000,num_workers=2)

## Training and testing loops

In [12]:
def train(model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        
        optimizer.zero_grad()
        data = data.reshape(data.shape[0],-1)
        output = model(data)
        loss = F.cross_entropy(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 10 == 0:
            print(
                "Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
                    epoch,
                    batch_idx * len(data),
                    len(train_loader.dataset),
                    100.0 * batch_idx / len(train_loader),
                    loss.item(),
                )
            )


def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data = data.reshape(data.shape[0],-1)
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.cross_entropy(
                output, target, reduction="sum"
            ).item()  # sum up batch loss
            pred = output.argmax(
                dim=1, keepdim=True
            )  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print(
        "\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n".format(
            test_loss,
            correct,
            len(test_loader.dataset),
            100.0 * correct / len(test_loader.dataset),
        )
    )

In [13]:
use_cuda =  torch.cuda.is_available()

torch.manual_seed(0)

device = torch.device("cuda" if use_cuda else "cpu")
epochs = 10
for epoch in range(epochs):
    train(model,device,train_loader,optimizer,epoch)
    test(model,device,test_loader)


Test set: Average loss: 2.3021, Accuracy: 12/100 (12%)


Test set: Average loss: 2.3014, Accuracy: 16/100 (16%)


Test set: Average loss: 2.3008, Accuracy: 16/100 (16%)


Test set: Average loss: 2.2993, Accuracy: 12/100 (12%)


Test set: Average loss: 2.2968, Accuracy: 10/100 (10%)


Test set: Average loss: 2.2933, Accuracy: 10/100 (10%)


Test set: Average loss: 2.2868, Accuracy: 13/100 (13%)


Test set: Average loss: 2.2778, Accuracy: 12/100 (12%)


Test set: Average loss: 2.2762, Accuracy: 13/100 (13%)


Test set: Average loss: 2.2718, Accuracy: 14/100 (14%)

