# Distributed FLoX using ProxyStore and Globus Compute on CHI@Edge

This notebook details how to set up Globus Compute, ProxyStore, and CHI@Edge containers for distributed FLoX.

## Globus Auth
To launch CHI@Edge containers without manually authenticating with Globus Auth, we rely on a Docker image configured to authenticate with a Globus Auth client ID and client secret. To obtain these credentials, go to Settings > Developers > Advanced Registration in the [Globus web app](https://app.globus.org) and fill out the form. Once the form is completed, the client ID and secret are generated and provided to you. This notebook reads them from the environment variables `GLOBUS_CLIENT_ID` and `GLOBUS_CLIENT_SECRET`, so set both to their corresponding values before running it.
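
The configuration cell below passes the same two credentials to the containers under four variable names. As a minimal sketch, the hypothetical helper `build_container_env` reads `GLOBUS_CLIENT_ID` and `GLOBUS_CLIENT_SECRET` from an environment mapping and produces that mapping. It fails early with a clear message when a credential is missing. The helper and its error handling are illustrative assumptions, not part of the original notebook.

```python
import os
from typing import Mapping


def build_container_env(env: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Hypothetical helper: map Globus client credentials to container variables."""
    required = ("GLOBUS_CLIENT_ID", "GLOBUS_CLIENT_SECRET")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing Globus credentials: {', '.join(missing)}")

    client_id = env["GLOBUS_CLIENT_ID"]
    client_secret = env["GLOBUS_CLIENT_SECRET"]
    # The notebook's CONTAINER_ENV passes the same credentials under two names:
    # one set for the Globus Compute (funcX) SDK and one set for ProxyStore.
    return {
        "FUNCX_SDK_CLIENT_ID": client_id,
        "FUNCX_SDK_CLIENT_SECRET": client_secret,
        "PROXYSTORE_GLOBUS_CLIENT_ID": client_id,
        "PROXYSTORE_GLOBUS_CLIENT_SECRET": client_secret,
    }
```

In the configuration cell, `CONTAINER_ENV = build_container_env()` would then replace the hand-written dictionary.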

## CHI@Edge auth
CHI@Edge provides instructions ([found here](https://chameleoncloud.readthedocs.io/en/latest/technical/cli.html#cli-authentication)) for authenticating with CHI and using the CLI and APIs locally. Follow all of these instructions and source the OpenStack RC script before executing this notebook.
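
Here is a quick sanity check that the RC script was sourced in the environment that launched Jupyter. It is a sketch that assumes the script exports the standard OpenStack variable `OS_AUTH_URL`. Check your own RC file for the exact set of variables, and extend `required` accordingly. The function name `unset_openstack_vars` is hypothetical.

```python
import os
from typing import Iterable, Mapping


def unset_openstack_vars(
    env: Mapping[str, str] = os.environ,
    required: Iterable[str] = ("OS_AUTH_URL",),
) -> list[str]:
    """Return the names in `required` that are missing or empty in `env`."""
    return [name for name in required if not env.get(name)]


missing = unset_openstack_vars()
if missing:
    print(f"OpenStack RC script not sourced? Missing: {missing}")
```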

## Notebook dependencies
This notebook uses both FLoX and CHI@Edge dependencies. Pip install the `distributed` branch of FLoX and install python-chi-edge. Furthermore, `proxystore[endpoints]` and `globus-compute-endpoint` must be pip installed and configured before executing the notebook. The constants `LEADER_PS_UUID` and `LEADER_GC_UUID` in the cell below must be updated with the UUIDs of these local endpoints.
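
Before running the notebook, you can confirm that the pip-installed packages are visible to the kernel. This is a minimal sketch using the standard-library `importlib.metadata`. Note that it checks distribution names only, so it cannot confirm that extras such as `[endpoints]` were installed. The list of names below is an assumption; adjust it to what you actually installed.

```python
from importlib import metadata
from typing import Callable, Iterable


def missing_packages(
    names: Iterable[str],
    lookup: Callable[[str], str] = metadata.version,
) -> list[str]:
    """Return the distribution names from `names` that are not installed."""
    missing = []
    for name in names:
        try:
            lookup(name)  # raises PackageNotFoundError if the distribution is absent
        except metadata.PackageNotFoundError:
            missing.append(name)
    return missing


# Distribution names are assumptions; extend the list as needed.
print(missing_packages(["proxystore", "globus-compute-endpoint"]))
```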

## Troubleshooting
Make sure the Python version (major.minor, e.g. 3.10) is consistent across all devices. For instance, the Docker image used by the CHI@Edge devices uses Python 3.10. In addition, the Docker image ships urllib3 2.0.7, whereas a local installation might be older than 2.0.0. This mismatch has been found to be a source of errors.
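
Here is a minimal sketch for comparing versions. It assumes you fetch the container's version strings yourself, for example with `container.execute(worker_1.uuid, "python3 --version")["output"]`, which mirrors this notebook's helpers. Whether `python3` is on the container's PATH is an assumption. The hypothetical `release_series` helper extracts the leading numeric components of a version string. That lets you compare `(3, 10)` for Python and only the major version for urllib3.

```python
import re
import sys


def release_series(text: str, parts: int = 2) -> tuple[int, ...]:
    """Return the first `parts` numeric components found in `text`."""
    numbers = re.findall(r"\d+", text)
    if len(numbers) < parts:
        raise ValueError(f"fewer than {parts} version numbers in {text!r}")
    return tuple(int(n) for n in numbers[:parts])


# Local kernel vs. the Python 3.10 used in the Docker image.
local = release_series(sys.version)
print(local, "matches 3.10" if local == (3, 10) else "does not match 3.10")

# urllib3: compare major versions only (2.x in the image vs. possibly 1.x locally).
print(release_series("2.0.7", parts=1) == release_series("1.26.18", parts=1))  # False
```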

In general, CHI@Edge containers can be flaky to deploy. If container creation fails for no apparent reason, rerun the cell that creates the container.
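
Instead of rerunning cells by hand, you can wrap the call in a small retry loop. This is a generic sketch, not part of python-chi; `retry` and its parameters are hypothetical. A failed attempt may leave a container behind, and the notebook's helper asks you to stop or delete it before trying again. So a retry only helps when the failure left nothing to clean up.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry(
    fn: Callable[[], T],
    attempts: int = 3,
    delay_s: float = 10.0,
    exceptions: Tuple[Type[BaseException], ...] = (Exception,),
) -> T:
    """Call `fn` until it succeeds or `attempts` calls have failed."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except exceptions as ex:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            print(f"attempt {attempt} failed ({ex!r}); retrying in {delay_s}s")
            time.sleep(delay_s)
    raise AssertionError("unreachable")
```

For example: `worker_1 = retry(lambda: create_container(USERNAME, MACHINE_NAME, RESERVATION, DOCKER_IMAGE, CONTAINER_PORTS, CONTAINER_ENV, name_suffix='1'), delay_s=30)`.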


In [2]:
import os

# environment variables
# os.environ[...] raises a KeyError right away if a credential is not set
GLOBUS_CLIENT_ID = os.environ['GLOBUS_CLIENT_ID']
GLOBUS_CLIENT_SECRET = os.environ['GLOBUS_CLIENT_SECRET']

CHI_PROJ_NAME='CHI-231082' # Change to your project name
LEASE_DURATION=2 # lease length in days; change if you need more/less
N_DEVICES=2 # number of devices to reserve; change if you need more/less

DOCKER_IMAGE='valhayot/flox-images:latest'

CONTAINER_ENV = {
    'FUNCX_SDK_CLIENT_ID': GLOBUS_CLIENT_ID,
    'FUNCX_SDK_CLIENT_SECRET': GLOBUS_CLIENT_SECRET,
    'PROXYSTORE_GLOBUS_CLIENT_ID': GLOBUS_CLIENT_ID,
    'PROXYSTORE_GLOBUS_CLIENT_SECRET': GLOBUS_CLIENT_SECRET,
}

CONTAINER_PORTS=[5671, 8675]

USERNAME = os.environ.get('USER')
MACHINE_NAME = 'raspberrypi4-64'

RESERVATION: str | None = None # if a reservation id is provided, it will not try to create a new reservation

LEADER_PS_UUID = 'b72af0a0-1030-4720-9ce7-3b975f34107f' # update with local proxystore endpoint uuid
LEADER_GC_UUID = '470973b0-815d-4be2-a651-f0239e05dc2b' # update with local globus-compute endpoint uuid

# set environment vars for notebook
os.environ['FUNCX_SDK_CLIENT_ID'] = GLOBUS_CLIENT_ID
os.environ['FUNCX_SDK_CLIENT_SECRET'] = GLOBUS_CLIENT_SECRET
os.environ['PROXYSTORE_GLOBUS_CLIENT_ID'] = GLOBUS_CLIENT_ID
os.environ['PROXYSTORE_GLOBUS_CLIENT_SECRET'] = GLOBUS_CLIENT_SECRET
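
Before relying on the two leader endpoint constants, it helps to confirm that they were replaced with well-formed UUIDs. The hypothetical `check_endpoint_uuid` helper below validates only the format, using Python's `uuid` module. It cannot tell whether the endpoint exists or is running.

```python
import uuid


def check_endpoint_uuid(label: str, value: str) -> str:
    """Return `value` in canonical lowercase form, or raise if it is not a UUID."""
    try:
        return str(uuid.UUID(value))
    except (ValueError, TypeError, AttributeError) as ex:
        raise ValueError(f"{label} is not a valid UUID: {value!r}") from ex
```

Usage: `LEADER_PS_UUID = check_endpoint_uuid("LEADER_PS_UUID", LEADER_PS_UUID)`, and likewise for `LEADER_GC_UUID`.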


In [4]:
import chi
# Before we go any further, we need to select which Chameleon site we will be using.
chi.set('project_domain_name', 'chameleon')
chi.set('project_name', CHI_PROJ_NAME)
chi.use_site("CHI@Edge")

Now using CHI@Edge:
URL: https://chi.edge.chameleoncloud.org
Location: University of Chicago, Chicago, Illinois, USA
Support contact: help@chameleoncloud.org


In [5]:
# helper functions for creating reservations and containers

from chi import container
from chi import lease


def create_reservation(username, machine_name, lease_duration=1, n_devices=1):
    # get dates for lease start and end
    start, end = lease.lease_duration(days=lease_duration)

    # make a unique name for the lease
    lease_name = f"{username}-{machine_name}-{start}"

    reservations = []
    lease.add_device_reservation(reservations, count=n_devices, machine_name=machine_name)
    container_lease = lease.create_lease(lease_name, reservations)
    lease_id = container_lease["id"]
    reservation_id = lease.get_device_reservation(lease_id)

    print(f"created lease with name {lease_name} and uuid {lease_id} and reservation id {reservation_id}, waiting for it to start. This can take up to 60s.")
    lease.wait_for_active(lease_id)
    print("Done!")
    
    return reservation_id

def create_container(username, machine_name, reservation_id, docker_image, container_ports, container_env, name_suffix: str | None=None):
    print("Requesting container ...")

    # set a name for the container. Because CHI@Edge uses Kubernetes, ensure that underscores aren't in the name
    if name_suffix is not None:
        container_name = f"{username}-{machine_name}-{name_suffix}".replace("_", "-")
    else:
        container_name = f"{username}-{machine_name}".replace("_", "-")

    try:
        my_container = container.create_container(
            container_name,
            image=docker_image,
            exposed_ports=container_ports,
            environment=container_env,
            reservation_id=reservation_id,
            platform_version=2,
        )
    except RuntimeError as ex:
        print(ex)
        print(f"please stop and/or delete {container_name} and try again")
        # re-raise so the caller doesn't continue without a container
        raise

    print(f"Successfully created container: {container_name}!")
    return my_container
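
The helper above only replaces underscores. Many Kubernetes object names must be valid RFC 1123 DNS labels. Such labels contain only lowercase alphanumerics and `-`, start and end with an alphanumeric, and are at most 63 characters long. Whether CHI@Edge enforces exactly these rules is an assumption. The hypothetical `k8s_safe_name` below is a stricter sketch that could stand in for the `.replace("_", "-")` calls.

```python
import re


def k8s_safe_name(*parts: str, max_len: int = 63) -> str:
    """Join `parts` into a lowercase, RFC 1123 label-style name."""
    name = "-".join(parts).lower()
    name = re.sub(r"[^a-z0-9-]+", "-", name)  # underscores, dots, spaces -> '-'
    name = re.sub(r"-{2,}", "-", name)        # collapse repeated dashes
    name = name[:max_len].strip("-")          # must start and end alphanumeric
    if not name:
        raise ValueError("name is empty after sanitising")
    return name
```

For example, `k8s_safe_name(username, machine_name, name_suffix)` would produce names like `valerie-hayot-raspberrypi4-64-1` from a username containing underscores.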

In [6]:
# create reservation
if RESERVATION is None:
    RESERVATION = create_reservation(USERNAME, MACHINE_NAME, lease_duration=LEASE_DURATION, n_devices=N_DEVICES)

In [8]:
# create first container
worker_1 = create_container(USERNAME, MACHINE_NAME, RESERVATION, DOCKER_IMAGE, CONTAINER_PORTS, CONTAINER_ENV, name_suffix='1')

Requesting container ...
Successfully created container: valeriehayot-sasson-raspberrypi4-64-1!


In [9]:
# create second container
worker_2 = create_container(USERNAME, MACHINE_NAME, RESERVATION, DOCKER_IMAGE, CONTAINER_PORTS, CONTAINER_ENV, name_suffix='2')

Requesting container ...
Successfully created container: valeriehayot-sasson-raspberrypi4-64-2!


In [10]:
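def get_ps_uuid(worker_uuid, split_idx=-1):
    # Run the ProxyStore CLI inside the container and pick the whitespace-separated
    # token at split_idx; the index depends on the CLI's table layout.
    cmd = 'proxystore-endpoint list'
    return container.execute(worker_uuid, cmd)["output"].split()[split_idx]

def get_gc_uuid(worker_uuid, split_idx=-7):
    # Same approach for Globus Compute; the index is tied to the table layout
    # printed by `globus-compute-endpoint list`.
    cmd = 'globus-compute-endpoint list'
    return container.execute(worker_uuid, cmd)["output"].split()[split_idx]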
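
The fixed indices (`-1` and `-7`) depend on the exact layout of each CLI's `list` output. They break if that layout changes or if a container hosts several endpoints. Below is a sketch of a less brittle alternative, assuming the endpoint UUIDs are the only UUID-shaped tokens in the output. `extract_uuids` is a hypothetical helper, and the sample text in the test is made up rather than real CLI output.

```python
import re

# Canonical 8-4-4-4-12 hexadecimal UUID form, matched case-insensitively.
UUID_RE = re.compile(
    r"\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b",
    re.IGNORECASE,
)


def extract_uuids(cli_output: str) -> list[str]:
    """Return every UUID found in `cli_output`, in order of appearance."""
    return UUID_RE.findall(cli_output)
```

With a single endpoint per container, `extract_uuids(container.execute(worker_1.uuid, "proxystore-endpoint list")["output"])[0]` would stand in for `get_ps_uuid(worker_1.uuid)`.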

In [11]:
w1_ps_endpoint = get_ps_uuid(worker_1.uuid)
print(f'Worker 1 ProxyStore endpoint UUID: {w1_ps_endpoint}')

Worker 1 ProxyStore endpoint UUID: 2dea5f0b-39f3-46bb-9d58-df67deb58108


In [12]:
w1_gc_endpoint = get_gc_uuid(worker_uuid=worker_1.uuid)
print(f'Worker 1 Globus Compute endpoint UUID: {w1_gc_endpoint}')

Worker 1 Globus Compute endpoint UUID: 675de750-cc84-4fa2-9976-ed9738adeba2


In [13]:
w2_ps_endpoint = get_ps_uuid(worker_uuid=worker_2.uuid)
print(f'Worker 2 ProxyStore endpoint UUID: {w2_ps_endpoint}')

Worker 2 ProxyStore endpoint UUID: 190691b5-cb9b-461a-99e9-6c75ffed9b82


In [14]:
w2_gc_endpoint = get_gc_uuid(worker_uuid=worker_2.uuid)
print(f'Worker 2 Globus Compute endpoint UUID: {w2_gc_endpoint}')

Worker 2 Globus Compute endpoint UUID: 6199ea17-607f-4196-8aa9-18fa64a31e13


In [15]:
# create flox topology
import yaml

topo = {
    0: {
        'kind': 'leader',
        'globus_compute_endpoint': LEADER_GC_UUID,
        'proxystore_endpoint': LEADER_PS_UUID,
        'children': [1,2]
    },
    1: {
        'kind': 'worker',
        'globus_compute_endpoint': w1_gc_endpoint,
        'proxystore_endpoint': w1_ps_endpoint,
        'children': []
    },
    2: {
        'kind': 'worker',
        'globus_compute_endpoint': w2_gc_endpoint,
        'proxystore_endpoint': w2_ps_endpoint,
        'children': []
    }
}

# save to file
with open('../examples/flocks/proxy.yaml', 'w+') as f:
    yaml.safe_dump(topo, f, default_flow_style=None)
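
A malformed topology, such as a `None` left over from a failed UUID lookup or a child index that does not exist, is easier to catch before it is written to YAML. The hypothetical `validate_topology` below is a sketch of checks chosen for this two-level leader/worker layout. FLoX itself may accept other layouts.

```python
from typing import Any, Mapping


def validate_topology(topo: Mapping[int, Mapping[str, Any]]) -> None:
    """Raise ValueError if the leader/worker topology dict is inconsistent."""
    leaders = [idx for idx, node in topo.items() if node.get("kind") == "leader"]
    if len(leaders) != 1:
        raise ValueError(f"expected exactly one leader, found {leaders}")

    for idx, node in topo.items():
        for key in ("globus_compute_endpoint", "proxystore_endpoint"):
            if not node.get(key):
                raise ValueError(f"node {idx} has no {key}")
        for child in node.get("children", []):
            if child not in topo:
                raise ValueError(f"node {idx} lists unknown child {child}")
        if node.get("kind") == "worker" and node.get("children"):
            raise ValueError(f"worker {idx} should not have children")

    # Every non-leader node should be reachable as someone's child.
    referenced = {c for node in topo.values() for c in node.get("children", [])}
    orphans = [idx for idx in topo if idx not in referenced and idx not in leaders]
    if orphans:
        raise ValueError(f"orphan nodes (not a child of any node): {orphans}")
```

Calling `validate_topology(topo)` just before `yaml.safe_dump` would stop the notebook before a broken file is written.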

## Running Distributed FL
**Note:** start here if the CHI@Edge containers and the YAML topology file are already configured and you just want to execute FLoX.
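
The next cell splits FashionMNIST across the workers with `federated_split(..., samples_alpha=1, labels_alpha=1)`. As we understand it, these parameters are concentration values of Dirichlet distributions over how many samples and which labels each worker receives. Smaller values give more skewed (non-IID) splits. The standard-library sketch below illustrates only the sample-count half of that idea and is not FLoX's implementation. `dirichlet` and `split_counts` are hypothetical names.

```python
import random


def dirichlet(alpha: float, k: int, rng: random.Random) -> list[float]:
    """Sample a symmetric Dirichlet(alpha, ..., alpha) vector of length k."""
    # Independent Gamma(alpha, 1) draws, normalised to sum to 1, are Dirichlet-distributed.
    draws = [rng.gammavariate(alpha, 1.0) for _ in range(k)]
    total = sum(draws)
    return [d / total for d in draws]


def split_counts(n_samples: int, n_workers: int, alpha: float, seed: int = 0) -> list[int]:
    """Split n_samples into n_workers integer counts with Dirichlet proportions."""
    rng = random.Random(seed)
    props = dirichlet(alpha, n_workers, rng)
    counts = [int(p * n_samples) for p in props]
    # Flooring loses fewer than n_workers samples; hand them to the largest shares.
    leftover = n_samples - sum(counts)
    ranked = sorted(range(n_workers), key=lambda j: props[j], reverse=True)
    for i in ranked[:leftover]:
        counts[i] += 1
    return counts


print(split_counts(10_000, 2, alpha=1.0))    # proportions vary with alpha and seed
print(split_counts(10_000, 2, alpha=100.0))  # large alpha: typically close to even
```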

In [2]:
# run flox
import os

from pandas import DataFrame

from torchvision.datasets import FashionMNIST
from torchvision.transforms import ToTensor

from flox.flock import Flock
from flox.nn import FloxModule
from flox.utils.data import federated_split, fed_barplot
from flox.run import federated_fit
from flox.strategies.registry.fedavg import FedAvg

#import logging

#logging.basicConfig(level=logging.DEBUG)

os.environ["TORCH_DATASETS"] = os.getcwd()

flock = Flock.from_yaml("../examples/flocks/proxy.yaml")

# train=False loads the FashionMNIST test split (10,000 images)
data = FashionMNIST(root=os.getcwd(), download=True, train=False, transform=ToTensor())
federated_data = federated_split(
    data,
    flock,
    10,  # number of label classes in FashionMNIST
    samples_alpha=1,
    labels_alpha=1,
)

class MyModule(FloxModule):
    def __init__(self, lr: float = 0.01):
        import torch.nn as nn

        super().__init__()
        self.lr = lr
        self.flatten = nn.Flatten()
        self.linear_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
        )

    def forward(self, x):
        x = self.flatten(x)
        return self.linear_stack(x)

    def training_step(self, batch, batch_idx):
        import torch.nn as nn
        # This callback is added by ``FloxModule`` and based on the ``LightningModule``
        inputs, targets = batch
        preds = self(inputs)
        loss = nn.functional.cross_entropy(preds, targets)
        return loss

    def configure_optimizers(self):
        from torch.optim import SGD
        # This callback is added by ``FloxModule`` and based on the ``LightningModule``
        return SGD(self.parameters(), lr=self.lr)


results: DataFrame = federated_fit(
    flock,
    MyModule,
    federated_data,
    num_global_rounds=5,
    strategy="fedprox",
    where="globus_compute"
)
results.head()



FINISHED: <Future at 0x297624220 state=finished returned JobResult>




FINISHED: <Future at 0x297625ea0 state=finished returned JobResult>




FINISHED: <Future at 0x297624d30 state=finished returned JobResult>




FINISHED: <Future at 0x297657a00 state=finished returned JobResult>


federated_fit::sync: 100%|██████████| 5/5 [06:07<00:00, 73.42s/it]

FINISHED: <Future at 0x2a90f0490 state=finished returned JobResult>





| | train/loss | train/epoch | train/batch_idx | train/time | node/idx | node/kind | parent/idx | parent/kind | round |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 2.307554 | 0 | 0 | 2023-11-02 03:55:29.936379 | 1 | worker | 0 | leader | 0 |
| 1 | 2.300235 | 0 | 1 | 2023-11-02 03:55:30.021001 | 1 | worker | 0 | leader | 0 |
| 2 | 2.300128 | 0 | 2 | 2023-11-02 03:55:30.125430 | 1 | worker | 0 | leader | 0 |
| 3 | 2.288718 | 0 | 3 | 2023-11-02 03:55:30.225741 | 1 | worker | 0 | leader | 0 |
| 4 | 2.290709 | 0 | 4 | 2023-11-02 03:55:30.340433 | 1 | worker | 0 | leader | 0 |
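
`results` holds one row per training batch per worker, as the `train/batch_idx`, `node/idx`, and `round` columns show. Below is a minimal sketch for summarising it as a mean loss per round and worker. It is written against plain records, such as those returned by `results.to_dict("records")`, so it runs without pandas. With pandas, `results.groupby(["round", "node/idx"])["train/loss"].mean()` computes the same thing. `mean_loss_by_round` is a hypothetical name.

```python
from collections import defaultdict
from typing import Iterable, Mapping


def mean_loss_by_round(records: Iterable[Mapping]) -> dict[tuple[int, int], float]:
    """Average `train/loss` per (round, node/idx) pair."""
    sums: dict[tuple[int, int], float] = defaultdict(float)
    counts: dict[tuple[int, int], int] = defaultdict(int)
    for row in records:
        key = (int(row["round"]), int(row["node/idx"]))
        sums[key] += float(row["train/loss"])
        counts[key] += 1
    return {key: sums[key] / counts[key] for key in sorted(sums)}


# The five rows shown above (round 0, worker 1).
rows = [
    {"round": 0, "node/idx": 1, "train/loss": loss}
    for loss in (2.307554, 2.300235, 2.300128, 2.288718, 2.290709)
]
print(mean_loss_by_round(rows))  # mean of about 2.2975 for (round 0, worker 1)
```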
