# High-compute workflow orchestration with [*Covalent*](http://covalent.xyz).

Run complex workflows with ease, using cloud, on-prem, and/or local backends.

## Covalent is available from PyPI

```shell
$ pip install covalent
```

## Usage

👉 Add one or more [@ct.electron](https://docs.covalent.xyz/docs/user-documentation/concepts/covalent-basics#electron) decorators to designate workflow tasks (i.e. *electrons*).

👉 Specify [executors](https://docs.covalent.xyz/docs/plugin) to choose the backend each electron runs on.


# 🔗 See links below for more information!

- 📚 [Covalent Documentation](https://docs.covalent.xyz)
- 🌟 [Covalent GitHub](https://github.com/AgnostiqHQ/covalent)
- ✍️ [Covalent Blogs](https://www.covalent.xyz/blog/)
    - [Simplifying generative AI workflows with Covalent and Streamlit](https://blog.streamlit.io/simplifying-generative-ai-workflows/)
    - [Improving Chest X-ray Pneumonia Detection with Federated Learning and Covalent](https://medium.com/@filip_98594/improving-chest-x-ray-pneumonia-detection-with-federated-learning-and-covalent-ff60eef7946c)



---

# Setup

## Environment

Run the following commands to create and activate the environment:
```shell
$ conda env create -f "environment.yml"
$ conda activate covalent_pydata_2023
```

## Infrastructure

See `terraform/tutorial-dask/README.md` for instructions on how to create the infrastructure.

## Start Covalent

Run the following command to start Covalent:

```shell
$ covalent start

Covalent server has started at http://localhost:48008
```

---

# TUTORIAL: Automatic Image Captioning

## Imports

```python
import os
from dataclasses import dataclass
from datetime import datetime
from typing import List

import covalent as ct
import torch
from PIL import Image
from transformers import BlipForConditionalGeneration, BlipProcessor

# Address of the Dask scheduler created in the Infrastructure step.
# Replace it with the address of your own scheduler.
DASK_SCHEDULER_ADDRESS = "52.87.169.192:8786"
```

## Constants

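```python
# Template for one catalog row: timestamp | image name (padded to 30 chars) | caption
ROW_BLANK = "{datetime} | {image_name:<30} | {description}\n"

# Directory watched by the trigger; new images are dropped here.
TRIGGER_DIR = os.path.abspath("./images-triggered")
CATALOG = os.path.join(TRIGGER_DIR, "catalog.txt")

# Create the directory (if missing) and an empty catalog on first run.
os.makedirs(TRIGGER_DIR, exist_ok=True)
if not os.path.exists(CATALOG):
    with open(CATALOG, "w", encoding="utf-8") as f:
        f.write('')
```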
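To make the catalog format concrete, here is a small sketch of one row produced by `ROW_BLANK` and how the image name is read back from it. The helper `parse_image_name` and the sample values are hypothetical, for illustration only; the template is copied from above so the snippet runs on its own.

```python
from datetime import datetime

# Same template as ROW_BLANK above, copied so this sketch is self-contained.
ROW_BLANK = "{datetime} | {image_name:<30} | {description}\n"


def parse_image_name(row: str) -> str:
    """Return the image-name column of a catalog row (hypothetical helper)."""
    # Column 0 is the timestamp, column 1 the space-padded image name.
    return row.split("|")[1].strip()


row = ROW_BLANK.format(
    datetime=datetime(2023, 1, 1),
    image_name="cat.jpg",
    description="a placeholder caption",
)
print(row, end="")
print(parse_image_name(row))  # cat.jpg
```

Because the columns are separated by `|`, this simple parsing assumes image names never contain that character.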

## Utilities

```python
def get_model(type_: type, id_: str, label=None, **params):
    """Download files for the model if necessary and return the model."""

    # Derive a local folder name from the Hub ID: "org/name" -> "org_name[_label]".
    model_id = id_.replace('/', '_')
    if label:
        model_id += f"_{label}"

    # Load from the local copy if it already exists...
    model_path = os.path.abspath(f"./{model_id}")
    if os.path.exists(model_path):
        return type_.from_pretrained(model_path, **params)

    # ...otherwise download it and save a local copy for next time.
    model = type_.from_pretrained(id_, **params)
    model.save_pretrained(model_path)
    return model
```
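The caching in `get_model` hinges on how the local folder name is derived. The sketch below isolates that step in a hypothetical helper, `local_model_dir`, so you can see which folders the BLIP processor and model end up in. Since `get_model` is called inside `get_descriptions`, these folders are created relative to the working directory of the Dask worker, not your local machine.

```python
import os
from typing import Optional


def local_model_dir(id_: str, label: Optional[str] = None) -> str:
    """Mirror get_model's folder naming (hypothetical helper, no downloads)."""
    model_id = id_.replace("/", "_")  # "/" is not allowed in a folder name
    if label:
        model_id += f"_{label}"  # keeps processor and model files apart
    return os.path.abspath(f"./{model_id}")


for label in ("processor", "model"):
    print(os.path.basename(local_model_dir("Salesforce/blip-image-captioning-base", label)))
# Salesforce_blip-image-captioning-base_processor
# Salesforce_blip-image-captioning-base_model
```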

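```python
from typing import Optional


@dataclass
class InputImage:
    file: str
    description: str = ''
    image: Optional[Image.Image] = None

    def __post_init__(self):
        # Resolve the file name relative to TRIGGER_DIR and open the image.
        self.file = os.path.abspath(os.path.join(TRIGGER_DIR, self.file))
        self.image = self.image or Image.open(self.file)


def only_image(p: str) -> bool:
    """Return True for supported image extensions (case-insensitive)."""
    ext = os.path.splitext(p)[1].lower()
    return ext in {".jpg", ".jpeg", ".png"}


def new_image(input_image: InputImage) -> bool:
    """Return True if the image is not yet listed in the catalog."""
    name = os.path.basename(input_image.file)
    with open(CATALOG, "r", encoding="utf-8") as f:
        for line in f:
            columns = line.split('|')
            # Column 1 holds the padded image name; skip blank or malformed lines.
            if len(columns) > 1 and columns[1].strip() == name:
                return False
    return True
```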
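`new_image` reopens and rescans `CATALOG` for every image it checks. Assuming the catalog is small enough to fit in memory, an alternative is to read the recorded names once into a set and use exact-match lookups. `catalog_names` is a hypothetical helper, demonstrated here against a throwaway file.

```python
import os
import tempfile
from typing import Set


def catalog_names(catalog_path: str) -> Set[str]:
    """Return the image names already recorded in a catalog file (hypothetical helper)."""
    names = set()
    with open(catalog_path, "r", encoding="utf-8") as f:
        for line in f:
            columns = line.split("|")
            if len(columns) > 1:  # skip blank or malformed lines
                names.add(columns[1].strip())
    return names


# Usage with a temporary catalog containing one row and one blank line.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "catalog.txt")
    with open(path, "w", encoding="utf-8") as f:
        f.write("2023-01-01 00:00:00 | cat.jpg                        | a cat\n")
        f.write("\n")
    seen = catalog_names(path)

print(seen)               # {'cat.jpg'}
print("dog.png" in seen)  # False
```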

# Now, let's define a workflow!

## Electrons

### 1. Read image files (local)

```python
@ct.electron(executor="local")
def read_new_images() -> List[InputImage]:
    """Read new images from the trigger directory."""

    images = []
    for image_file in filter(only_image, os.listdir(TRIGGER_DIR)):
        images.append(InputImage(image_file))

    return list(filter(new_image, images))
```
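`read_new_images` constructs an `InputImage` (and therefore opens the file with PIL) for every image in the directory before `new_image` discards the ones already catalogued. One possible refinement, sketched below under that assumption, is to filter on file names first and only open the survivors. `list_new_image_files` is a hypothetical helper; `seen` would be the set of catalogued names.

```python
import os
import tempfile
from typing import List, Set

IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png"}


def list_new_image_files(directory: str, seen: Set[str]) -> List[str]:
    """Return sorted image file names in `directory` not present in `seen`."""
    new_files = []
    for name in os.listdir(directory):
        ext = os.path.splitext(name)[1].lower()  # accept ".JPG" as well as ".jpg"
        if ext in IMAGE_EXTENSIONS and name not in seen:
            new_files.append(name)
    return sorted(new_files)


# Usage: "b.png" is already catalogued and "catalog.txt" is not an image.
with tempfile.TemporaryDirectory() as tmp:
    for name in ["a.JPG", "b.png", "c.png", "catalog.txt"]:
        open(os.path.join(tmp, name), "w").close()
    result = list_new_image_files(tmp, seen={"b.png"})

print(result)  # ['a.JPG', 'c.png']
```

Only the returned names would then be wrapped in `InputImage`, so already-described images are never loaded.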

### 2. Get image descriptions (Dask cluster on AWS)

```python
# Executor that runs electrons on the remote Dask cluster.
aws_dask_exec = ct.executor.DaskExecutor(DASK_SCHEDULER_ADDRESS)

# Packages installed in the worker environment before the electron runs.
deps_pip = [
    "pillow==10.1.0",
    "torch==2.1.0",
    "torchvision==0.16.0",
    "transformers==4.33.1",
]

@ct.electron(executor=aws_dask_exec, deps_pip=deps_pip)
def get_descriptions(input_images: List[InputImage]) -> List[InputImage]:
    """Use BLIP to obtain a short description of each image."""

    model_id = "Salesforce/blip-image-captioning-base"
    processor = get_model(BlipProcessor, model_id, label="processor")
    model = get_model(BlipForConditionalGeneration, model_id, label="model")

    # Use a GPU if the worker has one.
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = model.to(device)

    descriptions = []
    for input_image in input_images:
        inputs = processor(input_image.image, return_tensors="pt").to(device)
        out = model.generate(**inputs)
        input_image.description = processor.decode(out[0], skip_special_tokens=True)
        descriptions.append(input_image)

    return descriptions
```
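`get_descriptions` captions one image per `generate` call. BLIP's processor also accepts a list of images, so one option (worth benchmarking on your workers before adopting) is to caption in fixed-size batches: pass each chunk to `processor(images=chunk, return_tensors="pt")` and decode the outputs with `processor.batch_decode(out, skip_special_tokens=True)`. The sketch shows only the grouping step, using a hypothetical `batched` helper in plain Python.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` items (hypothetical helper)."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(items)
    while True:
        chunk = list(islice(it, size))  # take up to `size` items
        if not chunk:
            return
        yield chunk


print(list(batched(range(5), 2)))  # [[0, 1], [2, 3], [4]]
```

On Python 3.12+, `itertools.batched` provides the same grouping (yielding tuples instead of lists).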

### 3. Record descriptions (local)

```python
@ct.electron(executor="local")
def write_descriptions(described_images: List[InputImage]) -> None:
    """Write descriptions to the catalog file."""

    for input_image in filter(new_image, described_images):
        image_name = os.path.basename(input_image.file)
        with open(CATALOG, "a", encoding="utf-8") as f:
            f.write(
                ROW_BLANK.format(
                    image_name=image_name,
                    datetime=datetime.now(),
                    description=input_image.description,
                )
            )
```
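`write_descriptions` opens the catalog once per image. When many images arrive together, the rows can instead be appended through a single file handle. The sketch below assumes the new rows are already known to be unique; `append_rows` is a hypothetical helper, and the captions are placeholders.

```python
import os
import tempfile
from datetime import datetime
from typing import Iterable, Tuple

# Same template as ROW_BLANK above, copied so this sketch is self-contained.
ROW_BLANK = "{datetime} | {image_name:<30} | {description}\n"


def append_rows(catalog_path: str, rows: Iterable[Tuple[str, str]]) -> int:
    """Append (image_name, description) pairs to a catalog; return rows written."""
    count = 0
    with open(catalog_path, "a", encoding="utf-8") as f:  # open once for all rows
        for image_name, description in rows:
            f.write(ROW_BLANK.format(datetime=datetime.now(),
                                     image_name=image_name,
                                     description=description))
            count += 1
    return count


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "catalog.txt")
    written = append_rows(path, [("a.jpg", "caption one"), ("b.png", "caption two")])
    with open(path, encoding="utf-8") as f:
        lines = f.read().splitlines()

print(written, len(lines))  # 2 2
```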

<a id='Lattice'></a>

## Lattice

#### Definition of the main workflow function

This is a [**triggered** workflow](https://docs.covalent.xyz/docs/features/triggers). Once the cell below is executed, the workflow runs each time a batch of two new files (`batch_size=2`) is created in `TRIGGER_DIR`.

```python
trigger = ct.triggers.DirTrigger(
    dir_path=TRIGGER_DIR,
    event_names=["created"],
    batch_size=2,
    recursive=False
)

@ct.lattice(triggers=[trigger])
def describe_images():
    images = read_new_images()
    described_images = get_descriptions(images)
    write_descriptions(described_images)

ct.dispatch(describe_images)()
```

Output (the dispatch ID):

```
'3fedd6f4-f4f0-411f-9246-c697dca17221'
```
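To picture what `batch_size=2` means, here is a toy model in plain Python, assuming (as Covalent's trigger documentation describes the parameter) that the action fires once per batch of observed events. `BatchingObserver` is a hypothetical class written for illustration; it is not Covalent's `DirTrigger` implementation.

```python
from typing import Callable, List


class BatchingObserver:
    """Toy model: call `action` once every `batch_size` events (not Covalent code)."""

    def __init__(self, batch_size: int, action: Callable[[List[str]], None]):
        self.batch_size = batch_size
        self.action = action
        self._pending: List[str] = []

    def on_event(self, path: str) -> None:
        # Buffer each "created" event until a full batch has accumulated.
        self._pending.append(path)
        if len(self._pending) >= self.batch_size:
            batch, self._pending = self._pending, []
            self.action(batch)


runs: List[List[str]] = []
observer = BatchingObserver(batch_size=2, action=runs.append)
for name in ["a.jpg", "b.jpg", "c.jpg"]:
    observer.on_event(name)

print(runs)  # [['a.jpg', 'b.jpg']]
```

In this model the third file waits for a fourth before anything fires. Because `read_new_images` scans the whole directory rather than just the files in the batch, such a leftover image would still be captioned by whichever run happens next.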