# Create Batch Job for Gemini API

This notebook creates a batch job for processing multiple prompts through the Google Gemini API.

## Features
- Supports Gemini 2.5 (Pro, Flash) and Gemini 3 models
- Configurable thinking mode:
  - Gemini 2.5: `thinkingBudget` (-1 for dynamic, or specific token count)
  - Gemini 3: `thinkingLevel` ("low" or "high")
- Automatic OpenAI-to-Gemini message format conversion
- Batch metadata saved for result retrieval

## Prerequisites
- Set the `GEMINI_API_KEY` environment variable
- Prepare input JSON file with `keys` and `contexts` arrays

## Input Format
```json
{
  "keys": ["case_001", "case_002", ...],
  "contexts": [
    [{"role": "user", "content": "..."}],
    [{"role": "system", "content": "..."}, {"role": "user", "content": "..."}],
    ...
  ]
}
```

## Output
- JSONL file uploaded to Gemini File API
- Batch metadata saved to `logs/{INPUT_DIR}/{MODEL_NAME}/{INPUT_FILE}.json`
- Use `retrieve_batch_results_gemini.ipynb` to get results after batch completes

In [None]:
# ============================================================================
# CONFIGURATION - Edit these variables before running
# ============================================================================
from pathlib import Path

# Base directory for data files
BASE_DIR = Path(".")  # Change to your data directory

# Input file settings
INPUT_DIR = "data"  # Directory containing input JSON file
INPUT_FILE = "your_input_file"  # Name without .json extension

# Model settings
# Supported Gemini 2.5: gemini-2.5-pro, gemini-2.5-flash
# Supported Gemini 3: gemini-3-pro-preview
MODEL_NAME = "gemini-2.5-pro"

# Thinking configuration
# For Gemini 2.5: Set THINKING_BUDGET (-1 for dynamic, or specific token count)
# For Gemini 3: Set THINKING_LEVEL ("low" or "high")
THINKING_LEVEL = "high"   # Gemini 3 only
THINKING_BUDGET = -1      # Gemini 2.5 only (-1 = dynamic)

# Output directories
LOGS_DIR = BASE_DIR / "logs"
JSONL_DIR = BASE_DIR / "gemini_jsonl"

# ============================================================================

In [None]:
import json
import os
from google import genai
from google.genai import types

# Detect model version for thinking config
is_gemini_3 = "gemini-3" in MODEL_NAME

# Initialize Gemini client
client = genai.Client(api_key=os.environ.get("GEMINI_API_KEY"))

# Build paths
input_path = BASE_DIR / INPUT_DIR / f"{INPUT_FILE}.json"
jsonl_path = JSONL_DIR / INPUT_DIR / MODEL_NAME / f"{INPUT_FILE}.jsonl"
jsonl_path.parent.mkdir(parents=True, exist_ok=True)

# Load input data
with open(input_path, 'r') as f:
    data = json.load(f)

print(f"Input file: {input_path}")
print(f"Loaded {len(data['keys'])} cases")
print(f"Model: {MODEL_NAME}")
print(f"Gemini 3: {is_gemini_3}")
if is_gemini_3:
    print(f"Thinking level: {THINKING_LEVEL}")
else:
    print(f"Thinking budget: {THINKING_BUDGET}")

In [None]:
# Build JSONL batch requests
requests = []
key_dict = {}

for i, (key, context) in enumerate(list(zip(data["keys"], data["contexts"]))):
    # Handle system instruction
    system_instruction = None
    messages = context.copy()
    
    if messages and messages[0]["role"] == "system":
        system_content = messages[0]["content"]
        system_instruction = {
            "parts": [{"text": system_content}]
        }
        messages = messages[1:]  # Remove system from messages
    
    # Convert OpenAI-style messages to Gemini contents format
    contents = []
    for message in messages:
        # Map roles: user -> user, assistant -> model
        role = "user" if message["role"] == "user" else "model"
        contents.append({
            "role": role,
            "parts": [{"text": message["content"]}]
        })
    
    # Build generationConfig
    generation_config = {
        "temperature": 0.7,
        "topP": 1.0,
        "maxOutputTokens": 20000,
    }
    
    # Add thinking config based on model version
    if is_gemini_3:
        generation_config["thinkingConfig"] = {"thinkingLevel": THINKING_LEVEL}
    else:
        generation_config["thinkingConfig"] = {"thinkingBudget": THINKING_BUDGET}
    
    # Build batch request
    batch_request = {
        "key": str(i),
        "request": {
            "contents": contents,
            "generationConfig": generation_config
        }
    }
    
    # Add system instruction if exists
    if system_instruction:
        batch_request["request"]["systemInstruction"] = system_instruction
    
    key_dict[str(i)] = key
    requests.append(batch_request)

# Write to JSONL file
with open(jsonl_path, "w") as f:
    for req in requests:
        json.dump(req, f)
        f.write("\n")

print(f"Created {len(requests)} batch requests")
print(f"Saved to: {jsonl_path}")

In [None]:
# Upload JSONL file to Gemini File API
uploaded_file = client.files.upload(
    file=str(jsonl_path),
    config=types.UploadFileConfig(
        display_name=INPUT_FILE,
        mime_type='application/json'
    )
)

print(f"Uploaded file:")
print(f"  Name: {uploaded_file.name}")
print(f"  Display name: {uploaded_file.display_name}")
print(f"  URI: {uploaded_file.uri}")

In [None]:
# Create batch job
file_batch_job = client.batches.create(
    model=MODEL_NAME,
    src=uploaded_file.name,
    config={
        'display_name': f"{MODEL_NAME}_{INPUT_FILE}",
    }
)

print(f"Created batch job:")
print(f"  Name: {file_batch_job.name}")
print(f"  State: {file_batch_job.state.name}")
print(f"  Model: {file_batch_job.model}")
print(f"  Create time: {file_batch_job.create_time}")

In [None]:
# Check batch status (re-run this cell to check progress)
batch_job = client.batches.get(name=file_batch_job.name)

print(f"Batch job: {batch_job.name}")
print(f"State: {batch_job.state.name}")

# Possible states:
# JOB_STATE_PENDING, JOB_STATE_RUNNING, JOB_STATE_SUCCEEDED,
# JOB_STATE_FAILED, JOB_STATE_CANCELLED, JOB_STATE_EXPIRED

if batch_job.state.name == "JOB_STATE_SUCCEEDED":
    print(f"\nBatch completed!")
    if batch_job.dest and batch_job.dest.file_name:
        print(f"Results file: {batch_job.dest.file_name}")
    elif batch_job.dest and batch_job.dest.inlined_responses:
        print(f"Results are inline (count: {len(batch_job.dest.inlined_responses)})")
    else:
        print("Results location unknown")
elif batch_job.state.name in ["JOB_STATE_PENDING", "JOB_STATE_RUNNING"]:
    print(f"\nBatch still processing...")
elif batch_job.state.name == "JOB_STATE_FAILED":
    print(f"\nBatch failed!")
    if hasattr(batch_job, 'error') and batch_job.error:
        print(f"Error: {batch_job.error}")
elif batch_job.state.name == "JOB_STATE_CANCELLED":
    print(f"\nBatch was cancelled")
elif batch_job.state.name == "JOB_STATE_EXPIRED":
    print(f"\nBatch expired (exceeded 48-hour window)")

In [None]:
# Save batch metadata and key_dict for later result retrieval

# Convert batch job to dict for saving
try:
    batch_job_dict = batch_job.to_dict()
except AttributeError:
    batch_job_dict = {
        "name": batch_job.name,
        "model": batch_job.model,
        "state": batch_job.state.name,
        "create_time": str(batch_job.create_time),
    }
    if hasattr(batch_job, 'dest') and batch_job.dest:
        if hasattr(batch_job.dest, 'file_name') and batch_job.dest.file_name:
            batch_job_dict["dest_file_name"] = batch_job.dest.file_name

# Build log path
log_save_path = LOGS_DIR / INPUT_DIR / MODEL_NAME / f"{INPUT_FILE}.json"
log_save_path.parent.mkdir(parents=True, exist_ok=True)

# Save batch metadata and key mapping
saving_dict = {
    "batch_job": batch_job_dict,
    "key_dict": key_dict,
    "uploaded_file": {
        "name": uploaded_file.name,
        "uri": uploaded_file.uri
    }
}

with open(log_save_path, 'w') as f:
    json.dump(saving_dict, f, indent=4, default=str)

print(f"Batch metadata saved to: {log_save_path}")
print(f"\nBatch job name: {file_batch_job.name}")
print(f"\nTo check status later, use:")
print(f"  client.batches.get(name='{file_batch_job.name}')")
print(f"\nTo retrieve results when complete, use retrieve_batch_results_gemini.ipynb")