In [2]:
import os
import numpy as np
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
from filelock import FileLock
from torch.utils.data import DataLoader, random_split, Subset
from torchmetrics import Accuracy
from torchvision.datasets import MNIST
from torchvision import transforms

import pytorch_lightning as pl
from pytorch_lightning import trainer
from pytorch_lightning.core import datamodule
from pytorch_lightning.loggers.csv_logs import CSVLogger

In [3]:
class MNISTDataModule(pl.LightningDataModule):
    def __init__(self, batch_size=100):
        super().__init__()
        self.data_dir = os.getcwd()
        self.batch_size = batch_size
        self.transform = transforms.Compose(
            [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
        )

    def setup(self, stage=None):
        with FileLock(f"{self.data_dir}.lock"):
            mnist = MNIST(
                self.data_dir, train=True, download=True, transform=self.transform
            )

            # split data into train and val sets
            self.mnist_train, self.mnist_val = random_split(mnist, [55000, 5000])

    def train_dataloader(self):
        return DataLoader(self.mnist_train, batch_size=self.batch_size, num_workers=4)

    def val_dataloader(self):
        return DataLoader(self.mnist_val, batch_size=self.batch_size, num_workers=4)

    def test_dataloader(self):
        with FileLock(f"{self.data_dir}.lock"):
            self.mnist_test = MNIST(
                self.data_dir, train=False, download=True, transform=self.transform
            )
        return DataLoader(self.mnist_test, batch_size=self.batch_size, num_workers=4)


datamodule = MNISTDataModule(batch_size=128)

In [4]:
class MNISTClassifier(pl.LightningModule):
    def __init__(self, lr=1e-3, feature_dim=128):
        torch.manual_seed(421)
        super(MNISTClassifier, self).__init__()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, feature_dim),
            nn.ReLU(),
            nn.Linear(feature_dim, 10),
            nn.ReLU(),
        )
        self.lr = lr
        self.accuracy = Accuracy(task="multiclass", num_classes=10)
        self.eval_loss = []
        self.eval_accuracy = []

    def forward(self, x):
        x = x.view(-1, 28 * 28)
        x = self.linear_relu_stack(x)
        return x

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = torch.nn.functional.cross_entropy(y_hat, y)
        self.log("train_loss", loss)
        return loss

    def validation_step(self, val_batch, batch_idx):
        loss, acc = self._shared_eval(val_batch)
        self.log("val_accuracy", acc)
        self.eval_loss.append(loss)
        self.eval_accuracy.append(acc)
        return {"val_loss": loss, "val_accuracy": acc}

    def test_step(self, test_batch, batch_idx):
        loss, acc = self._shared_eval(test_batch)
        self.log("test_accuracy", acc)
        return {"test_loss": loss, "test_accuracy": acc}

    def _shared_eval(self, batch):
        x, y = batch
        logits = self.forward(x)
        loss = F.nll_loss(logits, y)
        acc = self.accuracy(logits, y)
        return loss, acc

    def on_validation_epoch_end(self):
        avg_loss = torch.stack(self.eval_loss).mean()
        avg_acc = torch.stack(self.eval_accuracy).mean()
        self.log("val_loss", avg_loss, sync_dist=True)
        self.log("val_accuracy", avg_acc, sync_dist=True)
        self.eval_loss.clear()
        self.eval_accuracy.clear()

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.lr)
        return optimizer

In [5]:
from pytorch_lightning.callbacks import ModelCheckpoint
from ray.air.config import RunConfig, ScalingConfig, CheckpointConfig
from ray.train.lightning import (
    LightningTrainer,
    LightningConfigBuilder,
    LightningCheckpoint,
)


def build_lightning_config_from_existing_code(use_gpu):
    # Create a config builder to encapsulate all required parameters.
    # Note that model instantiation and fitting will occur later in the LightingTrainer,
    # rather than in the config builder.
    config_builder = LightningConfigBuilder()

    # 1. define your model
    # model = MNISTClassifier(lr=1e-3, feature_dim=128)
    config_builder.module(cls=MNISTClassifier, lr=1e-3, feature_dim=128)

    # 2. define a ModelCheckpoint callback
    # checkpoint_callback = ModelCheckpoint(
    #     monitor="val_accuracy", mode="max", save_top_k=3
    # )
    config_builder.checkpointing(monitor="val_accuracy", mode="max", save_top_k=3)

    # 3. Define a Lightning trainer
    # trainer = pl.Trainer(
    #     max_epochs=10,
    #     accelerator="cpu",
    #     strategy="ddp",
    #     log_every_n_steps=100,
    #     logger=CSVLogger("logs"),
    #     callbacks=[checkpoint_callback],
    # )
    config_builder.trainer(
        max_epochs=10,
        accelerator="gpu" if use_gpu else "cpu",
        log_every_n_steps=100,
        logger=CSVLogger("logs"),
    )
    # You do not need to provide the checkpoint callback and strategy here,
    # since LightningTrainer configures them automatically.
    # You can also add any other callbacks into LightningConfigBuilder.trainer().

    # 4. Parameters for model fitting
    # trainer.fit(model, datamodule=datamodule)
    config_builder.fit_params(datamodule=datamodule)

    # Finally, compile all the configs into a dictionary for LightningTrainer
    lightning_config = config_builder.build()
    return lightning_config

In [12]:
# Set it to False if you want to run without GPUs
use_gpu = False

In [13]:
lightning_config = build_lightning_config_from_existing_code(use_gpu=use_gpu)

scaling_config = ScalingConfig(num_workers=4, use_gpu=use_gpu)

run_config = RunConfig(
    name="ptl-mnist-example",
    checkpoint_config=CheckpointConfig(
        num_to_keep=3,
        checkpoint_score_attribute="val_accuracy",
        checkpoint_score_order="max",
    ),
)

trainer = LightningTrainer(
    lightning_config=lightning_config,
    scaling_config=scaling_config,
    run_config=run_config,
)

In [14]:
result = trainer.fit()
print("Validation Accuracy: ", result.metrics["val_accuracy"])
result

0,1
Current time:,2023-06-15 22:17:55
Running for:,00:00:41.98
Memory:,19.3/62.4 GiB

Trial name,status,loc,iter,total time (s),train_loss,val_accuracy,val_loss
LightningTrainer_5308e_00000,TERMINATED,192.168.3.51:947988,10,35.0375,0.0855576,0.970121,-12.5655


2023-06-15 22:17:13,677	INFO data_parallel_trainer.py:357 -- GPUs are detected in your Ray cluster, but GPU training is not enabled for this trainer. To enable GPU training, make sure to set `use_gpu` to True in your scaling config.
[2m[36m(TrainTrainable pid=947988)[0m 2023-06-15 22:17:18,292	INFO data_parallel_trainer.py:357 -- GPUs are detected in your Ray cluster, but GPU training is not enabled for this trainer. To enable GPU training, make sure to set `use_gpu` to True in your scaling config.
[2m[36m(LightningTrainer pid=947988)[0m 2023-06-15 22:17:18,300	INFO data_parallel_trainer.py:357 -- GPUs are detected in your Ray cluster, but GPU training is not enabled for this trainer. To enable GPU training, make sure to set `use_gpu` to True in your scaling config.
[2m[36m(RayTrainWorker pid=948064)[0m 2023-06-15 22:17:21,145	INFO config.py:86 -- Setting up process group for: env:// [rank=0, world_size=4]
[2m[36m(RayTrainWorker pid=948064)[0m GPU available: False, used: Fa

[2m[36m(RayTrainWorker pid=948066)[0m Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
[2m[36m(RayTrainWorker pid=948066)[0m Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz to /udata/workspace/github-aican/ai-platform/content/ray-experimental/MNIST/raw/train-images-idx3-ubyte.gz


  0%|          | 0/9912422 [00:00<?, ?it/s]
  1%|          | 65536/9912422 [00:00<00:32, 307691.34it/s]
  1%|▏         | 131072/9912422 [00:00<00:31, 308784.40it/s]
  2%|▏         | 229376/9912422 [00:00<00:21, 444595.19it/s]
  3%|▎         | 327680/9912422 [00:00<00:17, 556664.47it/s]
  5%|▍         | 458752/9912422 [00:00<00:12, 752219.46it/s]
  7%|▋         | 688128/9912422 [00:00<00:08, 1147171.22it/s]
 10%|▉         | 983040/9912422 [00:01<00:05, 1631380.41it/s]
 14%|█▍        | 1409024/9912422 [00:01<00:03, 2307035.58it/s]
 17%|█▋        | 1671168/9912422 [00:01<00:04, 1683435.17it/s]
 33%|███▎      | 3244032/9912422 [00:01<00:01, 4047062.72it/s]
 41%|████      | 4063232/9912422 [00:01<00:01, 4120803.20it/s]
 45%|████▌     | 4489216/9912422 [00:02<00:01, 3074843.85it/s]
 58%|█████▊    | 5701632/9912422 [00:02<00:01, 3754025.08it/s]
 61%|██████▏   | 6094848/9912422 [00:02<00:01, 2072572.41it/s]
 76%|███████▌  | 7503872/9912422 [00:03<00:00, 3162614.02it/s]
 85%|████████▍ | 8388608

[2m[36m(RayTrainWorker pid=948066)[0m Extracting /udata/workspace/github-aican/ai-platform/content/ray-experimental/MNIST/raw/train-images-idx3-ubyte.gz to /udata/workspace/github-aican/ai-platform/content/ray-experimental/MNIST/raw
[2m[36m(RayTrainWorker pid=948066)[0m 
[2m[36m(RayTrainWorker pid=948066)[0m Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
[2m[36m(RayTrainWorker pid=948066)[0m Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz to /udata/workspace/github-aican/ai-platform/content/ray-experimental/MNIST/raw/train-labels-idx1-ubyte.gz


100%|██████████| 28881/28881 [00:00<00:00, 5993256.18it/s]


[2m[36m(RayTrainWorker pid=948066)[0m Extracting /udata/workspace/github-aican/ai-platform/content/ray-experimental/MNIST/raw/train-labels-idx1-ubyte.gz to /udata/workspace/github-aican/ai-platform/content/ray-experimental/MNIST/raw
[2m[36m(RayTrainWorker pid=948066)[0m 
[2m[36m(RayTrainWorker pid=948066)[0m Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
[2m[36m(RayTrainWorker pid=948066)[0m Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz to /udata/workspace/github-aican/ai-platform/content/ray-experimental/MNIST/raw/t10k-images-idx3-ubyte.gz


  0%|          | 0/1648877 [00:00<?, ?it/s]
  4%|▍         | 65536/1648877 [00:00<00:05, 300116.83it/s]
  8%|▊         | 131072/1648877 [00:00<00:04, 305341.45it/s]
 14%|█▍        | 229376/1648877 [00:00<00:03, 447413.77it/s]
 20%|█▉        | 327680/1648877 [00:00<00:02, 569585.39it/s]
 30%|██▉       | 491520/1648877 [00:00<00:01, 826915.98it/s]
 44%|████▎     | 720896/1648877 [00:00<00:00, 1220933.12it/s]
 60%|█████▉    | 983040/1648877 [00:01<00:00, 1593141.76it/s]
100%|██████████| 1648877/1648877 [00:01<00:00, 1454161.42it/s]


[2m[36m(RayTrainWorker pid=948066)[0m Extracting /udata/workspace/github-aican/ai-platform/content/ray-experimental/MNIST/raw/t10k-images-idx3-ubyte.gz to /udata/workspace/github-aican/ai-platform/content/ray-experimental/MNIST/raw
[2m[36m(RayTrainWorker pid=948066)[0m 
[2m[36m(RayTrainWorker pid=948066)[0m Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
[2m[36m(RayTrainWorker pid=948066)[0m Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz to /udata/workspace/github-aican/ai-platform/content/ray-experimental/MNIST/raw/t10k-labels-idx1-ubyte.gz


100%|██████████| 4542/4542 [00:00<00:00, 48107395.88it/s]
[2m[36m(RayTrainWorker pid=948066)[0m Missing logger folder: logs/lightning_logs


[2m[36m(RayTrainWorker pid=948066)[0m Extracting /udata/workspace/github-aican/ai-platform/content/ray-experimental/MNIST/raw/t10k-labels-idx1-ubyte.gz to /udata/workspace/github-aican/ai-platform/content/ray-experimental/MNIST/raw
[2m[36m(RayTrainWorker pid=948066)[0m 


[2m[36m(RayTrainWorker pid=948064)[0m   | Name              | Type               | Params
[2m[36m(RayTrainWorker pid=948064)[0m ---------------------------------------------------------
[2m[36m(RayTrainWorker pid=948064)[0m 0 | linear_relu_stack | Sequential         | 101 K 
[2m[36m(RayTrainWorker pid=948064)[0m 1 | accuracy          | MulticlassAccuracy | 0     
[2m[36m(RayTrainWorker pid=948064)[0m ---------------------------------------------------------
[2m[36m(RayTrainWorker pid=948064)[0m 101 K     Trainable params
[2m[36m(RayTrainWorker pid=948064)[0m 0         Non-trainable params
[2m[36m(RayTrainWorker pid=948064)[0m 101 K     Total params
[2m[36m(RayTrainWorker pid=948064)[0m 0.407     Total estimated model params size (MB)


Trial name,_report_on,date,done,epoch,experiment_tag,hostname,iterations_since_restore,node_ip,pid,should_checkpoint,step,time_since_restore,time_this_iter_s,time_total_s,timestamp,train_loss,training_iteration,trial_id,val_accuracy,val_loss
LightningTrainer_5308e_00000,train_epoch_end,2023-06-15_22-17-53,True,9,0,mds-hp,10,192.168.3.51,947988,True,1080,35.0375,2.03801,35.0375,1686838673,0.0855576,10,5308e_00000,0.970121,-12.5655


[2m[36m(RayTrainWorker pid=948064)[0m `Trainer.fit` stopped: `max_epochs=10` reached.
[2m[36m(RayTrainWorker pid=948067)[0m Missing logger folder: logs/lightning_logs[32m [repeated 3x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/ray-logging.html#log-deduplication for more options.)[0m
2023-06-15 22:17:55,655	INFO tune.py:945 -- Total run time: 41.99 seconds (41.98 seconds for the tuning loop).


Validation Accuracy:  0.9701211452484131


Result(
  metrics={'_report_on': 'train_epoch_end', 'train_loss': 0.08555756509304047, 'val_accuracy': 0.9701211452484131, 'val_loss': -12.565531730651855, 'epoch': 9, 'step': 1080, 'should_checkpoint': True, 'done': True, 'trial_id': '5308e_00000', 'experiment_tag': '0'},
  path='/home/modongsong/ray_results/ptl-mnist-example/LightningTrainer_5308e_00000_0_2023-06-15_22-17-13',
  checkpoint=LightningCheckpoint(local_path=/home/modongsong/ray_results/ptl-mnist-example/LightningTrainer_5308e_00000_0_2023-06-15_22-17-13/checkpoint_000009)
)

In [15]:
checkpoint: LightningCheckpoint = result.checkpoint
best_model: pl.LightningModule = checkpoint.get_model(MNISTClassifier)
trainer = pl.Trainer()
test_dataloader = datamodule.test_dataloader()
result = trainer.test(best_model, dataloaders=test_dataloader)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
Missing logger folder: /udata/workspace/github-aican/ai-platform/content/ray-experimental/lightning_logs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
2023-06-15 22:17:57.721926: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2023-06-15 22:17:57.987913: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


Testing: 0it [00:00, ?it/s]

In [16]:
from ray.train.lightning import LightningPredictor

predictor = LightningPredictor.from_checkpoint(
    checkpoint, MNISTClassifier, use_gpu=use_gpu
)


def accuracy(logits, labels):
    preds = np.argmax(logits, axis=1)
    correct_preds = np.sum(preds == labels)
    return correct_preds


corrects = total = 0
for batch in test_dataloader:
    inputs, labels = batch
    inputs, labels = inputs.numpy(), labels.numpy()
    logits = predictor.predict(inputs)["predictions"]
    total += labels.size
    corrects += accuracy(logits, labels)

print("Accuracy: ", corrects / total)



Accuracy:  0.974
