这个例子演示如何使用 Ray Tune，为部署在 Ray Serve 上的模型实现持续学习（Continual Learning，简称 CL）机制。

---



In [None]:
!nvidia-smi

Wed Jan 12 12:15:56 2022       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 495.46       Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla K80           Off  | 00000000:00:04.0 Off |                    0 |
| N/A   39C    P8    28W / 149W |      0MiB / 11441MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [None]:
!pip install torch
!pip install ray
!pip install ray[tune]

Collecting ray
  Downloading ray-1.9.2-cp37-cp37m-manylinux2014_x86_64.whl (57.6 MB)
[K     |████████████████████████████████| 57.6 MB 1.2 MB/s 
Collecting redis>=3.5.0
  Downloading redis-4.1.0-py3-none-any.whl (171 kB)
[K     |████████████████████████████████| 171 kB 53.8 MB/s 
Collecting deprecated>=1.2.3
  Downloading Deprecated-1.2.13-py2.py3-none-any.whl (9.6 kB)
Installing collected packages: deprecated, redis, ray
Successfully installed deprecated-1.2.13 ray-1.9.2 redis-4.1.0
Collecting tensorboardX>=1.9
  Downloading tensorboardX-2.4.1-py2.py3-none-any.whl (124 kB)
[K     |████████████████████████████████| 124 kB 4.3 MB/s 
Installing collected packages: tensorboardX
Successfully installed tensorboardX-2.4.1


安装完必备的依赖，小A的故事正式开始。

In [None]:
import argparse
import os
from functools import partial
from math import ceil
import torch
import sys
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import ray
from ray import tune
import shutil
import json
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler

from torch.utils.data import random_split, Subset
from torchvision.datasets import MNIST
from torchvision.transforms import transforms
import logging

小A是一家公司新入职的算法工程师，刚入职不久就接到了产品经理的一个需求：维护一个部署在Ray Serve上的推理模型，并监控它的性能变化。
这里我们以基于MNIST数据集训练的CNN模型为例：

In [None]:
class ConvNet(nn.Module):
    """Small CNN classifier: one 3x3 convolution followed by two linear layers.

    ``layer_size`` is the width of the hidden linear layer. ``forward``
    returns log-probabilities over 10 output classes.
    """

    def __init__(self, layer_size=192):
        super().__init__()
        self.layer_size = layer_size
        # One input channel -> three feature maps.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=3, kernel_size=3)
        # forward() reshapes the pooled feature maps to rows of 192 values
        # (for a 1x28x28 input: 3 maps of 8x8 after the 3x3 conv and pooling).
        self.fc = nn.Linear(in_features=192, out_features=self.layer_size)
        self.out = nn.Linear(in_features=self.layer_size, out_features=10)

    def forward(self, x):
        pooled = F.max_pool2d(self.conv1(x), 3)
        flat = F.relu(pooled).view(-1, 192)
        hidden = self.fc(flat)
        logits = self.out(hidden)
        return F.log_softmax(logits, dim=1)


def train(model, optimizer, train_loader, device=None):
    """Run one pass over ``train_loader``, stepping the optimizer per batch.

    Falls back to the CPU when ``device`` is not given. The loss is the
    negative log-likelihood of ``model``'s output against the batch targets.
    """
    if not device:
        device = torch.device("cpu")
    model.train()
    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        batch_loss = F.nll_loss(model(inputs), labels)
        batch_loss.backward()
        optimizer.step()


def test(model, data_loader, device=None):
    device = device or torch.device("cpu")
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(data_loader):
            data, target = data.to(device), target.to(device)
            outputs = model(data)
            _, predicted = torch.max(outputs.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()

    return correct / total

Data interface: getting data from the MNIST dataset

这是一个用于模拟每天到来数据的MNIST interface， 函数简介如下：


*   def _get_day_slice(self, day=0):


> 返回截至第 day 天可用数据的结束下标：初始（第0天）拥有30%的数据，剩余70%的数据均分给之后的每一天，用来模拟每日到来的新数据。


*   def get_data(self, day=0):

> 用于获得迄今为止的全部数据。


* def get_incremental_data(self, day=0):

> 用于获得某一天的新数据。并将它与初始数据，即记忆数据结合起来。例如day=3，此时就是第2天到第3天之间新获得的数据和第0天的初始数据。






In [None]:
class MNISTDataInterface(object):
    """Serve MNIST training data as if new samples arrived every day.

    Day 0 exposes the first 30% of the dataset; the remaining 70% is released
    in equal portions over ``max_days`` days.
    """

    def __init__(self, data_dir, max_days=10):
        self.data_dir = data_dir
        self.max_days = max_days

        # Convert images to tensors, then normalize with a fixed mean and std.
        to_tensor = transforms.ToTensor()
        normalize = transforms.Normalize((0.1307,), (0.3081,))
        self.dataset = MNIST(
            self.data_dir,
            train=True,
            download=True,
            transform=transforms.Compose([to_tensor, normalize]))

    def _get_day_slice(self, day=0):
        """Return how many leading samples are available at the end of ``day``.

        Negative days yield 0; the result never exceeds the dataset size.
        """
        if day < 0:
            return 0
        total = len(self.dataset)
        fraction = 0.3 + 0.7 * day / self.max_days
        return min(total, ceil(total * fraction))

    def get_data(self, day=0):
        """Randomly split all data available up to ``day`` into 80% train / 20% validation."""
        n_available = self._get_day_slice(day)
        n_train = int(0.8 * n_available)
        n_validation = n_available - n_train

        everything = Subset(self.dataset, list(range(n_available)))
        return random_split(everything, [n_train, n_validation])

    def get_incremental_data(self, day=0):
        """Randomly split the samples new on ``day`` plus the day-0 rehearsal set (80/20).

        NOTE: for ``day=0`` the "new" slice and the rehearsal slice cover the
        same samples, so every sample appears twice in the combined set.
        """
        first = self._get_day_slice(day - 1)
        last = self._get_day_slice(day)

        rehearsal = Subset(self.dataset, list(range(self._get_day_slice(0))))
        fresh = Subset(self.dataset, list(range(first, last)))

        n_total = (last - first) + len(rehearsal)
        n_train = int(0.8 * n_total)
        return random_split(fresh + rehearsal, [n_train, n_total - n_train])

Trainable API for Ray Tune

这是一个用于训练的函数，基于Ray Tune，它会按照Ray Tune分发的config进行指定epoch次的训练，并将训练结果报告给Ray Tune，基于此来做出超参数搜索等操作。

In [None]:
def train_mnist(config,
                start_model=None,
                checkpoint_dir=None,
                num_epochs=10,
                use_gpus=False,
                data_fn=None,
                day=0):
    """Ray Tune trainable: train a ConvNet and report validation accuracy.

    Args:
        config: Dict with the keys "layer_size", "lr", "momentum" and
            "batch_size".
        start_model: Optional directory containing a "checkpoint" file to
            warm-start from. Only used when ``checkpoint_dir`` is not set.
        checkpoint_dir: Optional directory containing a "checkpoint" file to
            load; takes precedence over ``start_model``.
        num_epochs: Number of train/evaluate rounds to run.
        use_gpus: Train on CUDA when True and CUDA is available.
        data_fn: Callable invoked as ``data_fn(day=day)`` that returns a
            ``(train_dataset, validation_dataset)`` pair.
        day: Forwarded to ``data_fn``.

    Raises:
        ValueError: If ``data_fn`` is not provided.
    """
    if data_fn is None:
        raise ValueError("train_mnist requires a `data_fn` callable.")

    # Create model
    use_cuda = use_gpus and torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")
    model = ConvNet(layer_size=config["layer_size"]).to(device)

    # Create optimizer
    optimizer = optim.SGD(
        model.parameters(), lr=config["lr"], momentum=config["momentum"])

    # Load checkpoint, or load start model if no checkpoint has been
    # passed and a start model is specified
    load_dir = checkpoint_dir or start_model

    if load_dir:
        # map_location lets a checkpoint written on a GPU be loaded by a
        # CPU-only worker (and vice versa).
        model_state, optimizer_state = torch.load(
            os.path.join(load_dir, "checkpoint"), map_location=device)
        model.load_state_dict(model_state)
        optimizer.load_state_dict(optimizer_state)
        # load_state_dict also restores the saved param-group
        # hyperparameters; re-apply this trial's values so the sampled
        # lr/momentum are the ones actually used.
        for group in optimizer.param_groups:
            group["lr"] = config["lr"]
            group["momentum"] = config["momentum"]

    # Get full training datasets
    train_dataset, validation_dataset = data_fn(day=day)

    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=config["batch_size"], shuffle=True)

    validation_loader = torch.utils.data.DataLoader(
        validation_dataset, batch_size=config["batch_size"], shuffle=True)

    last_epoch = num_epochs - 1
    for i in range(num_epochs):
        train(model, optimizer, train_loader, device)
        acc = test(model, validation_loader, device)
        if i == last_epoch:
            # Only the final epoch is checkpointed; it is saved before the
            # last report.
            with tune.checkpoint_dir(step=i) as save_dir:
                torch.save((model.state_dict(), optimizer.state_dict()),
                           os.path.join(save_dir, "checkpoint"))
            tune.report(mean_accuracy=acc, done=True)
        else:
            tune.report(mean_accuracy=acc)


Tune from scratch

从头开始训练：使用当前获得的全部数据，对模型进行重新训练。通过Ray Tune依据config进行超参数搜索，并将性能最好的一组超参数和对应的模型保存下来。

In [None]:
def tune_from_scratch(num_samples=10, num_epochs=10, gpus_per_trial=0., day=0):
    """Run a hyperparameter search that trains new models on all data up to ``day``.

    Returns a tuple ``(best_accuracy, best_config, best_checkpoint,
    num_examples)`` taken from the trial with the highest last reported
    "mean_accuracy"; ``num_examples`` is the number of samples available
    on ``day``.
    """
    data_interface = MNISTDataInterface("~/data", max_days=10)
    num_examples = data_interface._get_day_slice(day)

    # Hyperparameter search space.
    search_space = {
        "batch_size": tune.choice([16, 32, 64]),
        "layer_size": tune.choice([32, 64, 128, 192]),
        "lr": tune.loguniform(1e-4, 1e-1),
        "momentum": tune.uniform(0.1, 0.9),
    }

    # Scheduler driven by the reported accuracy metric.
    asha = ASHAScheduler(
        metric="mean_accuracy",
        mode="max",
        max_t=num_epochs,
        grace_period=1,
        reduction_factor=2)

    cli_reporter = CLIReporter(
        parameter_columns=["layer_size", "lr", "momentum", "batch_size"],
        metric_columns=["mean_accuracy", "training_iteration"])

    # Bind everything except `config`, which Tune supplies per trial.
    trainable = partial(
        train_mnist,
        start_model=None,
        data_fn=data_interface.get_data,
        num_epochs=num_epochs,
        use_gpus=gpus_per_trial > 0,
        day=day)

    analysis = tune.run(
        trainable,
        resources_per_trial={"cpu": 1, "gpu": gpus_per_trial},
        config=search_space,
        num_samples=num_samples,
        scheduler=asha,
        progress_reporter=cli_reporter,
        verbose=0,
        name="tune_serve_mnist_fromscratch")

    winner = analysis.get_best_trial("mean_accuracy", "max", "last")
    return (
        winner.metric_analysis["mean_accuracy"]["last"],
        winner.config,
        winner.checkpoint.value,
        num_examples,
    )

（Optional）将checkpoint复制到头节点上的“model_dir”。如果您想在多个节点上运行 Serve，可能需要将checkpoint移动到共享存储，例如 Amazon S3。


In [None]:
def _move_checkpoint_to_model_dir(model_dir, checkpoint, config, metrics):
    """Copy a trial checkpoint directory into ``model_dir`` and record its metadata.

    Any existing ``<model_dir>/checkpoint`` directory is replaced, and
    ``<model_dir>/meta.json`` is written with the given config and metrics.
    Returns the path of the copied checkpoint directory.

    To run Serve on multiple nodes you may want to put the checkpoint on
    shared storage, such as Amazon S3, instead.
    """
    os.makedirs(model_dir, 0o755, exist_ok=True)
    print("Serving checkpoint: {}".format(checkpoint))

    target = os.path.join(model_dir, "checkpoint")
    # copytree needs a destination that does not exist yet.
    if os.path.exists(target):
        shutil.rmtree(target)
    shutil.copytree(checkpoint, target)

    meta = dict(config=config, metrics=metrics)
    with open(os.path.join(model_dir, "meta.json"), "wt") as meta_file:
        json.dump(meta, meta_file)

    return target

这里通过tune_from_scratch（）使用全量数据训练出模型并将其最佳参数保存起来。

In [None]:
if __name__ == "__main__":
    # Train on all data accumulated up to this day.
    day = 7

    # Shut down any Ray session already running in this process, then
    # start a fresh one.
    ray.shutdown()
    ray.init(namespace="tune-serve-integration")

    model_dir = os.path.expanduser("~/mnist_tune_serve")

    gpus_per_trial = 0.25
    serve_gpu = gpus_per_trial > 0
    num_samples = 1
    num_epochs = 1

    # Train from scratch on the full data set and keep the best result.
    print("Start training job from scratch on day {}.".format(day))
    acc, config, best_checkpoint, num_examples = tune_from_scratch(
        num_samples, num_epochs, gpus_per_trial, day=day)
    print(
        "Trained day {} from scratch on {} samples. "
        "Best accuracy: {:.4f}. Best config: {}".format(
            day, num_examples, acc, config))

    # Store the winning checkpoint and its metadata under model_dir.
    _move_checkpoint_to_model_dir(model_dir, best_checkpoint, config, acc)

2022-01-12 13:03:14,449	INFO registry.py:70 -- Detected unknown callable for trainable. Converting to class.


Start training job from scratch on day 7.




Trained day 7 from scratch on 47400 samples. Best accuracy: 0.9354. Best config: {'batch_size': 32, 'layer_size': 64, 'lr': 0.027941258257921566, 'momentum': 0.46977404378785503}
Serving checkpoint: /root/ray_results/tune_serve_mnist_fromscratch/DEFAULT_0119b_00000_0_batch_size=32,layer_size=64,lr=0.027941,momentum=0.46977_2022-01-12_13-03-16/checkpoint_000000/


现在我们已经拥有了需要进行微调的模型，以及与它配套的训练、测试函数。只需要每七天进行一次重新训练，就可以完成需求了。

After tuning from scratch, you can use the code below to implement incremental training. First, we need a `tune.run()` call that starts from the old config and model.


在你从头开始全量训练之后，你可以使用以下代码来实现基于rehearsal的持续学习。tune的部分没什么两样。

In [None]:
def tune_from_existing(start_model,
                       start_config,
                       num_samples=10,
                       num_epochs=10,
                       gpus_per_trial=0.,
                       day=0):
    """Run a hyperparameter search that continues training an existing model.

    Trials start from the checkpoint in ``start_model`` and train on the data
    that is new on ``day`` combined with the day-0 rehearsal data. The search
    resamples "batch_size", "lr" and "momentum"; every other entry of
    ``start_config`` is kept as is.

    Returns a tuple ``(best_accuracy, best_config, best_checkpoint,
    num_examples)`` taken from the trial with the highest last reported
    "mean_accuracy".
    """
    data_interface = MNISTDataInterface("/tmp/mnist_data", max_days=10)

    # New samples for this day plus the rehearsal samples from day 0.
    new_today = (data_interface._get_day_slice(day)
                 - data_interface._get_day_slice(day - 1))
    num_examples = new_today + data_interface._get_day_slice(0)

    search_space = {
        **start_config,
        "batch_size": tune.choice([16, 32, 64]),
        "lr": tune.loguniform(1e-4, 1e-1),
        "momentum": tune.uniform(0.1, 0.9),
    }

    asha = ASHAScheduler(
        metric="mean_accuracy",
        mode="max",
        max_t=num_epochs,
        grace_period=1,
        reduction_factor=2)

    cli_reporter = CLIReporter(
        parameter_columns=["lr", "momentum", "batch_size"],
        metric_columns=["mean_accuracy", "training_iteration"])

    # Bind everything except `config`, which Tune supplies per trial.
    trainable = partial(
        train_mnist,
        start_model=start_model,
        data_fn=data_interface.get_incremental_data,
        num_epochs=num_epochs,
        use_gpus=gpus_per_trial > 0,
        day=day)

    analysis = tune.run(
        trainable,
        resources_per_trial={"cpu": 1, "gpu": gpus_per_trial},
        config=search_space,
        num_samples=num_samples,
        scheduler=asha,
        progress_reporter=cli_reporter,
        verbose=0,
        name="tune_serve_mnist_fromsexisting")

    winner = analysis.get_best_trial("mean_accuracy", "max", "last")
    return (
        winner.metric_analysis["mean_accuracy"]["last"],
        winner.config,
        winner.checkpoint.value,
        num_examples,
    )


You can change the value of `day` and continue training from the existing model.

这里我们需要一个函数 get_current_model(model_dir) 来读取旧的配置和模型，这样就不用从头开始训练了。接着就可以进行持续学习，并将新模型更新到你的Ray Serve上。

In [None]:
if __name__ == "__main__":
    # Day whose newly arrived data is used for incremental training.
    day = 4
    ray.shutdown()
    ray.init(namespace="tune-serve-integration")

    model_dir = os.path.expanduser("~/mnist_tune_serve")

    gpus_per_trial = 0.25
    serve_gpu = True if gpus_per_trial > 0 else False
    num_samples = 1
    num_epochs = 1

    def get_current_model(model_dir):
        """Return ``(checkpoint_path, config, metrics)`` stored in ``model_dir``.

        Returns ``(None, None, None)`` when either the checkpoint or the
        ``meta.json`` file is missing.
        """
        checkpoint_path = os.path.join(model_dir, "checkpoint")
        meta_path = os.path.join(model_dir, "meta.json")

        if not os.path.exists(checkpoint_path) or \
                not os.path.exists(meta_path):
            return None, None, None

        with open(meta_path, "rt") as fp:
            meta = json.load(fp)

        return checkpoint_path, meta["config"], meta["metrics"]

    # train from existing
    old_checkpoint, old_config, old_acc = get_current_model(model_dir)
    # Compare the stored accuracy against None explicitly: a stored value of
    # 0.0 is falsy but still means a model exists.
    if not old_checkpoint or not old_config or old_acc is None:
        print("No existing model found. Train one with --from_scratch "
              "first.")
        sys.exit(1)
    acc, config, best_checkpoint, num_examples = tune_from_existing(
        old_checkpoint,
        old_config,
        num_samples,
        num_epochs,
        gpus_per_trial,
        day=day)
    print("Trained day {} from existing on {} samples. "
          "Best accuracy: {:.4f}. Best config: {}".format(
              day, num_examples, acc, config))

2022-01-12 13:03:49,437	INFO registry.py:70 -- Detected unknown callable for trainable. Converting to class.


Trained day 4 from existing on 22200 samples. Best accuracy: 0.9412. Best config: {'batch_size': 16, 'layer_size': 64, 'lr': 0.01753766087795641, 'momentum': 0.1121616438238826}
