# Benchmarking the performance of Pelican part2

## Dataset
[ImageNet](https://www.kaggle.com/c/imagenet-object-localization-challenge/overview)
Using this [script](https://raw.githubusercontent.com/soumith/imagenetloader.torch/master/valprep.sh) to prepare the data first. Then train it using ResNet50.

## Hardware
Google Colab T4 GPU with high RAM

In [None]:
!pip install torchdata
!pip install pelicanfs fsspec

Collecting torchdata
  Downloading torchdata-0.7.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.7/4.7 MB[0m [31m10.2 MB/s[0m eta [36m0:00:00[0m
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch>=2->torchdata)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch>=2->torchdata)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (823 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch>=2->torchdata)
  Using cached nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (14.1 MB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch>=2->torchdata)
  Using cached nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl (731.7 MB)
Collecting nvidia-cublas-cu12==12.1.3.1 (from torch>=2->torchdata)
  Using cached nvidia_cublas_cu12-12.1.3.1-py3-none-manylinux1_x8

In [None]:
import torch
print(torch.cuda.is_available())


True


In [None]:
import os
import torch
import torchvision.transforms as transforms
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader
from torch import nn, optim
import fsspec
from pelicanfs.core import PelicanFileSystem
import time
from PIL import Image
from fsspec.implementations.local import LocalFileSystem
from torchvision.datasets import VisionDataset
from torchvision.io import read_image
from PIL import Image
import matplotlib.pyplot as plt

class RemoteImageFolder(VisionDataset):

    def __init__(self, root,transform=None, target_transform=None):
        super().__init__(root, transform=transform, target_transform=target_transform)
        if os.path.isdir(root):
            self._init_local(root)
        else:
            self._init_remote(root)

    def _init_local(self, root):
        print(f"Initializing local dataset from {root}")
        self.root = root
        self.fs = LocalFileSystem()
        self.classes = sorted(os.listdir(root))
        self.class_to_idx = {cls: i for i, cls in enumerate(self.classes)}
        self.imgs = self._make_dataset_local()

    def _init_remote(self, root, transform=None):
        self.root = root
        self.fs = PelicanFileSystem("pelican://osg-htc.org")

        self.classes = sorted([item['name'] for item in self.fs.ls(root)])
        self.class_to_idx = {cls: i for i, cls in enumerate(self.classes)}

        self.imgs = self._make_dataset_remote()

    def _make_dataset_local(self):
        images = []
        for class_idx, cls_name in enumerate(self.classes):
            class_path = os.path.join(self.root, cls_name)
            if not os.path.isdir(class_path):
                continue
            for img_name in os.listdir(class_path):
                img_name = img_name.lower()
                if img_name.endswith('.jpg') or img_name.endswith('.jpeg') or img_name.endswith('.png'):
                    img_path = os.path.join(class_path, img_name)
                    images.append((img_path, class_idx))
        return images

    def _make_dataset_remote(self):
        images = []
        for class_idx, cls_name in enumerate(self.classes):
            class_path = os.path.join(self.root, cls_name)
            files = self.fs.ls(class_path)
            for item in files:
                img_path = item['name']
                if img_path.lower().endswith('.jpg') or img_path.lower().endswith('.jpeg') or img_path.lower().endswith('.png'):
                    images.append((img_path, class_idx))
        print("len(images): ", len(images))
        return images

    def __getitem__(self, index):
        img_path, target = self.imgs[index]
        if isinstance(self.fs, PelicanFileSystem):
            with self.fs.open(img_path, 'rb') as f:
                img = Image.open(f).convert('RGB')
        else:
            img = read_image(img_path)
            img = transforms.ToPILImage()(img)

        if img.mode != 'RGB':
            img = img.convert('RGB')

        if self.transform:
            img = self.transform(img)

        return img, target

    def __len__(self):
        return len(self.imgs)


In [None]:
import torch.nn as nn
import torch.optim as optim
from torchvision import models
import torch.multiprocessing as mp
import time

train_transforms = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),  # Ensure ToTensor is included
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

val_transforms = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),  # Ensure ToTensor is included
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Define the Pelican paths
trainfile_path = "/chtc/PUBLIC/hzhao292/ILSVRC/Data/CLS-LOC/train"
valfile_path = "/chtc/PUBLIC/hzhao292/ILSVRC/Data/CLS-LOC/val"

dev_trainfile_path = "/chtc/PUBLIC/hzhao292/ImageNetMini/train"
dev_valfile_path = "/chtc/PUBLIC/hzhao292/ImageNetMini/val"

# Load the datasets
train_dataset = RemoteImageFolder(root=dev_trainfile_path, transform=train_transforms)
val_dataset = RemoteImageFolder(root=dev_valfile_path, transform=val_transforms)

# Create the dataloaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=0, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=0, pin_memory=True)


len(images):  9469
len(images):  3925


In [6]:

# Initialize the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize the model, loss function, and optimizer
model = models.resnet50(pretrained=True)
model.fc = nn.Linear(model.fc.in_features, len(train_dataset.classes))
model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training and validation loop
num_epochs = 10

print("Training started.")
for epoch in range(num_epochs):
    print("Epoch", epoch+1, "started.")
    start_time = time.time()

    # Training phase
    model.train()
    running_loss = 0.0
    i=0
    for inputs, labels in train_loader:
        i+=1
        if i%10==0:
            print("Batch ", i)
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * inputs.size(0)

    epoch_loss = running_loss / len(train_loader.dataset)
    print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {epoch_loss:.4f}")

    # Validation phase
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            running_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    epoch_loss = running_loss / len(val_loader.dataset)
    accuracy = correct / total
    end_time = time.time()
    time_taken = end_time - start_time
    print(f"Epoch {epoch+1}/{num_epochs}, Validation Loss: {epoch_loss:.4f}, Accuracy: {accuracy:.4f}, Time Taken: {time_taken:.2f} seconds")

print("Training completed.")

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:01<00:00, 51.7MB/s]


Training started.
Epoch 1 started.
Batch  10
Batch  20
Batch  30
Batch  40
Batch  50
Batch  60
Batch  70
Batch  80
Batch  90
Batch  100
Batch  110
Batch  120
Batch  130
Batch  140
Batch  150
Batch  160
Batch  170
Batch  180
Batch  190
Batch  200
Batch  210
Batch  220
Batch  230
Batch  240
Batch  250
Batch  260
Batch  270
Batch  280
Batch  290


  return Variable._execution_engine.run_backward(  # Calls into the C++ engine to run the backward pass


Epoch 1/10, Training Loss: 1.1407
Epoch 1/10, Validation Loss: 0.6926, Accuracy: 0.7809, Time Taken: 1138.80 seconds
Epoch 2 started.
Batch  10
Batch  20
Batch  30
Batch  40
Batch  50
Batch  60
Batch  70
Batch  80
Batch  90
Batch  100
Batch  110
Batch  120
Batch  130
Batch  140
Batch  150
Batch  160
Batch  170
Batch  180
Batch  190
Batch  200
Batch  210
Batch  220
Batch  230
Batch  240
Batch  250
Batch  260
Batch  270
Batch  280
Batch  290
Epoch 2/10, Training Loss: 0.8152
Epoch 2/10, Validation Loss: 0.6254, Accuracy: 0.8020, Time Taken: 1113.87 seconds
Epoch 3 started.
Batch  10
Batch  20
Batch  30
Batch  40
Batch  50
Batch  60
Batch  70
Batch  80
Batch  90
Batch  100
Batch  110
Batch  120
Batch  130
Batch  140
Batch  150
Batch  160
Batch  170
Batch  180
Batch  190
Batch  200
Batch  210
Batch  220
Batch  230
Batch  240
Batch  250
Batch  260
Batch  270
Batch  280
Batch  290
Epoch 3/10, Training Loss: 0.7164
Epoch 3/10, Validation Loss: 0.4831, Accuracy: 0.8522, Time Taken: 1074.37 sec

In [None]:
torch.save(model.state_dict(), 'resnet50_imagenet.pth')


## Using DataPipe

In [None]:
# Streaming version
import fsspec
from pelicanfs.core import PelicanFileSystem, PelicanMap, OSDFFileSystem
zipfilepath = "pelican://osg-htc.org/chtc/PUBLIC/hzhao292/imagenet-object-localization-challenge.zip"
trainfile_path = "/chtc/PUBLIC/hzhao292/ILSVRC/Data/CLS-LOC/train"
valfile_path = "pelican://osg-htc.org/chtc/PUBLIC/hzhao292/ILSVRC/Data/CLS-LOC/val"
fs = fsspec.filesystem('pelican')
pfs = PelicanFileSystem('pelican://osg-htc.org/')

# dp2 = IterableWrapper([zipfilepath])  \
#         .open_files_by_fsspec(mode="rb") \
#         .load_from_zip()
# for path, filestream in dp2:
#     print(path, filestream)
#     break

In [None]:
import fsspec
import torch
torch.utils.data.datapipes.utils.common.DILL_AVAILABLE = torch.utils._import_utils.dill_available()
# Need add this line if run in google colab, or it will cause error
# torch.utils.data.datapipes.utils.common.DILL_AVAILABLE = torch.utils._import_utils.dill_available()
from torchdata.datapipes.iter import IterableWrapper, FileOpener
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torchvision.io import read_image
from torchdata.datapipes.iter import IterDataPipe
from torchdata import dataloader2 as DataLoader2

class RemoteImageDataPipe(IterDataPipe):
    def __init__(self, root, transform=None):
        self.root = root
        self.transform = transform
        self.fs = fsspec.filesystem('pelican')
        self.files = self.fs.ls(root)

    def __iter__(self):
        for file in self.files:
            with self.fs.open(file, 'rb') as f:
                img = read_image(f).convert('RGB')
                if self.transform:
                    img = self.transform(img)
                yield img

# Define transformations for training and validation
train_transforms = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

val_transforms = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
/Users/a/Documents/2024summerintern/IntegratePelicanwithPytorch/benchmark/Benchmark2.ipynb
# Define the S3 paths
trainfile_path = "/chtc/PUBLIC/hzhao292/ILSVRC/Data/CLS-LOC/train"
valfile_path = "/chtc/PUBLIC/hzhao292/ILSVRC/Data/CLS-LOC/val"

# Create DataPipes for training and validation datasets
train_datapipe = RemoteImageDataPipe(root=trainfile_path, transform=train_transforms)
val_datapipe = RemoteImageDataPipe(root=valfile_path, transform=val_transforms)
print(type(train_datapipe))

# Create DataLoaders
train_loader = DataLoader2(train_datapipe, batch_size=32, shuffle=True, num_workers=4, pin_memory=True)
val_loader = DataLoader2(val_datapipe, batch_size=32, shuffle=False, num_workers=4, pin_memory=True)

for i in train_loader:
    print(i)
    break

<class '__main__.RemoteImageDataPipe'>


TypeError: 'module' object is not callable