# Zero-Data AI Model Foundry - Covalent Cloud

### Quick Links

- [Source Repo](https://github.com/AgnostiqHQ/tutorials_covalent_pycon_2024)
- [Covalent Cloud](https://www.covalent.xyz/cloud/)
- [Covalent Open-Source](https://github.com/AgnostiqHQ/covalent)
- [Covalent Cloud QuickStart](https://docs.covalent.xyz/docs/cloud/cloud_quickstart)

---

# Setting Up

In [None]:
import json
import os
import random
import shutil
from dataclasses import dataclass
from pathlib import Path
from uuid import uuid4

import covalent as ct
import covalent_cloud as cc
import torch
from covalent_cloud.cloud_executor.models.gpu import GPU_TYPE
from datasets import Dataset, load_from_disk
from peft import LoraConfig
from transformers import (AutoModelForCausalLM, AutoTokenizer,
                          BitsAndBytesConfig, TrainingArguments, pipeline)
from trl import SFTTrainer

## Authenticating with Covalent Cloud

In [None]:
# Read the Covalent Cloud API key from the environment (raises KeyError if the
# variable is not set) and register it with the `covalent_cloud` client.
CC_API_KEY = os.environ["CC_API_KEY"]  # set in `environment.yml` file
cc.save_api_key(CC_API_KEY)

## Create a [cloud volume](https://docs.covalent.xyz/docs/cloud/guides/cloud_storage) for persistent storage

In [None]:
# Shared handle passed to the services and dispatches below via `volume=volume`.
volume = cc.volume("model-storage")  # store fine-tuned models and generated datasets

## Create [runtime environments](https://docs.covalent.xyz/docs/cloud/guides/cloud_custom_environments) for tasks and services

An environment for fine-tuning models:

In [None]:
FT_ENV = "model-fine-tuning"  # assign unique name for referring to this env

# Create the fine-tuning environment with every pip dependency pinned to an
# exact version. NOTE(review): `wait=True` presumably blocks until the
# environment is ready -- confirm against the Covalent Cloud docs.
cc.create_env(
    name=FT_ENV,
    pip=[
        "accelerate==0.29.1",
        "bitsandbytes==0.43.0",
        "datasets==2.18.0",
        "pandas==2.2.1",
        "scipy==1.12.0",
        "sentencepiece==0.2.0",
        "torch==2.2.2",
        "transformers==4.39.3",
        "trl==0.8.1",
        "tqdm==4.66.2",
        "peft==0.10.0",
    ],
    wait=True,
)

Another environment for running the data generator LLM:

In [None]:
# Environment used by the data generator service.
# NOTE(review): unlike the fine-tuning environment, `vllm` is not pinned to a
# version here, so the resolved version may differ between runs.
VLLM_ENV = "vllm"
cc.create_env(name=VLLM_ENV, pip=["vllm"], wait=True)

---

# Service: Data Generator LLM

This service hosts a powerful LLM that generates synthetic data for fine-tuning another model.

<div align="center">
<img src="./assets/data-generator.png" alt="Highlight data generator component" height="700px" />
</div>

In [None]:
# Compute resources for the data generator service: the vLLM environment,
# one A100 GPU, 6 CPUs, 48GB of memory, and a 4-hour time limit.
data_generator_ex = cc.CloudExecutor(
    env=VLLM_ENV,
    num_cpus=6,
    num_gpus=1,
    gpu_type=GPU_TYPE.A100,
    memory="48GB",
    time_limit="4 hours",
)

In [None]:
@cc.service(executor=data_generator_ex, name="LLM Data Generator", volume=volume, auth=False)
def llm_data_generator(model_name="unsloth/llama-3-8b-Instruct"):
    """Initialize the service that hosts the data generator LLM.

    Args:
        model_name: Model identifier handed to vLLM's `LLM`.

    Returns:
        A dict with the vLLM engine under "llm" and the sampling
        configuration under "params".
    """

    # Imported here so that vLLM is only required where the service runs.
    from vllm import LLM, SamplingParams  # NOTE: don't need this installed locally

    engine = LLM(model=model_name, trust_remote_code=True, enforce_eager=True)
    sampling = SamplingParams(temperature=0.7, top_p=0.8, max_tokens=1500)

    return {"llm": engine, "params": sampling}


@llm_data_generator.endpoint("/generate-data")
def generate_data(
    llm, params, task, return_format, num_generations, target_items_per_response,
):
    """Generate data based on task, return format, etc.

    Builds `num_generations` prompts (one per random seed), runs them through
    the LLM as a single batch, and collects the items of every response that
    parses as a JSON array.

    Args:
        llm: Object exposing `generate(prompts, params)`; presumably the
            "llm" entry returned by `llm_data_generator`.
        params: Sampling parameters forwarded to `llm.generate`.
        task: Description of the fine-tuning task, embedded in the user prompt.
        return_format: Item format the model is asked to follow.
        num_generations: Number of prompts (and distinct seeds) in the batch.
        target_items_per_response: Number of items requested per response.

    Returns:
        A flat list of the items from all responses that decoded to a JSON
        array. Responses that are not valid JSON, or that decode to anything
        other than an array, are skipped.
    """

    prompt_template = (
        "<|begin_of_text|><|start_header_id|>system<|end_header_id|>"
        "You are a knowledgeable assistant who generates fine-tuning data for an LLM. "
        "Please generate {target_items_per_response} data items for the fine-tuning task specified by the user.\n"
        "IMPORTANT: Return a JSON array of new items in the format: \"{return_format}\""
        "<|eot_id|>"
        "<|start_header_id|>user<|end_header_id|>{user_prompt}<|eot_id|>"
        "<|start_header_id|>assistant<|end_header_id|>"
    )

    def _format_prompt(seed):
        return prompt_template.format(
            target_items_per_response=target_items_per_response,
            return_format=return_format,
            user_prompt=json.dumps({
                "task": task, "random_seed": seed,
                "constraint": "Respond with ONLY the generated data as a valid JSON array!",
            }),
        )

    # `random.sample` raises ValueError when asked for more unique values than
    # the population holds, so widen the pool for batches larger than 1000.
    seed_pool = range(max(1000, num_generations))
    random_seeds = random.sample(seed_pool, num_generations)
    prompts_batch = [_format_prompt(seed) for seed in random_seeds]

    outputs = llm.generate(prompts_batch, params)
    texts = []
    for output in outputs:
        generated_text = output.outputs[0].text
        try:
            parsed = json.loads(generated_text)
        except ValueError:
            # Not valid JSON (json.JSONDecodeError subclasses ValueError);
            # best effort, drop this response.
            continue

        # Only a JSON array holds a batch of items. Extending with a dict
        # would add its keys, and with a string its individual characters.
        if isinstance(parsed, list):
            texts.extend(parsed)

    return texts

---

# Workflow: Fine Tune & Deploy

This workflow runs model fine-tuning on a powerful GPU and deploys the model as a service.

<div align="center">
<img src="./assets/finetune-workflow.png" alt="Highlight fine-tune and deploy workflow" height="700px" />
</div>

## Training configuration params

This dataclass holds the myriad fine-tuning parameter defaults for the PEFT/LoRA approach.

In [None]:
@dataclass
class FineTuneArguments:
    """Default settings for a quantized PEFT/LoRA fine-tuning run.

    The fields are grouped by the object that consumes them. The
    `training_args`, `lora_config` and `trainer_params` properties assemble
    the matching groups; the quantization fields are read directly by the
    fine-tuning task.
    """

    # BitsAndBytesConfig
    load_in_4bit: bool = True
    bnb_4bit_quant_type: str = "nf4"
    bnb_4bit_compute_dtype: str = "float16"
    bnb_4bit_use_double_quant: bool = False

    # TrainingArguments
    output_dir: str = "./outputs"
    learning_rate: float = 2e-3
    num_train_epochs: int = 5
    save_total_limit: int = 1
    save_strategy: str = "epoch"
    per_device_train_batch_size: int = 2
    gradient_accumulation_steps: int = 1
    optim: str = "paged_adamw_32bit"
    weight_decay: float = 0.001
    fp16: bool = False
    bf16: bool = False
    max_grad_norm: float = 0.3
    max_steps: int = -1
    warmup_ratio: float = 0.03
    group_by_length: bool = True
    lr_scheduler_type: str = "cosine"
    report_to: str = "none"

    # LoraConfig
    lora_alpha: int = 32
    lora_dropout: float = 0.05
    r: int = 32
    bias: str = "none"
    task_type: str = "CAUSAL_LM"

    # SFTTrainer
    dataset_text_field: str = "text"
    max_seq_length: int = 1024
    packing: bool = True
    dataset_batch_size: int = 10

    def _collect(self, names):
        """Return a dict mapping each field name in `names` to its current value."""
        return {name: getattr(self, name) for name in names}

    @property
    def training_args(self):
        """A `TrainingArguments` instance built from the training fields."""
        names = (
            "output_dir",
            "num_train_epochs",
            "per_device_train_batch_size",
            "gradient_accumulation_steps",
            "optim",
            "save_strategy",
            "save_total_limit",
            "learning_rate",
            "weight_decay",
            "fp16",
            "bf16",
            "max_grad_norm",
            "max_steps",
            "warmup_ratio",
            "group_by_length",
            "lr_scheduler_type",
            "report_to",
        )
        return TrainingArguments(**self._collect(names))

    @property
    def lora_config(self):
        """A `LoraConfig` instance built from the LoRA fields."""
        names = ("lora_alpha", "lora_dropout", "r", "bias", "task_type")
        return LoraConfig(**self._collect(names))

    @property
    def trainer_params(self):
        """Keyword arguments for `SFTTrainer`, as a plain dict."""
        names = (
            "dataset_text_field",
            "max_seq_length",
            "packing",
            "dataset_batch_size",
        )
        return self._collect(names)

## Electrons (i.e. workflow tasks)

### Data reader task for visibility

In [None]:
data_reader_ex = cc.CloudExecutor(env=FT_ENV, num_cpus=2, memory="16GB", time_limit="4 hours")

@ct.electron(executor=data_reader_ex)
def print_data(data_path, num_items=50):
    """Print dataset sample into stdout for inspection in Covalent UI.

    Args:
        data_path: Location of a dataset previously written with
            `save_to_disk`; it must contain a "text" column.
        num_items: Maximum number of leading rows to print.
    """

    dataset = load_from_disk(data_path)

    # Slicing a `Dataset` returns a dict of columns ({"text": [...]}), not a
    # list of rows, so take the "text" column from the slice and iterate that.
    sample_texts = dataset[:num_items]["text"]
    for i, text in enumerate(sample_texts, start=1):
        print(f"{i:>4}: {text}\n")

### Fine-tuning task

In [None]:
fine_tune_ex = cc.CloudExecutor(env=FT_ENV, num_cpus=6, num_gpus=1, gpu_type=GPU_TYPE.A100, memory="32GB", time_limit="4 hours")

@ct.electron(executor=fine_tune_ex)
def fine_tune_model_peft(
    model_path, dataset_path, ft_args, model_type, tokenizer_type,
    device_map="auto", model_kwargs=None, model_config=None,
    tokenizer_config=None,
):
    """Run fine-tuning, save the model, and return the path to the saved model.

    Args:
        model_path: Identifier or path passed to `from_pretrained` for both
            the model and the tokenizer. Must be a string; its last
            "/"-separated component names the output directory.
        dataset_path: Directory of a dataset written with `save_to_disk`.
        ft_args: Object providing the quantization fields plus the
            `lora_config`, `training_args` and `trainer_params` attributes
            (e.g. a `FineTuneArguments` instance).
        model_type: Class whose `from_pretrained` loads the model.
        tokenizer_type: Class whose `from_pretrained` loads the tokenizer.
        device_map: Forwarded to `model_type.from_pretrained`.
        model_kwargs: Extra keyword arguments for
            `model_type.from_pretrained`; defaults to `{"do_sample": True}`.
        model_config: Attributes set on `model.config` after loading;
            defaults to `{"use_cache": False, "pretraining_tp": 1}`.
        tokenizer_config: Attributes set on the tokenizer. When empty or
            None, the pad token is set to the EOS token with right padding.

    Returns:
        The path inside the cloud volume where the fine-tuned model and its
        tokenizer were saved.
    """

    model_kwargs = model_kwargs or {"do_sample": True}
    model_config = model_config or {"use_cache": False, "pretraining_tp": 1}

    # Load dataset from a copy under /tmp. `shutil.copytree` raises if the
    # destination already exists, so remove any copy left by an earlier run
    # (same handling as in `finetuned_llm_service`).
    local_dataset_path = Path("/tmp") / Path(dataset_path).name
    if local_dataset_path.exists():
        shutil.rmtree(local_dataset_path)
    shutil.copytree(dataset_path, local_dataset_path)
    dataset = load_from_disk(local_dataset_path, keep_in_memory=True)

    # Quantization configuration
    quant_config = BitsAndBytesConfig(
        load_in_4bit=ft_args.load_in_4bit,
        bnb_4bit_quant_type=ft_args.bnb_4bit_quant_type,
        # The dtype is stored as a string (e.g. "float16"); resolve it on torch.
        bnb_4bit_compute_dtype=getattr(torch, ft_args.bnb_4bit_compute_dtype),
        bnb_4bit_use_double_quant=ft_args.bnb_4bit_use_double_quant,
    )

    # Load and configure the downloaded model from pretrained
    model = model_type.from_pretrained(
        model_path,
        quantization_config=quant_config,
        device_map=device_map,
        **model_kwargs,
    )
    for k, v in model_config.items():
        setattr(model.config, k, v)

    # Load and configure the tokenizer
    tokenizer = tokenizer_type.from_pretrained(model_path, trust_remote_code=True)
    if not tokenizer_config:
        tokenizer.pad_token = tokenizer.eos_token
        tokenizer.padding_side = "right"
    else:
        for k, v in tokenizer_config.items():
            setattr(tokenizer, k, v)

    # Set up supervised fine-tuning trainer
    trainer = SFTTrainer(
        model=model,
        train_dataset=dataset,
        peft_config=ft_args.lora_config,
        tokenizer=tokenizer,
        args=ft_args.training_args,
        **ft_args.trainer_params,
    )

    # Run training
    trainer.train()

    # Save trained model under a unique name in the volume
    new_model_path = volume / (model_path.split("/")[-1] + f"_{uuid4()}")
    trainer.model.save_pretrained(new_model_path)
    trainer.tokenizer.save_pretrained(new_model_path)

    return new_model_path

## Workflow that combines tasks

In [None]:
cpu_ex = cc.CloudExecutor(env=FT_ENV, num_cpus=12, memory="12GB", time_limit="4 hours")

@ct.lattice(executor=cpu_ex, workflow_executor=cpu_ex)
def finetune_workflow(
    model_id, data_path, llm_service,
    ft_args=None, device_map="auto", model_kwargs=None, ft_kwargs=None,
):
    """Run fine tuning, then deploy the fine tuned model.

    Args:
        model_id: Base model identifier passed to the fine-tuning task.
        data_path: Path of the saved fine-tuning dataset.
        llm_service: Callable invoked with the fine-tuned model path, the
            model class, the tokenizer class and `device_map` to deploy the
            model (e.g. `finetuned_llm_service`).
        ft_args: Fine-tuning settings; defaults to `FineTuneArguments()`.
        device_map: Forwarded to both the fine-tuning task and the service.
        model_kwargs: Keyword arguments for loading the base model. Forwarded
            to the fine-tuning task as its `model_kwargs` unless `ft_kwargs`
            already supplies that key.
        ft_kwargs: Additional keyword arguments for `fine_tune_model_peft`.

    Returns:
        Whatever `llm_service` returns for the deployed model.
    """

    # Copy so the caller's dict is never mutated.
    ft_kwargs = dict(ft_kwargs or {})
    ft_args = ft_args or FineTuneArguments()

    # `model_kwargs` was previously accepted but never used. Pass it through,
    # letting an explicit `ft_kwargs["model_kwargs"]` take precedence.
    if model_kwargs:
        ft_kwargs.setdefault("model_kwargs", model_kwargs)

    ft_model_path = fine_tune_model_peft(
        model_id, data_path, ft_args, AutoModelForCausalLM, AutoTokenizer, device_map, **ft_kwargs
    )
    service_info = llm_service(ft_model_path, AutoModelForCausalLM, AutoTokenizer, device_map)

    return service_info

---

# Service: Fine-tuned Model

<div align="center">
<img src="./assets/finetune-service.png" alt="Highlight fine-tuned model component" height="700px" />
</div>

In [None]:
ft_service_ex = cc.CloudExecutor(
    env=FT_ENV,
    num_cpus=25,
    num_gpus=1,
    gpu_type=GPU_TYPE.L40,
    memory="48GB",
    time_limit="10 days",
)

@cc.service(executor=ft_service_ex, volume=volume, name="Custom Fine-Tuned Model")
def finetuned_llm_service(
    ft_model_path,
    model_type=AutoModelForCausalLM, tokenizer_type=AutoTokenizer,
    device_map="auto", model_config=None, tokenizer_config=None,
    model_kwargs=None, pipeline_task="text-generation",
):
    """Load a fine-tuned model from the volume and expose it for text generation.

    Args:
        ft_model_path: Directory holding the saved model and tokenizer.
        model_type: Class whose `from_pretrained` loads the model.
        tokenizer_type: Class whose `from_pretrained` loads the tokenizer.
        device_map: Forwarded to `model_type.from_pretrained`.
        model_config: Optional attributes to set on `model.config`.
        tokenizer_config: Optional attributes to set on the tokenizer. When
            empty or None, the pad token is set to the EOS token with right
            padding.
        model_kwargs: Accepted for interface compatibility; not used when
            loading the model.
        pipeline_task: Task name passed to `pipeline`.

    Returns:
        A dict with the pipeline under "pipe", the model under "model" and
        the tokenizer under "tokenizer".
    """

    # Work from a fresh copy under /tmp, replacing any earlier copy.
    local_copy = Path("/tmp") / Path(ft_model_path).name
    if local_copy.exists():
        shutil.rmtree(local_copy)
    shutil.copytree(ft_model_path, local_copy)

    # Saved model, plus any config overrides
    model = model_type.from_pretrained(local_copy, device_map=device_map)
    for attr, value in (model_config or {}).items():
        setattr(model.config, attr, value)

    # Tokenizer: apply the overrides if given, otherwise the padding defaults
    tokenizer = tokenizer_type.from_pretrained(local_copy)
    if tokenizer_config:
        for attr, value in tokenizer_config.items():
            setattr(tokenizer, attr, value)
    else:
        tokenizer.pad_token = tokenizer.eos_token
        tokenizer.padding_side = "right"

    pipe = pipeline(pipeline_task, model=model, tokenizer=tokenizer)

    return dict(pipe=pipe, model=model, tokenizer=tokenizer)


@finetuned_llm_service.endpoint("/generate")
def generate_text(pipe, prompt, max_length=100):
    """Return the text the fine-tuned model generates for `prompt`.

    Args:
        pipe: Callable text-generation pipeline.
        prompt: Input text.
        max_length: Passed to the pipeline as `max_length`.

    Returns:
        The "generated_text" entry of the first (and only requested) sequence.
    """

    generations = pipe(
        prompt,
        truncation=True,
        max_length=max_length,
        num_return_sequences=1,
    )
    first_sequence = generations[0]
    return first_sequence["generated_text"]


@finetuned_llm_service.endpoint("/stream", streaming=True)
def generate_stream(model, tokenizer, prompt, prepend_prompt=False, max_tokens=100):
    """Prompt Llama-like model to stream generated text.

    Generates one token per iteration, yielding each decoded token as soon as
    it is produced and feeding the extended sequence back in as the next input.

    Args:
        model: Model exposing `generate` and a `device` attribute.
        tokenizer: Tokenizer matching the model.
        prompt: Input text.
        prepend_prompt: When True, yield the prompt itself before any tokens.
        max_tokens: Maximum number of tokens to generate.

    Yields:
        The prompt (optionally), then each generated token as text. Stops
        early when the EOS token is produced.
    """

    def _starts_with_space(_tokenizer, _token_id):
        # '▁' is the marker checked for on the raw token; decoding a single
        # token drops it, so the space is re-added by the caller.
        token = _tokenizer.convert_ids_to_tokens(_token_id)
        return token.startswith('▁')

    # Follow the model's own device instead of assuming "cuda", so the
    # endpoint also works when the service was started with another
    # `device_map`.
    device = model.device

    _input = tokenizer(prompt, return_tensors='pt')
    _input = _input.to(device)

    if prepend_prompt:
        yield prompt

    for output_length in range(max_tokens):
        output = model.generate(**_input, max_new_tokens=1)
        current_token_id = output[0][-1]
        if current_token_id == tokenizer.eos_token_id:
            break

        current_token = tokenizer.decode(
            current_token_id, skip_special_tokens=True
        )
        # NOTE(review): `> 1` suppresses the leading space for the first TWO
        # generated tokens; `> 0` may have been intended -- confirm.
        if _starts_with_space(tokenizer, current_token_id.item()) and output_length > 1:
            current_token = ' ' + current_token
        yield current_token

        # The full sequence so far becomes the next input; every position is
        # attended to.
        _input = {
            'input_ids': output.to(device),
            'attention_mask': torch.ones(1, len(output[0]), device=device),
        }

---

# Service: Main Agent

<div align="center">
<img src="./assets/main-agent.png" alt="Highlight main agent component" height="700px" />
</div>

In [None]:
agent_ex = cc.CloudExecutor(
    env=FT_ENV,
    num_cpus=12,
    memory="12GB",
    time_limit="4 hours",
)

@cc.service(executor=agent_ex, name="Fine Tuner Agent", auth=False, volume=volume)
def agent(lattice, llm_api):
    """Initialize the agent service by storing its two inputs.

    Args:
        lattice: The fine-tuning workflow to dispatch later.
        llm_api: Handle to the data generator service.

    Returns:
        A dict holding the inputs under "finetune_lattice" and "llm_api".
    """

    state = dict(finetune_lattice=lattice, llm_api=llm_api)
    return state


@agent.endpoint("/submit", streaming=True)
def submit(
    finetune_lattice, llm_api,
    *,
    task="Generate synthetic movie reviews that either contain or spoiler or don't.",
    data_format="[item] ## [label]",
    num_generations=5,
    target_items_per_response=10,
    min_new_examples=2000,
    model_to_finetune="TinyLlama/TinyLlama-1.1B-Chat-v1.0",
):
    """Receives a task description, generates fine-tuning data,
    and dispatches the fine-tuning + deployment workflow.

    Args:
        finetune_lattice: Workflow dispatched once the dataset is saved.
        llm_api: Data generator handle exposing `generate_data`.
        task: Description of the fine-tuning task sent to the generator.
        data_format: Item format the generator is asked to follow.
        num_generations: Number of prompts per `generate_data` call.
        target_items_per_response: Items requested per generated response.
        min_new_examples: Keep generating until at least this many examples
            have been collected.
        model_to_finetune: Base model identifier passed to the workflow.

    Yields:
        Progress messages: one "." per generator call, the dataset location,
        and finally the dispatch ID of the fine-tuning workflow.
    """

    yield "Generating fine-tuning data "

    # NOTE(review): this loop only ends once enough examples are collected;
    # if the generator keeps returning nothing it never terminates.
    new_examples = []
    while len(new_examples) < min_new_examples:

        texts = llm_api.generate_data(
            task=task,
            return_format=data_format,
            num_generations=num_generations,
            target_items_per_response=target_items_per_response,
        )
        new_examples.extend(texts)

        yield "."

    yield f"\nGenerated {len(new_examples)} total examples.\n"

    # Save the examples as a single-column ("text") dataset in the volume.
    dataset_save_path = volume / f"data_{len(new_examples)}-{uuid4()}"
    yield f"Saving dataset at {dataset_save_path!s}\n"
    dataset = Dataset.from_dict({"text": new_examples})
    dataset_save_path.mkdir(parents=True, exist_ok=True)
    dataset.save_to_disk(dataset_save_path)

    # Register the API key before dispatching from inside the service.
    cc.save_api_key(CC_API_KEY)

    yield "Dispatching fine-tuning workflow ...\n"
    dispatch_id = cc.dispatch(finetune_lattice, volume=volume)(
        model_to_finetune, str(dataset_save_path), finetuned_llm_service
    )
    yield f"\nDispatch ID:\n{dispatch_id}\n"

---

# Workflow: Setting up The Zero-Data Foundry

In [None]:
setup_executor = cc.CloudExecutor(
    env=FT_ENV,
    num_cpus=12,
    memory="12GB",
    time_limit="4 hours",
)

@ct.lattice(executor=setup_executor, workflow_executor=setup_executor)
def setup_workflow(finetune_lattice, data_generator_model="unsloth/llama-3-8b-Instruct"):
    """Start the data generator service, then the agent service that uses it.

    Args:
        finetune_lattice: Fine-tuning workflow handed to the agent.
        data_generator_model: Model name for the data generator service.

    Returns:
        The result of calling `agent` with the workflow and the data
        generator handle.
    """

    generator_service = llm_data_generator(data_generator_model)
    return agent(finetune_lattice, generator_service)

In [None]:
# Dispatch the setup workflow, passing the fine-tuning lattice as its input.
dispatch_id = cc.dispatch(setup_workflow, volume=volume)(
    finetune_lattice=finetune_workflow
)

print("Dispatch ID: ", dispatch_id)

# Fetch the workflow result, load it, and keep its value; the cells below
# call `agent_creator.submit(...)` on it.
res = cc.get_result(dispatch_id, wait=True)
res.result.load()
agent_creator = res.result.value

print("Function ID: ", agent_creator.function_id)

---

# Invoking the Agent API

## Example 1: Spoiler Detector

In [None]:
# Submit the task to the agent, asking for at least 9000 examples, and print
# each streamed chunk after decoding it to text.
task = "Fine-tuning an LLM to detect whether or not a movie review contains a spoiler."
for k in agent_creator.submit(task=task, min_new_examples=9000):
    print(k.decode(), end="")

In [None]:
# Fetch the result of a fine-tuning dispatch by its ID and use the result
# value as the client for the deployed model.
# NOTE(review): the ID is hard-coded, presumably from the "Dispatch ID"
# printed by the submit cell -- replace it with the ID from your own run.
res = cc.get_result("9c6a462d-26b3-47d5-8134-10ea79ebc9bc", wait=True)
res.result.load()
spoiler_client = res.result.value
print(spoiler_client)

In [None]:
# The prompt ends with "##", the separator of the default "[item] ## [label]"
# data format. This call shows the full generated text (no post-processing).
spoiler_client.generate(
    prompt="The Dark Knight is the most beloved film in Nolan's Batman trilogy ##"
)
# No spoiler.

In [None]:
# Keep only the text after the last "##" and strip surrounding whitespace.
spoiler_client.generate(prompt="I loved Twilight! I watch it three times a year. Would recommend this movie to people who enjoy staying indoors. ##").split("##")[-1].strip()
# No spoiler.

In [None]:
# Keep only the text after the last "##" and strip surrounding whitespace.
spoiler_client.generate(prompt="Soylent green is people! ##").split("##")[-1].strip()
# NOTE: Has a spoiler!

In [None]:
# Keep only the text after the last "##" and strip surrounding whitespace.
spoiler_client.generate(prompt="The Planet of The Apes was Earth all along. ##").split("##")[-1].strip()
# NOTE: Has a spoiler!

## Example 2: Grammar Corrector

In [None]:
# Submit a second task with a custom item format (user sentence followed by
# the assistant's corrected sentence); all other options keep their defaults.
task = "Examples of bot responses that correct grammatical errors."
data_format = '"<|user|>{input_sentence}</s><|assistant|>{corrected_sentence}"'
for k in agent_creator.submit(task=task, data_format=data_format):
    print(k.decode(), end="")

In [None]:
# Fetch the result of a fine-tuning dispatch by its ID and use the result
# value as the client for the deployed model.
# NOTE(review): the ID is hard-coded, presumably from the "Dispatch ID"
# printed by the submit cell -- replace it with the ID from your own run.
res = cc.get_result("a822ca62-2a14-4288-ab38-77690c03729b", wait=True)
res.result.load()
grammar_client = res.result.value
print(grammar_client)

In [None]:
prompt = "<|user|>{}</s><|assistant|>"
def correct_grammar(sentence):
    """Print the grammar model's reply for `sentence`.

    The sentence is placed in the user slot of the prompt template, and only
    the text after the last "<|assistant|>" marker is printed, stripped of
    surrounding whitespace.
    """
    filled_prompt = prompt.format(sentence)
    generated = grammar_client.generate(prompt=filled_prompt)
    answer = generated.split("<|assistant|>")[-1]
    print(answer.strip())

# Sample sentences with grammatical errors; each trailing comment gives the
# author's expected correction.
correct_grammar("I should of never got a pet.")  # should've
correct_grammar("Jerry and me argued about it.")  # Jerry and I
correct_grammar("He said, 'No cat bites it's own tail.'")  # its
correct_grammar("But if I had to choose, Id rather get a dog then a cat.")  # I'd, than
correct_grammar("All dogs bite they're own tails.")  # dogs, their, tails

## Example 3: Emoji translation

In [None]:
# Submit a third task; items use a "|" separator between the sentence and
# its emoji string.
task = "Fine-tuning an LLM to translate a sentence without any emojis into a string of only emojis with roughly the same meaning."
for k in agent_creator.submit(task=task, data_format="[sentence] | [matching emoji string]"):
    print(k.decode(), end="")

In [None]:
# Fetch the result of a fine-tuning dispatch by its ID and use the result
# value as the client for the deployed model.
# NOTE(review): the ID is hard-coded, presumably from the "Dispatch ID"
# printed by the submit cell -- replace it with the ID from your own run.
res = cc.get_result("bea6dcb8-2afd-4f9a-8390-595cf78522cc", wait=True)
res.result.load()
emoji_client = res.result.value
print(emoji_client)

In [None]:
# The prompt ends with the "|" separator from the emoji data format.
emoji_client.generate(prompt="Dancing with my cat | ")

In [None]:
# The prompt ends with the "|" separator from the emoji data format.
emoji_client.generate(prompt="Got a brand new car | ")

In [None]:
# The prompt ends with the "|" separator from the emoji data format.
emoji_client.generate(prompt="Paint me a scenic mountain, Mr. Ross | ")

In [None]:
# The prompt ends with the "|" separator from the emoji data format.
emoji_client.generate(prompt="Let's eat! | ")