In [None]:
# Copyright 2021 DeepMind Technologies Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# XManager Codelab Notebook


This notebook will take you through running an XManager experiment on Google Cloud Platform (GCP).



## Install XManager

Remember to restart the runtime after running the cell below for the first time. Avoid using **Run all**: this Colab contains async functions that do not work correctly when run that way.

In [None]:
!pip install git+https://github.com/deepmind/xmanager
!git clone https://github.com/deepmind/xmanager xmanager_repo

## Some utilities

In [None]:
# Dependencies used
from IPython.display import display
import ipywidgets
import os
import itertools
import asyncio
import nest_asyncio

# Display text box for updating environmental variables
def display_env_variable_box(env_variable):
  """Displays a text box whose contents are mirrored into an environment variable.

  The box is pre-filled with the variable's current value (empty if unset).
  Every edit writes the new text to `os.environ[env_variable]`, so later cells,
  including `!shell` commands that expand `${VAR}`, see the updated value.

  Args:
    env_variable: Name of the environment variable to edit.
  """
  def variable_changed(change):
    # Observed with names='value', so `change.new` is the box's new text.
    os.environ[env_variable] = change.new

  env_variable_widget = ipywidgets.Text(
      value=os.environ.get(env_variable, ''),
      description=env_variable + ':',
      style={'description_width': 'initial'},
      layout=ipywidgets.Layout(width='50%'),
  )
  # Register the observer after construction so that pre-filling the value
  # above does not itself trigger a write to os.environ.
  env_variable_widget.observe(variable_changed, names='value')

  display(env_variable_widget)

# Allows running async functions in Colab. The notebook kernel already runs an
# asyncio event loop; nest_asyncio patches asyncio so that run_until_complete
# (used by run_async_function below) can be called on that running loop.
nest_asyncio.apply()

def run_async_function(fn, *args, **kwargs):
  """Runs the coroutine function `fn` to completion and returns its result.

  Uses the current event loop's `run_until_complete`, which in a notebook only
  works after `nest_asyncio.apply()` has been called.

  Args:
    fn: An `async def` function. Existing callers pass it without arguments.
    *args: Optional positional arguments forwarded to `fn`.
    **kwargs: Optional keyword arguments forwarded to `fn`.

  Returns:
    Whatever the coroutine returns.
  """
  return asyncio.get_event_loop().run_until_complete(fn(*args, **kwargs))

## Setup GCP

### 1. [Create](https://console.cloud.google.com/) a GCP project if one does not already exist and enter its name below

In [None]:
# Typing a project ID here sets os.environ['GOOGLE_CLOUD_PROJECT'] for later cells.
display_env_variable_box('GOOGLE_CLOUD_PROJECT')

In [None]:
# The shell expands ${GOOGLE_CLOUD_PROJECT} from the kernel's environment,
# i.e. the value entered in the GOOGLE_CLOUD_PROJECT text box.
!gcloud config set project "${GOOGLE_CLOUD_PROJECT}"

### 2. Authenticate

In [None]:
# Authorize the gcloud CLI with your user account.
!gcloud auth login

In [None]:
# Create Application Default Credentials, which Google Cloud client libraries
# (presumably including the ones XManager uses) pick up automatically.
!gcloud auth application-default login

### 3. Create a Google Cloud Storage Bucket if one does not already exist and enter its name in the box below

In [None]:
# Typing a bucket name here sets os.environ['GOOGLE_CLOUD_BUCKET_NAME'].
# NOTE(review): not read by any visible cell; presumably consumed by xm_local
# when staging to GCS — confirm.
display_env_variable_box('GOOGLE_CLOUD_BUCKET_NAME')

## Launching an experiment

An experiment can be broken down into 5 steps:

1. Creating the experiment.
2. Defining the executable specification.
3. Defining the execution environment.
4. Creating the jobs and experiment units.
5. Defining the hyperparameters.

### 1. Creating the experiment

In [None]:
from xmanager import xm
from xmanager import xm_local
# This code block sets FLAGS to use default values to avoid an absl.flags.UnparsedFlagAccessError.
# Normally XManager flags are set via the command-line with `xmanager train.py -- --key=value`

from absl import flags
# Parse an argv that contains only an (empty) program name, so every flag is
# marked as parsed and keeps its default value.
flags.FLAGS([''])
# Override one XManager flag after parsing.
# NOTE(review): presumably needed for late-bound args in this environment —
# confirm against the xmanager flag definition.
flags.FLAGS.xm_wrap_late_bindings = True

Experiments are the core of XManager. An experiment typically involves running a computation (e.g., training a model in JAX or TensorFlow) in different hyperparameter configurations. It can have associated metadata (name, description, notes, tags, links, etc.). Experiments are made up of various experiment units, including work units which do the computation(s) in question and auxiliary units which perform other functions like TensorBoard.

Give the experiment a name. The `create_experiment` method will also create a unique integer id for the experiment and save this experiment to a database.

In [None]:
async def create_experiment_demo():
  """Creates an (empty) experiment and prints the id it was assigned."""
  experiment_context = xm_local.create_experiment(experiment_title='my-experiment')
  async with experiment_context as experiment:
    new_id = experiment.experiment_id
    print(f'Local Experiment created with experiment_id={new_id}')


run_async_function(create_experiment_demo)

### 2. Defining the executable specification

Define the job that will run in the experiment. A `PythonContainer` is an example of an executable specification. This executable specification tells XManager to package everything inside `PythonContainer.path` as a container and use `PythonContainer.entrypoint` as the main module. Because we cloned XManager into `xmanager_repo` in the Colab working directory (`/content`) in an earlier step, we can use one of its examples, `/content/xmanager_repo/examples/cifar10_torch`, as the path.

We also need to declare where the executable should be staged. This step will upload the executable specification to the correct storage option that is best suited for the execution environment. For example, if the execution environment is Vertex AI, the executable must be stored in Google Container Registry. The `Vertex.Spec()` specification will upload the specification to Google Container Registry, where it will be accessible by Vertex AI.

```python
[executable] = experiment.package([
  xm.python_container(
    executor_spec=xm_local.Vertex.Spec(),
    path=os.path.expanduser('/content/xmanager_repo/examples/cifar10_torch'),
    entrypoint=xm.ModuleName('cifar10'),
  )
])
```

### 3. Defining the execution environment

Declare where the job will run and what compute requirements are necessary to run one job. To run on Vertex AI, we must use the `xm_local.Vertex` executor. Each job should use 1 NVIDIA T4 GPU, so we must pass an `xm.JobRequirements` to the executor.

```python
executor = xm_local.Vertex(xm.JobRequirements(T4=1))
```

### 4. Launching the jobs

Finally, we can create an experiment and add jobs to it. A **job** is a unit of execution. A job contains an executable representing "what to run" and an executor representing "how to run it". Jobs can be reused.

To add a single job to the experiment, create an `xm.Job` object that combines the executable, the executor (compute requirements), and custom arguments (hyperparameters), and add the job to the experiment.

```python
experiment.add(xm.Job(
    executable=executable,
    executor=executor,
    args={'batch_size': 64, 'learning_rate': 0.01},
))
```

#### Job Groups


A **job group** is a collection of jobs. Job groups can be added to experiments in the same way. The jobs in a job group should be connected in some way. Some examples include: a client and server, multiple workers in a distributed experiment, a TPU session and a coordinator, a trainer and an evaluator, etc.

```python
async with xm_local.create_experiment(experiment_title='cifar10') as experiment:
    await experiment.add(xm.JobGroup(
        client=xm.Job(
            executable=executable,
            executor=executor,
        ),
        server=xm.Job(
            executable=executable,
            executor=executor,
        ),
    ))
```

### 5. Defining the hyperparameters

In research, it is often required to run the experimental setup multiple times with different hyperparameter values. This is called **hyperparameter optimization**. The simplest form of hyperparameter optimization is called *grid search* or *parameter sweep*, which is an exhaustive search through all possible Cartesian products of hyperparameter values. Grid search trials can be constructed using `itertools`.

In [None]:
from pprint import pprint

inputs = {
    'batch_size': [64, 128],
    'learning_rate': [0.01, 0.001],
}
# One dict per grid point: take the Cartesian product of the value lists and
# zip each combination back onto the hyperparameter names.
hyperparameters = [
    dict(zip(inputs, combination))
    for combination in itertools.product(*inputs.values())
]

pprint(hyperparameters)

To perform the grid search, loop over all the hyperparameters, passing a different hyperparameter configuration to the `args` parameter of each job. Add each job to the experiment.

```python
async with xm_local.create_experiment(experiment_title='cifar10') as experiment:
    for hparams in hyperparameters:
        experiment.add(xm.Job(
            executable=executable,
            executor=executor,
            args=hparams,
        ))
```

### Experiment Units


The return value of `experiment.add` is an awaitable experiment unit.

An **experiment unit** is created by XManager every time the experiment launches a set of jobs. Jobs inside an experiment unit are created together and should be destroyed together.

The common type of experiment unit is the **work unit**. A work unit contains the main task of training in an ML experiment, and they are part of the hyperparameter sweep. A helpful way of thinking of work units is that each work unit represents a trial. Just as an experiment can trial different hyperparameter sets, an XManager experiment can create a work unit for each trial. Work units are enumerated and assigned a `work_unit_id` starting from 0.

There are other experiment unit types which represent different roles. For example, an **auxiliary unit** represents a job that runs alongside all the trials, such as TensorBoard or a hyperparameter optimizer job.

### Job Generator

It's possible to modify the args of a job based on the attributes of the work unit. A good example is when you want to pass a different log directory argument to each work unit. Another example is if you need to adjust the args with the correct DNS names based on the `work_unit_id`.

A job generator allows you to modify the arguments of a job before they are added to the work unit.

```python
async def generate(work_unit, **hparams):
    print(work_unit.work_unit_id, hparams)
    # hparams['log_dir'] = f'/tmp/{work_unit.work_unit_id}'
    work_unit.add(xm.Job(
        executable=executable,
        executor=executor,
        args=hparams,
    ))

async with xm_local.create_experiment(experiment_title='cifar10') as experiment:
    for hparams in hyperparameters:
        work_unit = await experiment.add(generate, args=hparams)
```

### Tracking job status


You can list all of your previous experiments.

In [None]:
[past_experiment.experiment_id for past_experiment in xm_local.list_experiments()]

Some execution environments allow you to track the status of jobs in an experiment. Vertex AI is one of the execution environments that supports job-tracking.

```python
# TODO: Use experiment.work_units instead of private member.
for i, unit in enumerate(experiment._experiment_units):
    print(f'[{i}] Completed: {unit.get_status().is_completed}, Failed: {unit.get_status().is_failed}')
```

# End to end

Combining everything above into a single code-block, the launch script looks like this:

In [None]:
async def launch_experiment(batch_sizes=(64, 128), learning_rates=(0.01, 0.001)):
  """Packages the CIFAR-10 example and launches a grid-search sweep on Vertex AI.

  One job is added per (batch_size, learning_rate) combination. Every job runs
  with the `xm_local.Vertex` executor and requests one NVIDIA T4 GPU.

  Args:
    batch_sizes: Batch sizes to sweep over.
    learning_rates: Learning rates to sweep over.
  """
  async with xm_local.create_experiment(experiment_title='cifar10') as experiment:
    [executable] = experiment.package([
        xm.python_container(
            executor_spec=xm_local.Vertex.Spec(),
            path=os.path.expanduser('/content/xmanager_repo/examples/cifar10_torch'),
            entrypoint=xm.ModuleName('cifar10'),
        )
    ])

    # Every trial has the same compute requirements, so build the executor once
    # instead of once per job.
    executor = xm_local.Vertex(requirements=xm.JobRequirements(T4=1))

    trials = [
        {'batch_size': bs, 'learning_rate': lr}
        for bs, lr in itertools.product(batch_sizes, learning_rates)
    ]
    for hyperparameters in trials:
      # NOTE(review): the awaitable returned by add() is not awaited here; this
      # relies on the `async with` exit waiting for pending launches — confirm.
      experiment.add(xm.Job(
          executable=executable,
          executor=executor,
          args=hyperparameters,
      ))

In [None]:
# Since Docker can't be used in Colab, the image will be built using the CloudBuild API.

# This cell does not enable any APIs itself: make sure the required APIs
# (presumably Cloud Build and Vertex AI, among others) are already enabled for
# the project before running it.
run_async_function(launch_experiment)

## Revoke credentials

In [None]:
# Undo the setup: revoke the user credentials and the Application Default
# Credentials, then clear gcloud's default project.
!gcloud auth revoke
!gcloud auth application-default revoke
!gcloud config unset project