# Translation pipeline with OpenAI Batch API 

## Libraries

In [10]:
import os, sys

sys.path.append(os.path.join(os.getcwd(), ".."))

In [11]:
import json
import asyncio
from pathlib import Path
from datetime import datetime
from typing import Any, Optional, Union, List, Dict

import tooldantic as td
from tooldantic import ModelBuilder, OpenAiResponseFormatBaseModel
from openai import AsyncOpenAI

## Translation pipeline

### Remove unnecessary data

In [3]:
# paths = [
#     "../data/english/ApolloCorpus", "../data/english/MashQA", 
#     "../data/english/MediQA_Task3", "../data/english/MedQuad", "../data/english/ViHealthQA"
# ]
# files = [file for dir in paths for file in Path(dir).rglob("*") if file.suffix in (".json", ".jsonl")]

# for path in files:
#     # read files
#     with open(path, "r", encoding="utf-8") as f:
#         data = json.load(f)

#     if not data[0].get("id"):
#         continue
    
#     # delete id
#     print(path)
#     for d in data:
#         del d["id"]

#     # write again
#     with open(path, "w", encoding="utf-8") as f:
#         json.dump(data, f, ensure_ascii=False, indent=4)

### Translate

**Prompt**

In [4]:
# System prompt sent as the "system" message of every translation request.
# It contains two few-shot examples; every ```json fence must be closed so the
# model sees well-formed example blocks.
SYSTEM_PROMPT = """\
You are the best medical translator in the world. Your task is to translate medical documents from English to Vietnamese. \
The translation must be clear, concise, and easy to understand, while preserving the original meaning and tone.

Input will always be in JSON format, but the structure may vary. Your job is to translate only the values while retaining \
the keys and the structure of the input JSON exactly as they are. 
The output must strictly follow the same format as the input JSON. Provide only the translated JSON in the output, with no \
additional comments, explanations, or text.

Example 1:
- Input:
```json
{
    "diagnosis": "Hypertension",
    "prescription": "Take 1 tablet of Lisinopril 10mg daily.",
    "notes": "Monitor blood pressure regularly."
}
```
- Output:
```json
{
    "diagnosis": "Tăng huyết áp",
    "prescription": "Uống 1 viên Lisinopril 10mg mỗi ngày.",
    "notes": "Theo dõi huyết áp thường xuyên."
}
```

Example 2:
- Input:
```json
[
    {
        "doctor": "Dr. John Doe",
        "appointment": "Annual physical exam",
        "recommendations": "Patient should undergo a cholesterol test."
    }
]
```
- Output:
```json
[
    {
        "doctor": "Bác sĩ John Doe",
        "appointment": "Kiểm tra sức khỏe hàng năm",
        "recommendations": "Bệnh nhân nên thực hiện xét nghiệm cholesterol."
    }
]
```
"""

**Create Batch API request files**

In [6]:
# Number of dataset records packed into a single chat-completion request.
BATCH_SIZE = 16
# Upper bound on the number of requests written to one batch input file.
MAX_REQUEST_PER_FILE = 50_000
# Upper bound on one batch input file's size, in megabytes.
MAX_SIZE_PER_FILE = 200

In [7]:
def create_batch_data(data: List[Dict], batch_size: int = BATCH_SIZE) -> List[List[Dict]]:
    """Split `data` into consecutive chunks of at most `batch_size` records.

    The last chunk may be shorter; an empty `data` yields an empty list.
    """
    chunks = []
    for start in range(0, len(data), batch_size):
        chunks.append(data[start : start + batch_size])
    return chunks


def create_batch_model(
    data: Union[List[Dict], Dict],
    model_name: str = "Translation",
    model_desc: str = """Translated output into Vietnamese"""
):
    """Build a model class from an example record via `ModelBuilder.model_from_dict`.

    Args:
        data: A single record, or a list of records of which only the first
            is used as the example.
        model_name: Name passed to the builder for the generated model.
        model_desc: Description passed to the builder for the generated model.

    Returns:
        Whatever `ModelBuilder.model_from_dict` returns for the example record
        (presumably a model class derived from `OpenAiResponseFormatBaseModel`
        — confirm against tooldantic).
    """
    if isinstance(data, list):
        sample = data[0]
    else:
        sample = data

    builder = ModelBuilder(base_model=OpenAiResponseFormatBaseModel)
    return builder.model_from_dict(
        sample,
        model_name=model_name,
        model_description=model_desc,
    )


def get_object_size(batch: List[Dict]) -> int:
    """Return the number of bytes `batch` occupies as UTF-8 encoded JSON.

    The object is serialized with `ensure_ascii=False`, the same setting used
    when request lines are written to the batch files, so the measured size
    matches the bytes that actually land on disk. With the default
    `ensure_ascii=True` every non-ASCII character would be counted as a
    `\\uXXXX` escape, overstating the size of Vietnamese text.

    Args:
        batch: Any JSON-serializable object (a list of records or a single
            request dict).

    Returns:
        Length in bytes of the UTF-8 encoded JSON text (no trailing newline).
    """
    return len(json.dumps(batch, ensure_ascii=False).encode("utf-8"))


def prepare_request(
    id: str,
    batch_data: List[Dict],
    model: str,
    system_prompt: str,
) -> Dict[str, Any]:
    """Build one Batch API request line for a chunk of records.

    Args:
        id: Value stored as the request's `custom_id`.
        batch_data: Records to translate; the first record is used as the
            example from which the response model is derived.
        model: Model name placed in the request body.
        system_prompt: Content of the system message.

    Returns:
        A dict with `custom_id`, `method`, `url` and `body` keys, ready to be
        serialized as one line of a batch input file.
    """
    batch_model = create_batch_model(batch_data)

    class Translations(OpenAiResponseFormatBaseModel):
        """A list of translated output into Vietnamese"""
        items: List[batch_model]

    # Send real JSON to the model: str(batch_data) would produce a Python repr
    # (single quotes, None/True/False), contradicting the system prompt's
    # promise that the input is JSON. ensure_ascii=False keeps non-ASCII text
    # readable instead of \uXXXX escapes.
    user_content = json.dumps(batch_data, ensure_ascii=False)

    return {
        "custom_id": id,
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_content}
            ],
            "temperature": 0.5,
            "response_format": Translations.model_json_schema()
        }
    }


def _load_records(dataset_path: Path) -> List[Dict]:
    """Read a dataset file and return its records as a list.

    A file holding a single JSON document is parsed as such. A `.jsonl` file
    that is not a single document is parsed as JSON Lines (one value per
    non-blank line). A non-list document is wrapped in a one-element list.
    """
    with open(dataset_path, "r", encoding="utf-8") as f:
        text = f.read()

    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        if dataset_path.suffix.lower() != ".jsonl":
            raise
        # Split on "\n" only: str.splitlines() would also split on characters
        # such as U+2028 that may legally appear inside JSON strings.
        data = [json.loads(line) for line in text.split("\n") if line.strip()]

    return data if isinstance(data, list) else [data]


def _write_batch_file(output_dir: str, file_name: str, requests: List[Dict]) -> str:
    """Write `requests` as JSON Lines to `<output_dir>/<file_name>.jsonl` and return the path."""
    batch_file_path = os.path.join(output_dir, f"{file_name}.jsonl")
    with open(batch_file_path, "w", encoding="utf-8") as file:
        for line in requests:
            file.write(json.dumps(line, ensure_ascii=False) + "\n")
    return batch_file_path


def prepare_batch_file(
    dataset_path: Union[str, Path],
    output_dir: str,
    max_requests: int = MAX_REQUEST_PER_FILE,
    max_file_size_mb: int = MAX_SIZE_PER_FILE,
    batch_size: int = BATCH_SIZE,
    model: str = "gpt-4o-mini",
    system_prompt: str = SYSTEM_PROMPT
) -> List[str]:
    """Turn one dataset file into one or more batch input files.

    The dataset is split into chunks of `batch_size` records, each chunk
    becomes one request, and requests are packed into `.jsonl` files so that
    no file exceeds `max_requests` requests or `max_file_size_mb` megabytes.
    A single request that is by itself larger than the size limit is still
    written, alone, to its own file.

    Args:
        dataset_path: Path to a JSON (or JSON Lines) dataset file.
        output_dir: Directory that receives the batch files (created if missing).
        max_requests: Maximum number of requests per batch file.
        max_file_size_mb: Maximum batch file size in megabytes.
        batch_size: Number of records per request.
        model: Model name placed in every request.
        system_prompt: System message placed in every request.

    Returns:
        Paths of the batch files written, in order. Files are named
        `<stem>-<index>.jsonl`; when everything fits in a single file it is
        named `<stem>.jsonl` (its requests keep the `<stem>-0_request-N`
        custom ids).

    Raises:
        FileNotFoundError: If `dataset_path` is not an existing file.
    """
    # check file existence
    if not os.path.isfile(dataset_path):
        raise FileNotFoundError(f"Input file not found: {dataset_path}")

    # create output dir
    os.makedirs(output_dir, exist_ok=True)

    path = Path(dataset_path)

    # load data and split it into batches
    data = _load_records(path)
    batches = create_batch_data(data, batch_size=batch_size)

    max_file_size_bytes = max_file_size_mb * 1024 * 1024

    # create batch files
    res_batch_files = []
    current_batch_file = []
    current_size = 0
    current_request_idx = 0
    current_file_idx = 0
    current_file_name = f"{path.stem}-{current_file_idx}"
    for batch in batches:
        request = prepare_request(
            id=f"{current_file_name}_request-{current_request_idx}",
            batch_data=batch,
            model=model,
            system_prompt=system_prompt
        )
        # +1 accounts for the newline written after every request line
        request_bytes = get_object_size(request) + 1

        over_size = current_size + request_bytes > max_file_size_bytes
        over_count = len(current_batch_file) + 1 > max_requests

        # Flush only when there is something buffered; otherwise an oversized
        # first request would produce an empty batch file.
        if current_batch_file and (over_size or over_count):
            res_batch_files.append(
                _write_batch_file(output_dir, current_file_name, current_batch_file)
            )

            # update counter
            current_batch_file = []
            current_size = 0
            current_file_idx += 1
            current_request_idx = 0
            current_file_name = f"{path.stem}-{current_file_idx}"

            # the request now belongs to the next file: relabel and re-measure it
            request["custom_id"] = f"{current_file_name}_request-{current_request_idx}"
            request_bytes = get_object_size(request) + 1

        # add request to the current batch file
        current_batch_file.append(request)
        current_size += request_bytes
        current_request_idx += 1

    # write remaining requests
    if current_batch_file:
        if current_file_idx == 0:
            current_file_name = path.stem
        res_batch_files.append(
            _write_batch_file(output_dir, current_file_name, current_batch_file)
        )

    return res_batch_files


In [8]:
# this code is used for testing
# batch_files = [
#     {
#         "dataset_path": Path("../data/translation_samples/input.json"),
#         "batch_files": prepare_batch_file(
#             Path("../data/translation_samples/input.json"),
#             output_dir="../data/batch_files/translation_samples",
#             max_requests=1,
#             batch_size=1
#         )
#     }
# ]
# ------------------------------------------------------------------

data_dir = "../data/english"
# Root folder for generated batch files; each dataset folder gets its own
# sub-directory below it.
output_dir = "../data/batch_files"
# Sorted so the order of datasets (and therefore of the saved batch ids) is
# reproducible across runs and file systems.
dataset_dirs = sorted(Path(data_dir, sub_dir) for sub_dir in os.listdir(data_dir))

batch_files = []
for dataset_dir in dataset_dirs:
    datasets = sorted(
        file for file in dataset_dir.rglob("*") if file.suffix in (".json", ".jsonl")
    )

    for ds_path in datasets:
        batch_files.append(
            {
                "dataset_path": ds_path,
                "batch_files": prepare_batch_file(
                    ds_path,
                    output_dir=os.path.join(output_dir, ds_path.parent.name)
                )
            }
        )

In [None]:
async def process_batch_file(file: str, client: AsyncOpenAI = AsyncOpenAI()):
    """Upload one batch input file and create a batch job for it.

    Args:
        file: Path of the `.jsonl` batch input file to upload.
        client: OpenAI async client used for both API calls.

    Returns:
        The id of the created batch.
    """
    # upload file; the context manager closes the handle once the upload is
    # done instead of leaking one open file per batch
    with open(file, "rb") as input_stream:
        input_file = await client.files.create(
            file=input_stream,
            purpose="batch"
        )

    # create batch
    response = await client.batches.create(
        input_file_id=input_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h"
    )

    return response.id


async def process_dataset(item, client: AsyncOpenAI = AsyncOpenAI()):
    """Submit every batch file of one dataset and collect the batch ids.

    Args:
        item: Dict with a `dataset_path` (a `Path`) and `batch_files` (paths
            of the batch input files generated for that dataset).
        client: OpenAI async client, forwarded to every upload so the caller's
            client is actually the one being used.

    Returns:
        Dict with `dataset` (`<parent folder>/<file name>`) and `batch_ids`
        (one id per batch file, in the same order).
    """
    dataset_path: Path = item["dataset_path"]
    batch_ids = await asyncio.gather(
        *[process_batch_file(file, client) for file in item["batch_files"]]
    )
    return {
        "dataset": f"{dataset_path.parent.name}/{dataset_path.name}",
        "batch_ids": batch_ids
    }


client = AsyncOpenAI()
dir_to_batch_ids = await asyncio.gather(*[process_dataset(item) for item in batch_files])

In [10]:
# Persist the dataset -> batch ids mapping so later cells can pick it up.
with open("../data/batch_ids.json", "w", encoding="utf-8") as batch_ids_file:
    json.dump(
        dir_to_batch_ids,
        batch_ids_file,
        ensure_ascii=False,
        indent=4,
    )

In [None]:
# Deliberate stop: raising here halts a "Run All" so that the cells below are
# only executed manually.
raise SystemExit("Stop right there!\nYou have to run the rest manually")

### Handle batches that exceeded the enqueued token limit

In [12]:
# Reload the saved dataset -> batch ids mapping, using the same explicit
# "utf-8" codec name the file was written with.
with open("../data/batch_ids.json", "r", encoding="utf-8") as file:
    batch_ids = json.load(file)

In [13]:
# Flatten the per-dataset lists into one list of batch ids.
total_batch_ids = []
for dataset_entry in batch_ids:
    total_batch_ids.extend(dataset_entry["batch_ids"])

In [14]:
client = AsyncOpenAI()
batch_reponses = await asyncio.gather(*[
    client.batches.retrieve(id) for id in total_batch_ids
])
batch_input_file_ids = [response.input_file_id for response in batch_reponses if response.errors and response.errors.data[0].code == "token_limit_exceeded"]

INFO:httpx:HTTP Request: GET https://api.openai.com/v1/batches/batch_67752ab889c88190a02f8f372cce7994 "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: GET https://api.openai.com/v1/batches/batch_677529c935cc8190968763d5e4d3af17 "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: GET https://api.openai.com/v1/batches/batch_677529bb9958819081b9556ee39c1c90 "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: GET https://api.openai.com/v1/batches/batch_67752a833f088190839baf9ef1e31e71 "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: GET https://api.openai.com/v1/batches/batch_677529e736548190928b739e20129b8d "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: GET https://api.openai.com/v1/batches/batch_677529be7ba08190bb49d33884570070 "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: GET https://api.openai.com/v1/batches/batch_677529b00be88190be98e384c9012a49 "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: GET https://api.openai.com/v1/batches/batch_677529b9b91881909370367a2e9a675f "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: GET https://api

INFO:httpx:HTTP Request: POST https://api.openai.com/v1/batches "HTTP/1.1 400 Bad Request"


BadRequestError: Error code: 400 - {'error': {'message': 'Billing hard limit has been reached', 'type': 'invalid_request_error', 'param': None, 'code': 'billing_hard_limit_reached'}}

INFO:httpx:HTTP Request: POST https://api.openai.com/v1/batches "HTTP/1.1 400 Bad Request"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/batches "HTTP/1.1 400 Bad Request"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/batches "HTTP/1.1 400 Bad Request"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/batches "HTTP/1.1 400 Bad Request"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/batches "HTTP/1.1 400 Bad Request"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/batches "HTTP/1.1 400 Bad Request"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/batches "HTTP/1.1 400 Bad Request"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/batches "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/batches "HTTP/1.1 400 Bad Request"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/batches "HTTP/1.1 400 Bad Request"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/batches "HTTP/1.1 400 Bad Request"
INFO:htt