In [1]:
import pytorch_lightning as pl
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import random_split, DataLoader

from torchmetrics import Accuracy

from torchvision import transforms
from torchvision.datasets.utils import download_url
import torchvision.models as models

In [2]:
import lance
from lance.pytorch.data import LanceDataset
import pyarrow.compute as pc
from PIL import Image
from torchdata.datapipes.iter import IterableWrapper
import io

In [3]:
def collate_fn(transform):
    def _collate_fn(batch):        
        # TODO: convert label to int64 from Dataset?
        labels = batch[1].type(torch.LongTensor) - 1
        # TODO: Image conversion should in torch.LanceDataset
        images = [
            transform(img.convert("RGB")) for img in batch[0]
        ]
        return torch.stack(images), labels

    return _collate_fn

In [4]:
class OxfordPetDataModule(pl.LightningDataModule):
    def __init__(self, batch_size, uri):
        super().__init__()
        self.batch_size = batch_size
        self.uri = uri

        
        # Apply transformations to the train dataset
        self.augmentation = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

        # Preprocessing steps applied to validation and test set.
        self.transform = transforms.Compose([
              transforms.Resize(size=256),
              transforms.CenterCrop(size=224),
              transforms.ToTensor(),
              transforms.Normalize([0.485, 0.456, 0.406],[0.229, 0.224, 0.225])
        ])
        
        self.num_classes = 37


    def prepare_data(self):
        pass


    def setup(self, stage=None):
        for split in ["train", "val", "test"]:
            setattr(self, split, LanceDataset(self.uri, columns=["image", "class"],
                                              mode="batch",
                                              batch_size=self.batch_size,
                                              filter=pc.field("split") == split))
        
    def train_dataloader(self):
        dp = IterableWrapper(self.train).shuffle()
        return DataLoader(dp, batch_size=None, num_workers=8, 
                          collate_fn=collate_fn(self.augmentation),
                          shuffle=True)


    def val_dataloader(self):
        return DataLoader(IterableWrapper(self.val), batch_size=None, 
                          num_workers=8,
                          collate_fn=collate_fn(self.transform))



    def test_dataloader(self):
        return DataLoader(IterableWrapper(self.test), batch_size=None, 
                          num_workers=8,
                          collate_fn=collate_fn(self.transform))


In [5]:
class PetModule(pl.LightningModule):
    def __init__(self, input_shape, num_classes, learning_rate=2e-4, transfer=False):
        super().__init__()
        
        # log hyperparameters
        self.save_hyperparameters()
        self.learning_rate = learning_rate
        self.dim = input_shape
        self.num_classes = num_classes
        
        # transfer learning if pretrained=True
        self.feature_extractor = models.resnet18(pretrained=transfer)

        if transfer:
            # layers are frozen by using eval()
            self.feature_extractor.eval()
            # freeze params
            for param in self.feature_extractor.parameters():
                param.requires_grad = False
        
        n_sizes = self._get_conv_output(input_shape)

        self.classifier = nn.Linear(n_sizes, num_classes)

        self.criterion = nn.CrossEntropyLoss()
        self.accuracy = Accuracy()
  
    # returns the size of the output tensor going into the Linear layer from the conv block.
    def _get_conv_output(self, shape):
        batch_size = 1
        tmp_input = torch.autograd.Variable(torch.rand(batch_size, *shape))

        output_feat = self._forward_features(tmp_input) 
        n_size = output_feat.data.view(batch_size, -1).size(1)
        return n_size
        
    # returns the feature tensor from the conv block
    def _forward_features(self, x):
        x = self.feature_extractor(x)
        return x
    
    # will be used during inference
    def forward(self, x):
        x = self._forward_features(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)       
        return x
    
    def training_step(self, batch):
        batch, gt = batch[0], batch[1]
        out = self.forward(batch)
        loss = self.criterion(out, gt)

        acc = self.accuracy(out, gt)

        self.log("train/loss", loss)
        self.log("train/acc", acc)

        return loss
    
    def validation_step(self, batch, batch_idx):
        batch, gt = batch[0], batch[1]
        out = self.forward(batch)
        loss = self.criterion(out, gt)

        self.log("val/loss", loss)

        acc = self.accuracy(out, gt)
        self.log("val/acc", acc)

        return loss
    
    def test_step(self, batch, batch_idx):
        batch, gt = batch[0], batch[1]
        out = self.forward(batch)
        loss = self.criterion(out, gt)
        
        return {"loss": loss, "outputs": out, "gt": gt}
    
    def test_epoch_end(self, outputs):
        loss = torch.stack([x['loss'] for x in outputs]).mean()
        output = torch.cat([x['outputs'] for x in outputs], dim=0)
        
        gts = torch.cat([x['gt'] for x in outputs], dim=0)
        
        self.log("test/loss", loss)
        acc = self.accuracy(output, gts)
        self.log("test/acc", acc)
        
        self.test_gts = gts
        self.test_output = output
    
    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.learning_rate)

In [8]:
dm = OxfordPetDataModule(batch_size=64, uri="/home/ubuntu/datasets/lance/oxford_pet.lance/")
model = PetModule((3, 30, 300), 37, transfer=True)
trainer = pl.Trainer(max_epochs=10, accelerator="gpu")

trainer.fit(model, dm)
trainer.test(model, dm)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name              | Type             | Params
-------------------------------------------------------
0 | feature_extractor | ResNet           | 11.7 M
1 | classifier        | Linear           | 37.0 K
2 | criterion         | CrossEntropyLoss | 0     
3 | accuracy          | Accuracy         | 0     
-------------------------------------------------------
37.0 K    Trainable params
11.7 M    Non-trainable params
11.7 M    Total params
46.906    Total estimated model params size (MB)


Sanity Checking: 0it [00:00, ?it/s]

Training: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

`Trainer.fit` stopped: `max_epochs=10` reached.
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing: 0it [00:00, ?it/s]

────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
       Test metric             DataLoader 0
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
        test/acc            0.4715181291103363
        test/loss           1.8243054151535034
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────


[{'test/loss': 1.8243054151535034, 'test/acc': 0.4715181291103363}]

In [9]:
trainer.save_checkpoint('/tmp/model.ckpt')

m = torch.jit.script(model)

torch.jit.save(m, '/tmp/model.pth')