# Stable Diffusion |  MLFlow Driven Parallelism within ADSP

## Overview

This solution uses the [KerasCV Stable Diffusion model](https://keras.io/api/keras_cv/models/stable_diffusion/) to generate imagery from textual prompts.


## Workflow

Images are generated in batches. When executed locally, the batches are processed serially; when running within ADSP, they are processed in parallel.

Parallel execution occurs within ADSP `run-once` jobs associated with the project. The scheduler blocks until all jobs have completed.
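
The difference between the two modes can be illustrated with the standard library alone. This is only a stand-in sketch: it uses a thread pool in place of ADSP jobs, and `run_batches` and `fake_worker` are hypothetical names that are not part of `mlflow_adsp`.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def run_batches(batches: List[int], worker: Callable[[int], int], backend: str = "local") -> List[int]:
    """Run `worker` once per batch and return the results in submission order."""
    if backend == "local":
        # Serial: each batch finishes before the next one starts.
        return [worker(batch) for batch in batches]
    # Parallel stand-in: submit every batch, then block until all results are in.
    with ThreadPoolExecutor(max_workers=len(batches) or 1) as pool:
        return list(pool.map(worker, batches))


def fake_worker(batch_size: int) -> int:
    # Hypothetical worker: pretends to generate `batch_size` images.
    return batch_size


print(run_batches([1, 1, 1], fake_worker, backend="local"))
print(run_batches([1, 1, 1], fake_worker, backend="adsp"))
```

In both modes the call returns only after every batch has finished, which mirrors the blocking behaviour described above.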

### Workflow Diagram
![Workflow Overview](../assets/workflow-overview.png)

## Setup
1. Download the example.
2. Ensure the variable `AE_MLFLOW_EXPERIMENT_NAME` within the `anaconda-project.yml` is updated appropriately.
3. Upload the project to ADSP.  If you're already here, great!  Just skip this and keep going. :)
    > ae5 project upload .
4. Ensure the following AE5 secrets are defined, or uncomment them and set their values in the `anaconda-project.yml` file.
    
    | Variable              |
    |-----------------------|
    | AE5_HOSTNAME          |
    | AE5_USERNAME          |
    | AE5_PASSWORD          |
    | ADSP_WORKER_MAX       |
    | MLFLOW_TRACKING_URI   |
    | MLFLOW_REGISTRY_URI   |
    | MLFLOW_TRACKING_TOKEN |

5. Start a project session and allow conda to complete dependency installation. 
   1. Perform the one-time ADSP account setup for Keras (see below) if this has not yet been completed.
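
Before starting the workflow, it can help to confirm that the variables from step 4 are visible to the session. The sketch below assumes they are exposed as environment variables (for example after the secrets are loaded or when set in `anaconda-project.yml`); `missing_variables` is a hypothetical helper, not part of any ADSP library.

```python
import os
from typing import List, Mapping

# Variable names taken from the table in step 4.
REQUIRED_VARIABLES = (
    "AE5_HOSTNAME",
    "AE5_USERNAME",
    "AE5_PASSWORD",
    "ADSP_WORKER_MAX",
    "MLFLOW_TRACKING_URI",
    "MLFLOW_REGISTRY_URI",
    "MLFLOW_TRACKING_TOKEN",
)


def missing_variables(env: Mapping[str, str]) -> List[str]:
    """Return the required names that are absent or empty in `env`."""
    return [name for name in REQUIRED_VARIABLES if not env.get(name)]


missing = missing_variables(os.environ)
if missing:
    print(f"Missing variables: {', '.join(missing)}")
else:
    print("All required variables are set.")
```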


### Keras Setup for parallel processing within ADSP

Because this setup changes your user account environment, it must be performed from an interactive session. It needs to be done only once per account, and it affects all projects the user launches.

#### Account Level One Time Setup

Within `/opt/continuum`, create a symbolic link named `.keras` that points to `user/home/.keras`:

> cd /opt/continuum && ln -s user/home/.keras .keras

This allows Keras to download and cache models, checkpoints, datasets, etc. and share them between all instances.
If this step is not completed, Keras will re-download these external resources each time it executes in a new session or job.
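
The same link can be created from Python. This is a minimal sketch, not part of the project: `ensure_keras_link` is a hypothetical helper, and creating the target directory up front is an added assumption so the link never dangles. The demonstration runs in a temporary directory; on ADSP the root would be `/opt/continuum`.

```python
import tempfile
from pathlib import Path


def ensure_keras_link(root: Path, target: Path = Path("user/home/.keras")) -> Path:
    """Create `root/.keras` as a symlink to `target` unless it already exists."""
    link = root / ".keras"
    if link.is_symlink() or link.exists():
        # Leave an existing link or directory alone.
        return link
    # Assumption: create the cache directory first so the link resolves.
    (root / target).mkdir(parents=True, exist_ok=True)
    # A relative target is resolved against the link's own directory,
    # matching `ln -s user/home/.keras .keras` run from `root`.
    link.symlink_to(target, target_is_directory=True)
    return link


# Demonstrate in a throwaway directory instead of /opt/continuum.
with tempfile.TemporaryDirectory() as tmp:
    link = ensure_keras_link(Path(tmp))
    print(link.is_symlink())
```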

# Workflow

### Start the workflow

Ensure that:
 1. The AE5 secrets are loaded.
 2. The experiment name is set for reporting.
     1. See the notes in `anaconda-project.yml` on how the MLFlow project name is controlled.

In [None]:
import mlflow

from anaconda.enterprise.server.common.sdk import load_ae5_user_secrets
from mlflow_adsp import upsert_experiment

load_ae5_user_secrets()
mlflow.set_experiment(experiment_id=upsert_experiment())

### Define Our Parameters

In [None]:
# The prompt to use for image generation.
prompt: str = "flower"

In [None]:
from typing import Optional

# The base data directory that requests are stored in.
data_base_dir: str = "data"

# Number of total images to generate.
total_batch_size: int = 5

# Number of images to generate per worker invocation.
per_worker_batch_size: int = 1

# Number of Steps
num_steps: int = 50

# Optional Seed
seed: Optional[float] = None

# Image Width
image_width: int = 192

# Image Height
image_height: int = 192

# The name of the run.
run_name: str = "workflow-step-process-data"

# The backend to use for workers.
backend: str = "adsp"  # adsp | local

### Start A New MLFlow Run

In [None]:
from mlflow import ActiveRun
from mlflow_adsp import create_unique_name

workflow_run: ActiveRun = mlflow.start_run(run_name=create_unique_name(name=run_name))

# The MLFlow Run ID
run_id: str = workflow_run.info.run_id

### Prepare The Processing Job Request

In [None]:
import uuid
from pathlib import Path

# Generate our internal tracking request ID
request_id: str = str(uuid.uuid4())

# Store our prompt to the shared data cache for all the workers to load.
base_path: Path = Path(data_base_dir) / request_id
base_path.mkdir(parents=True, exist_ok=True)
with open(file=(base_path / "prompt.txt").as_posix(), mode="w", encoding="utf-8") as file:
    file.write(prompt)
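
The worker code is not shown in this notebook, so the following is only a sketch of how a worker might read the prompt back from the shared location. It assumes the `data/<request_id>/prompt.txt` layout used above; `write_prompt` and `read_prompt` are hypothetical helpers.

```python
import tempfile
import uuid
from pathlib import Path

PROMPT_FILE = "prompt.txt"


def write_prompt(data_base_dir: str, request_id: str, prompt: str) -> Path:
    """Store the prompt under `<data_base_dir>/<request_id>/prompt.txt`."""
    base_path = Path(data_base_dir) / request_id
    base_path.mkdir(parents=True, exist_ok=True)
    prompt_path = base_path / PROMPT_FILE
    prompt_path.write_text(prompt, encoding="utf-8")
    return prompt_path


def read_prompt(data_base_dir: str, request_id: str) -> str:
    """Load the prompt that was stored for `request_id`."""
    return (Path(data_base_dir) / request_id / PROMPT_FILE).read_text(encoding="utf-8")


# Round trip in a temporary directory standing in for the shared data cache.
with tempfile.TemporaryDirectory() as data_dir:
    request_id = str(uuid.uuid4())
    write_prompt(data_dir, request_id, "flower")
    print(read_prompt(data_dir, request_id))
```

Because every worker receives only `request_id` and `data_base_dir` as parameters, the prompt itself travels through the shared file rather than through the job arguments.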

### Workflow Step 1 - Prepare Worker Environment

Below we launch step 1 `prepare_worker_environment` locally and allow it to build and prepare the worker environment.

This allows the jobs to load the runtime environment quickly when starting and avoids having to rebuild a conda environment prior to execution. The environment is not recreated on subsequent runs.

In [None]:
from mlflow_adsp import Step

step = Step(
    entry_point="prepare_worker_environment",
    parameters={"backend": backend},
    run_name=create_unique_name(name="workflow-step-prepare-worker-environment"),
    synchronous=True,
    backend="local",
)
mlflow.projects.run(**step.dict(by_alias=False))

# Alternatively, a direct mlflow.projects.run invocation would be:
# mlflow.projects.run(
#     uri=".",
#     entry_point="prepare_worker_environment",
#     parameters={"backend": backend},
#     run_name=create_unique_name(name="workflow-step-prepare-worker-environment"),
#     env_manager="local",
# )

### Workflow Step 2 - Batch Processing

Create the batch request

In [None]:
from typing import List
import math

worker_count: int = math.ceil(total_batch_size / per_worker_batch_size)
print(f"Number of jobs needed to complete request: {worker_count}")

steps: List[Step] = []
for _ in range(worker_count):
    step: Step = Step(
        entry_point="process_data",
        parameters={
            "request_id": request_id,
            "data_base_dir": data_base_dir,
            "batch_size": per_worker_batch_size,
            "image_width": image_width,
            "image_height": image_height,
            "num_steps": num_steps,
            "seed": seed,
        },
        run_name=create_unique_name(name="workflow-step-process-data"),
        backend=backend,
        backend_config={"resource_profile": "large"},
        synchronous=(backend == "local"),  # Force serial processing when running locally.
    )
    steps.append(step)
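
Note that every worker above receives the same `batch_size`, so a total that does not divide evenly by `per_worker_batch_size` would generate more images than requested. A possible refinement, sketched here with a hypothetical `plan_batches` helper, sizes the final batch to the remainder:

```python
import math
from typing import List


def plan_batches(total: int, per_worker: int) -> List[int]:
    """Split `total` images into per-worker batch sizes, with the last batch taking the remainder."""
    if total < 0 or per_worker < 1:
        raise ValueError("total must be >= 0 and per_worker must be >= 1")
    worker_count = math.ceil(total / per_worker)
    # Each worker takes a full batch except possibly the last one.
    return [min(per_worker, total - index * per_worker) for index in range(worker_count)]


print(plan_batches(5, 1))  # [1, 1, 1, 1, 1]
print(plan_batches(5, 2))  # [2, 2, 1]
```

With the defaults in this notebook (5 images, 1 per worker) the plan is identical to the loop above; the difference only appears for uneven splits.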

### Submit the batch and wait for completion

The scheduler will block until execution of the batch is complete.

In [None]:
from mlflow_adsp import Job, Scheduler

# submit jobs
print("Launching processing steps ...")
adsp_jobs: List[Job] = Scheduler().process_work_queue(steps=steps)
print("Work complete.")

# End the run
mlflow.end_run()

### Display The Results

In [None]:
import matplotlib.pyplot as plt


def plot_images(images):
    plt.figure(figsize=(20, 20))
    for i, image in enumerate(images):
        # One row, with one column per image.
        plt.subplot(1, len(images), i + 1)
        plt.imshow(image)
        plt.axis("off")

We will look up and download all the PNG files added to the runs in order to build our gallery.

In [None]:
from mlflow import MlflowClient
from PIL import Image

# Review job status
mlflow_client = MlflowClient()

images = []

for job in adsp_jobs:
    print(f"Job ID: {job.id}, Status: {job.last_status}, Number of executions: {len(job.runs)}")

    # If a job failed, it was run more than once.  If it was successful, the last run is the one that succeeded and is the one loaded.
    mlflow_run_id: str = job.runs[-1].mlflow_run_id

    # Get the list of artifacts for the run.
    artifacts = mlflow_client.list_artifacts(mlflow_run_id)

    # We have a few different types but we only want the images for the gallery.
    images_metadata = [artifact for artifact in artifacts if artifact.path.endswith(".png")]

    # Download the image and add it to our gallery
    for metadata in images_metadata:
        artifact_uri = f"runs:/{mlflow_run_id}/{metadata.path}"
        image: Image.Image = mlflow.artifacts.load_image(artifact_uri)
        images.append(image)

Display the gallery

In [None]:
plot_images(images)