# 🗂️ Batch Processing with TextTools

Welcome! 🎉 This notebook demonstrates how to use the **Batch Manager** in `texttools`.  

The batch manager lets you run large-scale LLM jobs efficiently by splitting data into manageable chunks, processing them in parallel, and saving results safely.

You’ll learn:
- How to configure a batch job
- How to load and partition input data
- How to run, check, and fetch results

## 🔧 Best Practices for Setup

In [None]:
##################### Best practices for connecting without errors ####################

# 1- Use a proxy
# 2- Run the code on a VPS

# The first option is better: the data will be saved locally if anything goes wrong

## 1. Install & Setup
First, make sure you have `texttools` installed:
```
pip install -U hamta-texttools
```

Then set your OpenAI (or OpenRouter) API key in a `.env` file:
```
OPENAI_API_KEY=your_api_key_here
```

## 2. Import Required Libraries
We’ll use `dotenv` to load environment variables, the `openai` client to talk to the API, `pydantic` for schema validation, and the batch manager from `texttools`.

In [None]:
import json
import os
import time
from pathlib import Path
from typing import Any

from dotenv import load_dotenv
from openai import OpenAI
from pydantic import BaseModel

from texttools.batch_manager import SimpleBatchManager

## 3. Batch Configuration
Here we define limits such as batch size and token budgets.

In [None]:
class BatchConfig:
    """
    Static limits and paths shared by the batch job runner.
    """

    # Maximum number of items per part when the input is split.
    MAX_BATCH_SIZE: int = 1_000
    # Estimated-token threshold at which the input is split into parts.
    MAX_TOTAL_TOKENS: int = 2_000_000
    # Divisor that turns a character count into an estimated token count.
    CHARS_PER_TOKEN: float = 2.7
    # NOTE(review): not referenced in the visible code — confirm it is still needed.
    PROMPT_TOKEN_MULTIPLIER: int = 1_000
    # Directory, relative to the working directory, that receives result files.
    BASE_OUTPUT_DIR: str = "batch_results"

## 4. Helper Functions
We provide utilities for formatting input data and parsing model outputs.

In [None]:
def data_for_batch(data: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """
    Normalise raw input into the batch format ``[{"id": ..., "content": ...}, ...]``.

    Each entry may be either a dict carrying a ``"content"`` key (its ``"id"`` is
    kept when present, otherwise the entry's position is used) or a plain string
    (the position becomes the id). Any other entry raises ``ValueError``.
    """

    def _normalise(position: int, entry: Any) -> dict[str, Any]:
        # Dicts only qualify when they actually carry the text to process.
        if isinstance(entry, dict) and "content" in entry:
            return {"id": entry.get("id", position), "content": entry["content"]}
        if isinstance(entry, str):
            return {"id": position, "content": entry}
        raise ValueError(f"Invalid data item at index {position}: {entry}")

    return [_normalise(position, entry) for position, entry in enumerate(data)]


def parsing_output(part_idx: int, output_data: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """
    Post-processing hook applied to the results fetched for one part.

    The default implementation hands ``output_data`` back untouched and ignores
    ``part_idx``; edit the body to reshape or filter results before they are saved.
    """
    return output_data

## 5. Define Output Schema
We use a Pydantic model to enforce structured outputs.

In [None]:
# Default structured-output schema: BatchJobRunner uses this model as its
# `output_model` unless the caller supplies another one.
class OutputData(BaseModel):
    # The only field of the schema, a single string.
    output: str

## 6. Batch Job Runner
This class manages the full lifecycle of a batch job:  
- Loading and partitioning data  
- Starting and monitoring jobs  
- Fetching and saving results  

In [None]:
class BatchJobRunner:
    """
    Interactive driver for a batch job, from loading the input to saving results.

    Constructing a runner builds the ``SimpleBatchManager``, loads the input
    JSON, splits it into parts and makes sure the output directory exists.
    ``run()`` then walks the parts in order, asking on stdin whether to start,
    check or fetch each one.
    """

    def __init__(
        self,
        system_prompt: str,
        job_name: str,
        input_data_path: str,
        output_data_path: str,
        model: str = "gpt-4.1-mini",
        output_model=OutputData,
    ):
        """
        Args:
            system_prompt: Prompt handed to the manager as ``prompt_template``;
                its length also counts toward the token estimate of every item.
            job_name: Base job name; ``_part_<n>`` is appended when the data is
                split into more than one part.
            input_data_path: Path of the JSON file holding the input items.
            output_data_path: Only the file stem is used; results are written
                under ``BatchConfig.BASE_OUTPUT_DIR``.
            model: Model name forwarded to the manager.
            output_model: Output schema forwarded to the manager.
        """
        self.config = BatchConfig()
        self.system_prompt = system_prompt
        self.job_name = job_name
        self.input_data_path = input_data_path
        self.output_data_path = output_data_path
        self.model = model
        self.output_model = output_model
        self.manager = self._init_manager()
        self.data: list[dict[str, Any]] = []
        self.parts: list[list[dict[str, Any]]] = []
        self._load_data()
        self._partition_data()
        Path(self.config.BASE_OUTPUT_DIR).mkdir(parents=True, exist_ok=True)

    def _init_manager(self) -> SimpleBatchManager:
        """
        Load ``.env``, read ``OPENAI_API_KEY`` and build the batch manager.
        """
        load_dotenv()
        api_key = os.getenv("OPENAI_API_KEY")
        client = OpenAI(api_key=api_key)
        return SimpleBatchManager(
            client=client,
            model=self.model,
            prompt_template=self.system_prompt,
            output_model=self.output_model,
        )

    def _load_data(self):
        """
        Read the input JSON file and normalise it with ``data_for_batch``.
        """
        with open(self.input_data_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        self.data = data_for_batch(data)

    def _partition_data(self):
        """
        Fill ``self.parts`` from ``self.data``.

        The token count is estimated as (content chars + prompt chars per item)
        divided by ``CHARS_PER_TOKEN``. Data whose estimate stays below
        ``MAX_TOTAL_TOKENS`` is kept as a single part; otherwise it is split so
        that each part respects both ``MAX_BATCH_SIZE`` and the token budget.
        Empty input yields no parts at all.
        """
        prompt_length = len(self.system_prompt)
        total_length = sum(len(item["content"]) for item in self.data)
        total = total_length + (prompt_length * len(self.data))
        calculation = total / self.config.CHARS_PER_TOKEN
        print(
            f"Total chars: {total_length}, Prompt chars: {prompt_length}, Total: {total}, Tokens: {calculation}"
        )
        if not self.data:
            # Nothing to submit: avoid creating a single empty part.
            self.parts = []
        elif calculation < self.config.MAX_TOTAL_TOKENS:
            self.parts = [self.data]
        else:
            self.parts = self._split_by_limits(prompt_length)
        print(f"Data split into {len(self.parts)} part(s)")

    def _split_by_limits(self, prompt_length: int) -> list[list[dict[str, Any]]]:
        """
        Greedily pack items into parts bounded by item count and token estimate.

        Slicing by ``MAX_BATCH_SIZE`` alone can leave a part of long items over
        the token budget, so both limits are checked before each item is added.
        An item that exceeds the budget on its own still gets a part to itself,
        because items are never split.
        """
        parts: list[list[dict[str, Any]]] = []
        current: list[dict[str, Any]] = []
        current_chars = 0
        for item in self.data:
            item_chars = len(item["content"]) + prompt_length
            is_full = len(current) >= self.config.MAX_BATCH_SIZE
            over_budget = (
                current_chars + item_chars
            ) / self.config.CHARS_PER_TOKEN >= self.config.MAX_TOTAL_TOKENS
            if current and (is_full or over_budget):
                parts.append(current)
                current, current_chars = [], 0
            current.append(item)
            current_chars += item_chars
        if current:
            parts.append(current)
        return parts

    def run(self):
        """
        Process every part in order through the interactive command loop.
        """
        if not self.parts:
            print("No data to process.")
            return
        for idx, part in enumerate(self.parts):
            part_job_name = (
                f"{self.job_name}_part_{idx + 1}"
                if len(self.parts) > 1
                else self.job_name
            )
            print(
                f"\n--- Processing part {idx + 1}/{len(self.parts)}: {part_job_name} ---"
            )
            self._process_part(part, part_job_name, idx)

    def _process_part(
        self, part: list[dict[str, Any]], part_job_name: str, part_idx: int
    ):
        """
        Prompt for start/check/fetch commands until the part's results are fetched.

        The loop only ends after a ``fetch`` command has saved the results.
        """
        while True:
            command = (
                input("Enter command (1.start, 2.check, 3.fetch): ").strip().lower()
            )
            if command in ["1", "start"]:
                self.manager.start(part, job_name=part_job_name)
                print("Started batch job.")
                time.sleep(1)
            elif command in ["2", "check"]:
                status = self.manager.check_status(job_name=part_job_name)
                print(f"Status: {status}")
                # Pause before prompting again, presumably to discourage rapid polling.
                time.sleep(5)
                if status == "completed":
                    print("Job completed. You can now fetch results.")
                elif status == "failed":
                    print("Job failed. Clearing state.")
                    self.manager._clear_state(part_job_name)
            elif command in ["3", "fetch"]:
                output_data, log = self.manager.fetch_results(
                    job_name=part_job_name, save=True, remove_cache=False
                )
                output_data = parsing_output(part_idx, output_data)
                self._save_results(output_data, log, part_idx)
                print("Fetched and saved results for this part.")
                break
            else:
                print("Invalid command. Please enter 1, 2, or 3.")

    def _result_path(self, part_idx: int, tail: str = "") -> Path:
        """
        Build ``<BASE_OUTPUT_DIR>/<output stem>[_part_<n>]<tail>.json``.
        """
        part_suffix = f"_part_{part_idx + 1}" if len(self.parts) > 1 else ""
        stem = Path(self.output_data_path).stem
        return Path(self.config.BASE_OUTPUT_DIR) / f"{stem}{part_suffix}{tail}.json"

    def _save_results(
        self, output_data: list[dict[str, Any]], log: list[Any], part_idx: int
    ):
        """
        Write a part's results, and its log when non-empty, as UTF-8 JSON files.
        """
        with open(self._result_path(part_idx), "w", encoding="utf-8") as f:
            json.dump(output_data, f, ensure_ascii=False, indent=4)
        if log:
            with open(self._result_path(part_idx, "_log"), "w", encoding="utf-8") as f:
                json.dump(log, f, ensure_ascii=False, indent=4)

## 7. Running the Job
Now we can start the batch job.  
You’ll be prompted for:
- System prompt  
- Job name  
- Input JSON path  
- Output JSON path (only the file name is used; results are written to the `batch_results/` folder)  

In [None]:
def main():
    """
    Collect the job settings interactively, then build and run the batch runner.
    """
    print("=== Batch Job Runner ===")
    questions = (
        "Enter system prompt: ",
        "Enter job name: ",
        "Enter input data path (JSON): ",
        "Enter output data path (JSON): ",
    )
    # The generator is consumed left to right, so the prompts appear in this order.
    system_prompt, job_name, input_data_path, output_data_path = (
        input(question).strip() for question in questions
    )
    BatchJobRunner(system_prompt, job_name, input_data_path, output_data_path).run()


main()