## Pre-requisites
- Setup a conda venv such that the python version matches that of the ray cluster conda create -n rayvenv python=3.9.18
- Activate the conda venv using conda activate rayvenv
- Install jupyter using pip install jupyter
- Run the steps in [1_Unstructued_Data_Preparation.ipynb notebook](./1_Unstructued_Data_Preparation.ipynb) to download dataset and push to S3
- Ensure that the following are set as env vars
    - 'AWS_ACCESS_KEY_ID'
    - 'AWS_SECRET_ACCESS_KEY'
    - 'HUGGINGFACE_API_TOKEN'
    - 'OPENAI_API_KEY'

## Environment Setup

In [35]:
%pip install llama-index-llms-openai llama-index-embeddings-openai llama-index-finetuning ray['client'] boto3 -q

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
llama-index-readers-s3 0.1.5 requires s3fs<2025.0.0,>=2024.3.0, but you have s3fs 0.4.2 which is incompatible.[0m[31m
[0mNote: you may need to restart the kernel to use updated packages.


### Init Ray Client

In [None]:
import os
import ray

# os.environ['OPENAI_API_KEY'] = 'sk_...'
os.environ['RAY_ADDRESS'] = 'ray://localhost:10001' # kubectl -n raycluster port-forward svc/raycluster-kuberay-head-svc 10001

ray.shutdown() # precautionary :)
runtime_env = {
    'pip': [
        'llama-index==0.10.27',
        'llama-index-finetuning==0.1.5',
        'boto3==1.34.79',
        'botocore==1.34.79',
        'ipython==8.18.1',
        'pandas==2.2.1',
        'ragas==0.1.7',
        'pypdf2==3.0.1',
        'boto3==1.34.79',
        'langchain==0.1.14',
        'unstructured==0.13.2'
        
    ],
    "env_vars": {
        'AWS_ACCESS_KEY_ID': os.environ['AWS_ACCESS_KEY_ID'],
        'AWS_SECRET_ACCESS_KEY': os.environ['AWS_SECRET_ACCESS_KEY'],
        'HUGGINGFACE_API_TOKEN': os.environ['HUGGINGFACE_API_TOKEN'],
        'OPENAI_API_KEY': os.environ['OPENAI_API_KEY'],
    }

}

ray.init(runtime_env=runtime_env, include_dashboard=True, log_to_driver=False)

2024-05-14 05:25:33,160	INFO worker.py:1432 -- Using address ray://localhost:10001 set in the environment variable RAY_ADDRESS
2024-05-14 05:25:33,163	INFO client_builder.py:244 -- Passing the following kwargs to ray.init() on the server: include_dashboard, log_to_driver


## Create Finetuning Dataset 

### Re-usable helper functions and variables

In [15]:
import boto3
import os

s3 = boto3.client(
        's3',
        endpoint_url='http://localhost:9000',
        aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
        aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY']
)

BUCKET_NAME = 'unstructured-data'

def list_files_in_bucket(bucket_name):
    file_paths = []
    paginator = s3.get_paginator('list_objects_v2')
    page_iterator = paginator.paginate(Bucket=bucket_name)

    for page in page_iterator:
        if "Contents" in page:
            for obj in page['Contents']:
                if obj['Key'].lower().endswith('.pdf'):  # Check if the file is a PDF
                    file_paths.append(obj['Key'])

    return file_paths


def upload_directory_to_minio(bucket_name, directory_path, endpoint_url, access_key, secret_key, file_extension='.pdf'):
    # Create a boto3 session
    session = boto3.session.Session()

    # Create an S3 client configured for MinIO
    s3_client = session.client(
        service_name='s3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        endpoint_url=endpoint_url,
        region_name='us-east-1',  # This can be any string
        config=boto3.session.Config(signature_version='s3v4')
    )

    # Ensure bucket exists (create if not)
    try:
        if s3_client.head_bucket(Bucket=bucket_name):
            print(f"Bucket {bucket_name} already exists.")
    except:
        s3_client.create_bucket(Bucket=bucket_name)
        print(f"Bucket {bucket_name} created.")

    # Upload each PDF in the directory
    for root, dirs, files in os.walk(directory_path):
        for file in files:
            if file.lower().endswith(file_extension):
                file_path = os.path.join(root, file)
                object_name = os.path.relpath(file_path, directory_path).replace("\\", "/")  # Ensure proper path format
                try:
                    s3_client.upload_file(file_path, bucket_name, object_name)
                    print(f"Uploaded {file_path} as {object_name}")
                except Exception as e:
                    print(f"Failed to upload {file_path}: {e}")

In [14]:
list_files_in_bucket(BUCKET_NAME)

['Acute respiratory distress syndrome in an alpaca cria.pdf',
 'Alpaca liveweight variations and fiber production in Mediterranean range of Chile.pdf',
 'Antibody response to the epsilon toxin ofClostridium perfringensfollowing vaccination of Lama glamacrias.pdf',
 'Comparative pigmentation of sheep, goats, and llamas what colors are possible through selection.pdf',
 'Conservative management of a ruptured.pdf',
 'Evaluation of cholesterol and vitamin E concentrations in adult alpacas and nursing crias.pdf',
 'Influence of Follicular Fluid on in Vitro.pdf',
 'Influence of effects on quality traits and relationships between traits of the llama fleece..pdf',
 'Neurological Causes of Diaphragmatic Paralysis in 11 Alpacas.pdf',
 'On the morphology of the cerebellum of the alpaca (Lama pacos)..pdf',
 'Relationships between integumental characteristics and.pdf',
 'Respiratory mechanics and results of cytologic examination of bronchoalveolar lavage fluid in healthy adult alpacas.pdf',
 'Serum 

### Generate QA Pairs in a distributed manner

In [21]:
import os
import boto3
import fitz  # PyMuPDF
import shutil
import time
import random
from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.schema import MetadataMode
from ragas.testset.generator import TestsetGenerator
from llama_index.llms.openai import OpenAI
from llama_index.finetuning import generate_qa_embedding_pairs
from llama_index.core.evaluation import EmbeddingQAFinetuneDataset

@ray.remote
def generate_finetuning_dataset(bucket_name, file_names):

    DIRECTORY_NAME = os.path.join(os.getcwd(), 'data')
    s3_client = boto3.client(
        's3',
        aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
        aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
        endpoint_url='http://minio.minio.svc:9000'
    )
    
    if not os.path.exists(DIRECTORY_NAME):
        os.makedirs(DIRECTORY_NAME)
    
    for file_name in file_names:
        LOCAL_FILE_PATH = os.path.join(DIRECTORY_NAME, file_name)
        
        # Fetch the PDF file from S3
        pdf_file = s3_client.get_object(Bucket=bucket_name, Key=file_name)
        pdf_content = pdf_file['Body'].read()
        
        with open(LOCAL_FILE_PATH, 'wb') as f:
            f.write(pdf_content)

    reader = SimpleDirectoryReader(input_dir=DIRECTORY_NAME)
    documents = reader.load_data()

    parser = SentenceSplitter()
    nodes = parser.get_nodes_from_documents(documents, show_progress=True)

    dataset = generate_qa_embedding_pairs(
        llm=OpenAI(model="gpt-3.5-turbo"),
        nodes=nodes
    )

    return dataset

In [22]:
pdfs_on_s3 = list_files_in_bucket(BUCKET_NAME)
futures = []

# Process up to 3 PDFs at a time
for i in range(0, len(pdfs_on_s3), 3):
    batch = pdfs_on_s3[i:i+3]
    future = generate_finetuning_dataset.remote(BUCKET_NAME, batch)
    futures.append(future)

combined_datasets = ray.get(futures)

### Collating datasets into a single json and checkpointing them in S3 for later

In [32]:
# create dir
finetuning_dir = './data/finetuning'
try:
    os.makedirs(finetuning_dir)
    print(f"Directory '{finetuning_dir}' created successfully")
except FileExistsError:
    print(f"Directory '{finetuning_dir}' already exists")
except Exception as e:
    print(f"An error occurred: {e}")


# loop over datasets to convert to json
for i in range(len(combined_datasets)):
    dataset = combined_datasets[i]
    dataset.save_json(os.path.join(finetuning_dir, str(i)+'.json'))

# combine the jsons
# save as single json file

Directory './data/finetuning' created successfully


In [49]:
import os
import json

def combine_json_files(input_directory, output_file):
    combined_data = {
        "mode": None,
        "queries": {},
        "corpus": {},
        "relevant_docs": {}
    }

    # Iterate through all files in the directory
    for filename in os.listdir(input_directory):
        if filename.endswith('.json'):
            file_path = os.path.join(input_directory, filename)
            try:
                with open(file_path, 'r') as f:
                    data = json.load(f)
                    
                    # Check and set the mode if not already set
                    if combined_data["mode"] is None:
                        combined_data["mode"] = data["mode"]
                    elif combined_data["mode"] != data["mode"]:
                        raise ValueError(f"Mode mismatch: {combined_data['mode']} != {data['mode']} in file {file_path}")

                    # Merge queries
                    combined_data["queries"].update(data["queries"])

                    # Merge corpus
                    combined_data["corpus"].update(data["corpus"])

                    # Merge relevant_docs
                    for key, value in data["relevant_docs"].items():
                        if key in combined_data["relevant_docs"]:
                            combined_data["relevant_docs"][key].extend(value)
                        else:
                            combined_data["relevant_docs"][key] = value
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON from file {file_path}: {e}")
            except Exception as e:
                print(f"An error occurred while reading file {file_path}: {e}")

    # Write combined data to the output file
    try:
        with open(output_file, 'w') as f:
            json.dump(combined_data, f, indent=4)
        print(f"Combined JSON saved to {output_file}")
    except Exception as e:
        print(f"An error occurred while writing to file {output_file}: {e}")

# Example usage
# combine_json_files('path/to/input_directory', 'output_file.json')



combined_dataset_json = 'finetuning_dataset.json'
combine_json_files(finetuning_dir, os.path.join(finetuning_dir, combined_dataset_json))

Combined JSON saved to ./data/finetuning/finetuning_dataset.json


In [50]:
endpoint_url = 'http://localhost:9000'  # Example: 'http://127.0.0.1:9000'
access_key = os.environ['AWS_ACCESS_KEY_ID']
secret_key = os.environ['AWS_SECRET_ACCESS_KEY']

upload_directory_to_minio(BUCKET_NAME, finetuning_dir, endpoint_url, access_key, secret_key, '.json')

Bucket unstructured-data already exists.
Uploaded ./data/finetuning/0.json as 0.json
Uploaded ./data/finetuning/2.json as 2.json
Uploaded ./data/finetuning/4.json as 4.json
Uploaded ./data/finetuning/1.json as 1.json
Uploaded ./data/finetuning/3.json as 3.json
Uploaded ./data/finetuning/finetuning_dataset.json as finetuning_dataset.json
Uploaded ./data/finetuning/.ipynb_checkpoints/0-checkpoint.json as .ipynb_checkpoints/0-checkpoint.json
Uploaded ./data/finetuning/.ipynb_checkpoints/3-checkpoint.json as .ipynb_checkpoints/3-checkpoint.json
Uploaded ./data/finetuning/.ipynb_checkpoints/finetuning_dataset-checkpoint.json as .ipynb_checkpoints/finetuning_dataset-checkpoint.json


![](assets/minio_finetuning_dataset.png)

## Distributed Finetuning

### Downloading SentenceTransformer embedding model from HuggingFace

In [3]:
from sentence_transformers import SentenceTransformer

# Specify the model name
model_name = "all-MiniLM-L6-v2"

# Load the model
model = SentenceTransformer(model_name)

# Save model locally
model.save("./work_dir")

### Simple finetuning without Ray Train

In [6]:
from llama_index.finetuning import SentenceTransformersFinetuneEngine
from llama_index.core.evaluation import EmbeddingQAFinetuneDataset

train_dataset = EmbeddingQAFinetuneDataset.from_json('./data/finetuning/finetuning_dataset.json')
finetune_engine = SentenceTransformersFinetuneEngine(
    train_dataset,
    model_id='./work_dir',
    model_output_path="test_model",
    batch_size=32
)

finetune_engine.finetune()

Epoch:   0%|          | 0/2 [00:00<?, ?it/s]

Iteration:   0%|          | 0/39 [00:00<?, ?it/s]

Iteration:   0%|          | 0/39 [00:00<?, ?it/s]

### Distributed Finetuning with Ray Train

#### Initialise Ray Client (again...we need to mount a work_dir)

In [1]:
import os
import ray


os.environ['RAY_ADDRESS'] = 'ray://localhost:10001' # kubectl -n raycluster port-forward svc/raycluster-kuberay-head-svc 10001
os.environ["RAY_CHDIR_TO_TRIAL_DIR"] = "0"

ray.shutdown() # precautionary :)
runtime_env = {
    'pip': [
        'llama-index==0.10.27',
        'llama-index-finetuning==0.1.5',
        'boto3==1.34.79',
        'botocore==1.34.79',
        'ipython==8.18.1',
        'pandas==2.2.1',
        'ragas==0.1.7',
        'pypdf2==3.0.1',
        'boto3==1.34.79',
        'langchain==0.1.14',
        'unstructured==0.13.2'
        
    ],
    'env_vars': {
        'AWS_ACCESS_KEY_ID': os.environ['AWS_ACCESS_KEY_ID'],
        'AWS_SECRET_ACCESS_KEY': os.environ['AWS_SECRET_ACCESS_KEY'],
        'HUGGINGFACE_API_TOKEN': os.environ['HUGGINGFACE_API_TOKEN'],
        'OPENAI_API_KEY': os.environ['OPENAI_API_KEY'],
    },
    'work_dir': './work_dir'
    

}

ray.init(runtime_env=runtime_env, include_dashboard=True, log_to_driver=False)

2024-05-14 15:54:22,856	INFO worker.py:1432 -- Using address ray://localhost:10001 set in the environment variable RAY_ADDRESS
2024-05-14 15:54:22,886	INFO client_builder.py:244 -- Passing the following kwargs to ray.init() on the server: include_dashboard, log_to_driver


0,1
Python version:,3.9.18
Ray version:,2.10.0
Dashboard:,http://10.10.255.227:8265


#### Helper Functions

In [16]:
import os
import boto3
import numpy as np
from datasets import load_dataset
from transformers import (
    Trainer,
    TrainingArguments,
    AutoTokenizer,
    AutoModelForSequenceClassification,
)

import ray
from ray.train import ScalingConfig
from ray.train.huggingface.transformers import RayTrainReportCallback

def load_my_dataset(data_file, use_all_docs=True):
    with open(data_file, 'r') as f:
        dataset = json.load(f)

    examples = []
    for query_id, query in dataset['queries'].items():
        if use_all_docs:
            for node_id in dataset['relevant_docs'][query_id]:
                text = dataset['corpus'][node_id]
                example = InputExample(texts=[query, text])
                examples.append(example)
        else:
            node_id = dataset['relevant_docs'][query_id][0]
            text = dataset['corpus'][node_id]
            example = InputExample(texts=[query, text])
            examples.append(example)

    return examples

def tokenize_function(examples, tokenizer):
    return tokenizer(examples["text"], padding="max_length", truncation=True)

def upload_to_s3(local_dir, bucket_name, s3_dir):
    s3_client = boto3.client(
        's3',
        aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
        aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
        endpoint_url='http://minio.minio.svc:9000'
    )
    for root, dirs, files in os.walk(local_dir):
        for file in files:
            local_path = os.path.join(root, file)
            relative_path = os.path.relpath(local_path, local_dir)
            s3_path = os.path.join(s3_dir, relative_path)
            s3.upload_file(local_path, bucket_name, s3_path)
            print(f"Uploaded {local_path} to s3://{bucket_name}/{s3_path}")

In [21]:
def train_func():
    # Define the configuration for the training
    config = {
        "batch_size": 32,
        "num_epochs": 1,
        "s3_bucket_name": 'unstructured-data',  # Replace with your S3 bucket name
        "s3_checkpoint_dir": "finetuning_checkpoints"
    }
    
    # Load dataset
    dataset = load_finetuning_dataset()
    tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")

    # Tokenize dataset
    tokenized_train_dataset = dataset["train"].map(
        lambda x: tokenize_function(x, tokenizer), batched=True
    )

    # Load model
    model = AutoModelForSequenceClassification.from_pretrained(
        "sentence-transformers/all-MiniLM-L6-v2", num_labels=2  # Adjust num_labels as needed
    )

    # Define training arguments
    training_args = TrainingArguments(
        output_dir="./results",
        evaluation_strategy="no",  # No evaluation during training
        save_strategy="epoch",
        learning_rate=2e-5,
        per_device_train_batch_size=config["batch_size"],
        per_device_eval_batch_size=config["batch_size"],
        num_train_epochs=config["num_epochs"],
        weight_decay=0.01,
        report_to="none",
    )

    # Create Trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_train_dataset,
    )

    # Add RayTrainReportCallback to the Trainer
    callback = RayTrainReportCallback()
    trainer.add_callback(callback)

    # Prepare Trainer for distributed training
    trainer = ray.train.huggingface.transformers.prepare_trainer(trainer)

    # Start training
    trainer.train()

    # Upload checkpoints to S3 after training
    local_checkpoint_dir = training_args.output_dir
    s3_bucket_name = config["s3_bucket_name"]
    s3_checkpoint_dir = config["s3_checkpoint_dir"]
    upload_to_s3(local_checkpoint_dir, s3_bucket_name, s3_checkpoint_dir)


# Create a Ray TorchTrainer and execute the training function
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=2, use_gpu=False),
)

# Run the trainer
result = trainer.fit()

# Load the trained model
with result.checkpoint.as_directory() as checkpoint_dir:
    checkpoint_path = os.path.join(
        checkpoint_dir,
        RayTrainReportCallback.CHECKPOINT_NAME,
    )
    model = AutoModelForSequenceClassification.from_pretrained(checkpoint_path)

TypeError: cannot pickle '_thread.lock' object

## Evaluate Finetuned Model

## Log Evaluation Results to MLFlow