In [None]:
import vineyard

# Open a vineyard client session. No endpoint is passed, so it is presumably
# resolved from the environment/default socket — TODO confirm.
# NOTE(review): `vineyard_client` is not referenced again in the visible cells.
vineyard_client = vineyard.connect()

In [None]:
from dask_kubernetes import KubeCluster
from dask.distributed import Client, progress

# Launch a Dask cluster on Kubernetes from the worker pod spec.
dask_cluster = KubeCluster.from_yaml('dask-worker-spec.yml')
dask_cluster.scale(10)  # specify number of workers explicitly

dask_client = Client(dask_cluster)

# NOTE(review): `hdfs`, `text_to_vec` and `create_global_tensor` are not defined
# in the visible cells; they must be provided before this cell runs.
filenames = hdfs.glob('/data/*/*')

# One task per input file; each task is expected to yield one partition.
futures = dask_client.map(text_to_vec, filenames)

# Gather through the client created above (the original referenced an undefined
# `client`). The futures list is passed directly so the result can be iterated
# more than once.
partitioned_vineyard_tensors = dask_client.gather(futures)

# Combine the per-file partitions into a single global tensor handle.
vineyard_global_tensor = create_global_tensor(partitioned_vineyard_tensors)

In [None]:

import ray
import torch.optim as optim
import torch
import torch.nn as nn
import torch.nn.functional as F

from torchvision import datasets, transforms
from vineyard.contrib.ray import init_ray_cluster


class Model(nn.Module):
    """Small convolutional classifier producing log-probabilities over 10 classes.

    Architecture: two 5x5 convolutions (1 -> 20 -> 50 channels), each followed
    by ReLU and 2x2 max-pooling, then two fully connected layers (800 -> 500 -> 10).
    The flattened size 4 * 4 * 50 is what a single-channel 28x28 input yields
    after the two conv/pool stages.
    """

    def __init__(self):
        super().__init__()
        # Layers are created in the same order as they are applied in forward().
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=20, kernel_size=5, stride=1)
        self.conv2 = nn.Conv2d(in_channels=20, out_channels=50, kernel_size=5, stride=1)
        self.fc1 = nn.Linear(in_features=4 * 4 * 50, out_features=500)
        self.fc2 = nn.Linear(in_features=500, out_features=10)

    def forward(self, x):
        """Return per-class log-probabilities (log-softmax over dim 1) for `x`."""
        # Both convolutional stages share the same shape: conv -> ReLU -> 2x2 pool.
        for conv in (self.conv1, self.conv2):
            x = F.max_pool2d(F.relu(conv(x)), kernel_size=2, stride=2)
        # Flatten the feature maps to one row per sample for the dense layers.
        flat = x.view(-1, 4 * 4 * 50)
        hidden = F.relu(self.fc1(flat))
        logits = self.fc2(hidden)
        return F.log_softmax(logits, dim=1)

def train(model, device, train_loader, optimizer):
    """Run one pass over `train_loader`, taking one optimizer step per batch.

    Args:
        model: module whose output is fed to `F.nll_loss`.
        device: device each batch is moved to before the forward pass.
        train_loader: iterable yielding `(data, target)` pairs.
        optimizer: optimizer that updates `model`'s parameters.
    """
    model.train()
    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        # Clear gradients left over from the previous batch before backprop.
        optimizer.zero_grad()
        batch_loss = F.nll_loss(model(inputs), labels)
        batch_loss.backward()
        optimizer.step()
        
def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)

            # sum up batch loss
            test_loss += F.nll_loss(
                output, target, reduction="sum").item()
            pred = output.argmax(
                dim=1,
                keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    return {
        "loss": test_loss,
        "accuracy": 100. * correct / len(test_loader.dataset)
    }
        
class Network(object):
    """Bundle of model, optimizer and data loaders used as one training worker.

    The class is wrapped with `ray.remote` elsewhere in the notebook, so every
    method that is invoked through an actor handle must be defined here.

    Args:
        lr: learning rate passed to SGD.
        momentum: momentum passed to SGD.
    """

    def __init__(self, lr=0.01, momentum=0.5):
        use_cuda = torch.cuda.is_available()
        self.device = device = torch.device("cuda" if use_cuda else "cpu")
        # create data loader from vineyard object
        # NOTE(review): relies on the notebook-level `vineyard_global_tensor` and
        # on `vineyard.contrib.torch` being reachable as an attribute of the
        # imported `vineyard` package — confirm the submodule is imported.
        self.train_loader, self.test_loader = vineyard.contrib.torch.dataloader(vineyard_global_tensor)

        self.model = Model().to(device)
        self.optimizer = optim.SGD(
            self.model.parameters(), lr=lr, momentum=momentum)

    def train(self):
        """Train for one pass over the training loader, then return test metrics."""
        train(self.model, self.device, self.train_loader, self.optimizer)
        return test(self.model, self.device, self.test_loader)

    def test(self):
        """Evaluate the current weights on the test loader and return the metrics.

        Added because the notebook calls `actor.test.remote()` after broadcasting
        averaged weights; without this method that call fails.
        """
        return test(self.model, self.device, self.test_loader)

    def get_weights(self):
        """Return the model's `state_dict()`."""
        return self.model.state_dict()

    def set_weights(self, weights):
        """Load `weights` (a state dict) into the model."""
        self.model.load_state_dict(weights)

    def save(self):
        """Write the model's state dict to "mnist_cnn.pt" in the working directory."""
        torch.save(self.model.state_dict(), "mnist_cnn.pt")

In [None]:
# distributed training and testing using ray
from collections import OrderedDict

cluster_scale = 20

# this is where magic happens: the ray cluster is launched with the global
# tensor passed as the data-locality hint.
ray_address = init_ray_cluster(cluster_scale, use_data_locality=vineyard_global_tensor, allow_migration=True, allow_repartition=True)

ray.init(address=ray_address)

RemoteNetwork = ray.remote(Network)

# One actor per requested cluster slot.
NetworkActors = [RemoteNetwork.remote() for _ in range(cluster_scale)]

# Train every actor once and wait for all of them to finish.
ray.get([actor.train.remote() for actor in NetworkActors])

weights = ray.get([actor.get_weights.remote() for actor in NetworkActors])

# Element-wise mean of every parameter across the actors.
# (The original used `sum[...]`, which is a syntax error.)
averaged_weights = OrderedDict(
    (k, sum(w[k] for w in weights) / len(weights)) for k in weights[0])

# Store the averaged weights once and hand the same object reference to every actor.
weight_id = ray.put(averaged_weights)
# Wait for the broadcast so that a failed `set_weights` surfaces here instead of
# being silently dropped.
ray.get([
    actor.set_weights.remote(weight_id)
    for actor in NetworkActors
])

# Evaluate each actor with the averaged weights; as the last expression this is
# the cell's output. Requires `Network` to define a `test` method.
ray.get([actor.test.remote() for actor in NetworkActors])

In [None]:
def init_ray_cluster(cluster_scale, use_data_locality=None, allow_migration=False, allow_repartition=False):
    """Launch a Ray cluster on Kubernetes, optionally aligned with a data object.

    Args:
        cluster_scale: requested number of Ray nodes; only compared against the
            number of data locations to decide whether to repartition.
        use_data_locality: object exposing `get_locations()`, `repartition(n)`
            and `migration(src, dst)`, or None to launch without a locality hint.
        allow_migration: when Ray pods land on nodes that hold no data, migrate
            data instead of raising.
        allow_repartition: when `cluster_scale` exceeds the number of data
            locations, repartition the data to `cluster_scale`.

    Returns:
        The address returned by `launcher_ray_k8s`.

    Raises:
        ValueError: Ray pods were placed on nodes without data and
            `allow_migration` is False.
    """
    if use_data_locality is None:
        # No locality hint: previously this path crashed because `located_nodes`
        # was never bound. Launch with an empty affinity set and skip alignment.
        # NOTE(review): assumes `launcher_ray_k8s` accepts an empty collection.
        ray_address, _ = launcher_ray_k8s(set())
        return ray_address

    # Locations are normalised to sets because the alignment check below relies
    # on set difference. Presumably these are node identifiers — TODO confirm.
    located_nodes = set(use_data_locality.get_locations())
    if cluster_scale > len(located_nodes) and allow_repartition:
        located_nodes = set(use_data_locality.repartition(cluster_scale))

    # modify ray-namespace.yaml and ray-cluster.yaml to set the affinity with located_nodes
    # and launch the ray k8s cluster based on https://docs.ray.io/en/master/cluster/kubernetes.html
    ray_address, ray_pod_locations = launcher_ray_k8s(located_nodes)
    ray_pod_locations = set(ray_pod_locations)

    pods_without_data = ray_pod_locations - located_nodes
    if pods_without_data:
        if not allow_migration:
            raise ValueError("Cannot align ray with vineyard, set allow_migration to True to solve the problem.")
        # Move data from nodes that received no Ray pod to the pods lacking data.
        use_data_locality.migration(located_nodes - ray_pod_locations, pods_without_data)
    return ray_address
    