# Cohere Batch API Cookbook: Embeddings

This notebook demonstrates how to use the Cohere Batch API to process large volumes of embedding requests efficiently.

## What You'll Learn

1. How to prepare input data in the correct format for batch embedding
2. How to upload datasets and create batch jobs
3. How to monitor job progress
4. How to download and process results
5. How to manage and monitor your datasets and batch jobs

## Use Cases

- Embedding large document collections for semantic search
- Creating embeddings for machine learning training data
- Batch processing of text data for clustering or classification
- Building vector databases with thousands or millions of entries

## Setup and Installation

In [3]:
%%capture
# Install required packages (%%capture must be the first line of the cell)
!pip install fastavro requests cohere==5.19.0 -q

In [4]:
import cohere
import json
import time
import requests
import fastavro
import os
from typing import List, Dict

In [5]:
# Initialize Cohere client
# Replace with your actual API key from https://dashboard.cohere.com/api-keys
API_KEY = "your-api-key"

# V1 client, used below for both datasets and batch jobs
co = cohere.Client(api_key=API_KEY)

print("✓ Cohere client initialized")

✓ Cohere client initialized


## Step 1: Prepare Your Input Data

The Batch API requires input data in JSONL format, where each line is a JSON object containing:
- `custom_id`: A **unique** identifier for tracking the request
- `body`: The embedding request parameters with the following structure:
  - `input_type`: Type of input (`search_document`, `search_query`, `classification`, or `clustering`)
  - `embedding_types`: Array of embedding formats to return (for example, `["float"]`)
  - `inputs`: Array of input objects, each with a `content` array containing text objects

In [6]:
# Example: Sample dataset of product descriptions
sample_texts = [
    "High-performance laptop with 16GB RAM and 512GB SSD",
    "Wireless noise-cancelling headphones with 30-hour battery life",
    "Ergonomic office chair with lumbar support and adjustable height",
    "4K smart TV with HDR and built-in streaming apps",
    "Professional camera with 24MP sensor and 4K video recording",
    "Fitness tracker with heart rate monitor and GPS",
    "Portable Bluetooth speaker with waterproof design",
    "Gaming mouse with programmable buttons and RGB lighting",
    "Standing desk converter with dual monitor support",
    "Mechanical keyboard with tactile switches and backlighting"
]

print(f"Sample dataset contains {len(sample_texts)} items")
print(f"First item: {sample_texts[0]}")

Sample dataset contains 10 items
First item: High-performance laptop with 16GB RAM and 512GB SSD


In [7]:
def create_batch_input(texts: List[str], input_type: str = "search_document") -> List[Dict]:
    """
    Create batch input in the required JSONL format for Cohere Embed v2.

    Args:
        texts: List of texts to embed
        input_type: One of 'search_document', 'search_query', 'classification', 'clustering'

    Returns:
        List of dictionaries ready to be saved as JSONL
    """
    batch_input = []

    for idx, text in enumerate(texts):
        # custom_id must be unique - using string index
        batch_input.append({
            "custom_id": str(idx + 1),  # Ensure unique IDs starting from "1"
            "body": {
                "input_type": input_type,
                "embedding_types": ["float"],
                "inputs": [
                    {
                        "content": [
                            {
                                "type": "text",
                                "text": text
                            }
                        ]
                    }
                ]
            }
        })

    return batch_input

# Create batch input
batch_input = create_batch_input(sample_texts, input_type="search_document")

print(f"Created {len(batch_input)} batch requests")
print("\nExample request:")
print(json.dumps(batch_input[0], indent=2))

Created 10 batch requests

Example request:
{
  "custom_id": "1",
  "body": {
    "input_type": "search_document",
    "embedding_types": [
      "float"
    ],
    "inputs": [
      {
        "content": [
          {
            "type": "text",
            "text": "High-performance laptop with 16GB RAM and 512GB SSD"
          }
        ]
      }
    ]
  }
}
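
The format rules listed at the start of this step can be checked locally before uploading. The following is a minimal sketch, not part of the Cohere SDK: `validate_batch_records` and `ALLOWED_INPUT_TYPES` are hypothetical names, and the checks (for instance, that `custom_id` is a non-empty string) are assumptions based on the example request above. The dataset validation performed by the service remains authoritative.

```python
from typing import Dict, List

# Input types named in this notebook; the service may accept others.
ALLOWED_INPUT_TYPES = {"search_document", "search_query", "classification", "clustering"}


def validate_batch_records(records: List[Dict]) -> List[str]:
    """Return a list of human-readable problems; an empty list means none were found."""
    problems = []
    seen_ids = set()

    for position, record in enumerate(records, start=1):
        # Assumption: custom_id is a non-empty string, as in the example request.
        custom_id = record.get("custom_id")
        if not isinstance(custom_id, str) or not custom_id:
            problems.append(f"record {position}: custom_id must be a non-empty string")
        elif custom_id in seen_ids:
            problems.append(f"record {position}: duplicate custom_id {custom_id!r}")
        else:
            seen_ids.add(custom_id)

        body = record.get("body")
        if not isinstance(body, dict):
            problems.append(f"record {position}: missing body object")
            continue

        if body.get("input_type") not in ALLOWED_INPUT_TYPES:
            problems.append(f"record {position}: unexpected input_type {body.get('input_type')!r}")
        if not body.get("embedding_types"):
            problems.append(f"record {position}: embedding_types is empty")

        inputs = body.get("inputs")
        if not isinstance(inputs, list) or not inputs:
            problems.append(f"record {position}: inputs must be a non-empty list")
            continue
        for item in inputs:
            content = item.get("content") if isinstance(item, dict) else None
            if not isinstance(content, list) or not content:
                problems.append(f"record {position}: each input needs a non-empty content list")

    return problems
```

Calling `validate_batch_records(batch_input)` on the requests created above should return an empty list; any strings it returns describe records worth fixing before upload.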


In [8]:
def save_jsonl(data: List[Dict], filename: str):
    """Save data as JSONL file (one JSON object per line)."""
    with open(filename, "w") as f:
        for item in data:
            f.write(json.dumps(item) + "\n")
    print(f"✓ Saved {len(data)} records to {filename}")

# Save to file
input_filename = "batch_embed_input.jsonl"
save_jsonl(batch_input, input_filename)

✓ Saved 10 records to batch_embed_input.jsonl
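
For collections much larger than this sample, building the whole request list in memory before writing may be wasteful. One possible alternative, sketched here with hypothetical helper names (`iter_batch_records`, `write_jsonl_stream`), is to generate requests lazily and write them one line at a time. The record layout mirrors `create_batch_input` above; any size limits the service places on a single dataset are not covered here.

```python
import json
from typing import Dict, Iterable, Iterator


def iter_batch_records(texts: Iterable[str], input_type: str = "search_document") -> Iterator[Dict]:
    """Yield one batch request per text without building the full list in memory."""
    for idx, text in enumerate(texts, start=1):
        yield {
            "custom_id": str(idx),  # unique per record, starting from "1"
            "body": {
                "input_type": input_type,
                "embedding_types": ["float"],
                "inputs": [{"content": [{"type": "text", "text": text}]}],
            },
        }


def write_jsonl_stream(records: Iterable[Dict], filename: str) -> int:
    """Write records to a JSONL file one at a time and return how many were written."""
    count = 0
    with open(filename, "w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record, ensure_ascii=False) + "\n")
            count += 1
    return count
```

Because `texts` can be any iterable, it could be a generator that reads documents from disk or a database cursor rather than a list.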


## Step 2: Upload Dataset to Cohere

Upload your JSONL file to Cohere's dataset service. The dataset will be validated asynchronously.

In [9]:
# Upload dataset and wait for validation
print("Uploading dataset...")

# Open the file in a context manager so the handle is closed after upload
with open(input_filename, "rb") as f:
    dataset = co.datasets.create(
        name="product-embeddings-batch",
        data=f,
        type="batch-embed-v2-input"
    )

print(f"Dataset created with ID: {dataset.id}")
print("Waiting for dataset validation to complete...")

try:
    dataset_response = co.wait(dataset)
    dataset = dataset_response.dataset
    print(f"✓ Dataset validated successfully! ID: {dataset.id}")
except Exception as e:
    print(f"✗ Dataset validation failed: {e}")
    # Chain the original exception so its traceback is preserved
    raise RuntimeError(f"Dataset validation failed: {e}") from e

Uploading dataset...
Dataset created with ID: product-embeddings-batch-gszxbq
Waiting for dataset validation to complete...
...
...
...
✓ Dataset validated successfully! ID: product-embeddings-batch-gszxbq


## Step 3: Create Batch Job

Create a batch embedding job using your validated dataset.

In [10]:
# Create batch job
print("Creating batch job...")
batch_job = co.batches.create(
    request={
        "name": "product-embeddings-job",
        "input_dataset_id": dataset.id,
        "model": "batch-embed-v4"  # Options: batch-embed-v4, batch-embed-v4-a100
    }
)

print(f"✓ Batch job created with ID: {batch_job.batch.id}")
print(f"  Name: {batch_job.batch.name}")
print(f"  Model: {batch_job.batch.model}")
print(f"  Status: {batch_job.batch.status}")
print(f"  Number of records: {batch_job.batch.num_records}")
print(f"  Created at: {batch_job.batch.created_at}")
print(f"\nSave this ID to check status later: {batch_job.batch.id}")

Creating batch job...
✓ Batch job created with ID: 879296b8-8c2d-45ac-8cec-02436e859611
  Name: product-embeddings-job
  Model: batch-embed-v4
  Status: BATCH_STATUS_QUEUED
  Number of records: 10
  Created at: 2025-10-16 17:36:15.297709+00:00

Save this ID to check status later: 879296b8-8c2d-45ac-8cec-02436e859611


## Step 4: Monitor Job Progress

Poll the batch job status until it completes. This may take anywhere from a few minutes to several hours depending on the size of the batch job.

In [11]:
def monitor_batch_job(co, batch_id: str, poll_interval: int = 30):
    """
    Monitor batch job until it reaches a terminal status.

    Args:
        co: Cohere client
        batch_id: Batch job ID
        poll_interval: Seconds between status checks

    Returns:
        Final batch object, including status and record counts
    """
    print("Monitoring batch job progress...")
    print("(This may take a while for large batches or if there are in-progress jobs)\n")

    while True:
        response = co.batches.retrieve(batch_id)
        status = response.batch

        # Display progress
        print(f"Status: {status.status}")
        print(f"Progress: {status.num_successful_records}/{status.num_records} completed")

        if status.num_failed_records > 0:
            print(f"Failed: {status.num_failed_records}")

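        # Check if job has reached a terminal state
        if status.status in ["BATCH_STATUS_COMPLETED", "BATCH_STATUS_FAILED", "BATCH_STATUS_CANCELLED"]:
            # Only a completed job gets the success marker
            marker = "✓" if status.status == "BATCH_STATUS_COMPLETED" else "✗"
            print(f"\n{marker} Job {status.status}")
            return status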

        print(f"Checking again in {poll_interval} seconds...\n")
        time.sleep(poll_interval)

# Monitor the job
final_status = monitor_batch_job(co, batch_job.batch.id, poll_interval=60)

Monitoring batch job progress...
(This may take a while for large batches)

Status: BATCH_STATUS_QUEUED
Progress: 0/10 completed
Checking again in 60 seconds...

Status: BATCH_STATUS_QUEUED
Progress: 0/10 completed
Checking again in 60 seconds...

Status: BATCH_STATUS_QUEUED
Progress: 0/10 completed
Checking again in 60 seconds...

Status: BATCH_STATUS_QUEUED
Progress: 0/10 completed
Checking again in 60 seconds...

Status: BATCH_STATUS_QUEUED
Progress: 0/10 completed
Checking again in 60 seconds...

Status: BATCH_STATUS_QUEUED
Progress: 0/10 completed
Checking again in 60 seconds...

Status: BATCH_STATUS_QUEUED
Progress: 0/10 completed
Checking again in 60 seconds...

Status: BATCH_STATUS_COMPLETED
Progress: 10/10 completed

✓ Job BATCH_STATUS_COMPLETED


In [12]:
# Display final results summary
print("\n" + "="*50)
print("BATCH JOB SUMMARY")
print("="*50)
print(f"Job ID: {final_status.id}")
print(f"Status: {final_status.status}")
print(f"Total requests: {final_status.num_records}")
print(f"Completed: {final_status.num_successful_records}")
print(f"Failed: {final_status.num_failed_records}")
print(f"Output dataset ID: {final_status.output_dataset_id}")
print("="*50)


BATCH JOB SUMMARY
Job ID: 879296b8-8c2d-45ac-8cec-02436e859611
Status: BATCH_STATUS_COMPLETED
Total requests: 10
Completed: 10
Failed: 0
Output dataset ID: product-embeddings-job-output-xse5ev
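
The monitoring loop in this step polls at a fixed interval and never gives up. For long-running jobs, one possible refinement is to back off between checks and stop after a deadline. The sketch below is an illustration rather than SDK functionality: `wait_for_terminal_status` is a hypothetical helper, the default intervals and timeout are arbitrary, and the terminal statuses are the three used in `monitor_batch_job`. It takes a callable that returns the current status string, so it does not depend on the client directly.

```python
import time
from typing import Callable

# Terminal statuses as used by monitor_batch_job in this notebook.
TERMINAL_STATUSES = {"BATCH_STATUS_COMPLETED", "BATCH_STATUS_FAILED", "BATCH_STATUS_CANCELLED"}


def wait_for_terminal_status(
    fetch_status: Callable[[], str],
    initial_interval: float = 30.0,
    max_interval: float = 300.0,
    timeout: float = 6 * 60 * 60,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll fetch_status() until it returns a terminal status or the timeout elapses."""
    interval = initial_interval
    waited = 0.0  # counts time spent sleeping, not time spent in fetch_status()
    while True:
        status = fetch_status()
        if status in TERMINAL_STATUSES:
            return status
        if waited >= timeout:
            raise TimeoutError(f"job still {status} after {waited:.0f}s of waiting")
        sleep(interval)
        waited += interval
        # Double the wait each round, capped at max_interval
        interval = min(interval * 2, max_interval)
```

With the client from this notebook, it could be called as `wait_for_terminal_status(lambda: co.batches.retrieve(batch_job.batch.id).batch.status)`.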


## Step 5: Download and Process Results

Download the output dataset and save it in your preferred format using Cohere's built-in utility.
The `co.utils.save_dataset()` function supports multiple formats:

* `jsonl` - JSON Lines format (recommended for preserving data structure)
* `csv` - Comma-separated values (easy to view in Excel/Sheets)
* `avro` - Apache Avro format (efficient binary format)

Simply change the format parameter to your desired output format. The function handles all the downloading and conversion automatically.

In [23]:
def download_and_save_dataset(dataset_id, output_dir, co, format="jsonl"):
    """
    Download dataset from Cohere and save to file.

    Args:
        dataset_id: Output dataset ID from batch job
        output_dir: Directory to save output files
        co: Cohere client
        format: Output format - 'jsonl', 'csv', or 'avro' (default: 'jsonl')

    Returns:
        List of result dictionaries
    """
    print(f"Downloading dataset {dataset_id}...")

    # Get the dataset object
    output_dataset = co.datasets.get(id=dataset_id).dataset

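    # Collect all records. Iterating the Dataset model itself yields its
    # metadata fields rather than rows, so read the parts it links to instead.
    # Assumption: the parts are Avro files, as the fastavro dependency suggests.
    dataset = []
    for part in output_dataset.dataset_parts or []:
        resp = requests.get(part.url, stream=True)
        resp.raise_for_status()
        resp.raw.decode_content = True  # transparently handle gzip/deflate
        dataset.extend(fastavro.reader(resp.raw))

    print(f"✓ Downloaded {len(dataset)} records")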

    # Save using Cohere's utility
    output_path = os.path.join(output_dir, f"{dataset_id}.{format}")
    co.utils.save_dataset(
        dataset=output_dataset,
        filepath=output_path,
        format=format,
    )

    print(f"✓ Saved {len(dataset)} records to {output_path}")

    return dataset


# Create output directory
output_dir = "./batch_output"
os.makedirs(output_dir, exist_ok=True)

# Download and save results
results = download_and_save_dataset(
    final_status.output_dataset_id,
    output_dir,
    co,
    format="jsonl"  # Options: 'jsonl', 'csv', or 'avro'
)

print(f"\n✓ Results saved to {output_dir}/{final_status.output_dataset_id}.jsonl")

Downloading dataset product-embeddings-job-output-xse5ev...
✓ Downloaded 12 records
✓ Saved 12 records to ./batch_output/product-embeddings-job-output-xse5ev.jsonl

✓ Results saved to ./batch_output/product-embeddings-job-output-xse5ev.jsonl


In [20]:
# Inspect first result
print("Example result structure:")
print(json.dumps(results[0], indent=2, default=str)[:500] + "...")

Example result structure:
{
  "custom_id": "4",
  "body": {
    "id": "5f10573b-c3dc-40e8-bd6c-9e106889de04",
    "embeddings": {
      "float": [
        [
          -0.09744025,
          -0.02665485,
          -0.02577224,
          -0.0074580624,
          0.025948763,
          0.017210914,
          -0.024713106,
          0.0061341464,
          -0.034774873,
          -0.0060458854,
          -0.0014121776,
          0.124271624,
          -0.049426213,
          -0.0044571855,
          -0.004744034,
          0...
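
Note that the first result above has `custom_id` "4", so output order should not be assumed to match input order. A minimal sketch of one way to work with the results follows, assuming each record is a dictionary shaped like the example (`custom_id`, then `body.embeddings.float` holding one vector per input). The helper names are hypothetical, and plain Python is used for the cosine similarity to avoid extra dependencies; a numerical library would be the usual choice at scale.

```python
import math
from typing import Dict, List, Sequence, Tuple


def index_embeddings(results: List[Dict]) -> Dict[str, List[float]]:
    """Map custom_id -> the first float embedding of each result record."""
    return {
        record["custom_id"]: list(record["body"]["embeddings"]["float"][0])
        for record in results
    }


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def most_similar(
    target_id: str, embeddings: Dict[str, List[float]], top_k: int = 3
) -> List[Tuple[str, float]]:
    """Return the top_k (custom_id, score) pairs closest to target_id, excluding itself."""
    target = embeddings[target_id]
    scores = [
        (other_id, cosine_similarity(target, vector))
        for other_id, vector in embeddings.items()
        if other_id != target_id
    ]
    return sorted(scores, key=lambda pair: pair[1], reverse=True)[:top_k]
```

Since `create_batch_input` assigned IDs as `str(idx + 1)`, the text for a given ID can be recovered with `sample_texts[int(custom_id) - 1]`.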


## Cleanup (Optional)

Clean up temporary files created during this tutorial.

In [None]:
# Remove temporary files
files_to_remove = ["batch_embed_input.jsonl"]

for file in files_to_remove:
    if os.path.exists(file):
        os.remove(file)
        print(f"Removed {file}")

Removed batch_embed_input.jsonl


## Managing Your Datasets and Batch Jobs

Now that you've completed a full batch job workflow, let's explore how to manage your datasets and batch jobs. These utilities are helpful for monitoring multiple jobs, debugging issues, or cleaning up resources.

### Listing All Datasets

You can view all datasets in your account to track what data you've uploaded.


In [None]:
# List all datasets
print("Fetching your datasets...\n")

# You can filter to only show validated datasets
only_validated = True  # Set to False to see all datasets

datasets_response = co.datasets.list(
    validation_status="validated" if only_validated else None
)

print("=" * 80)
print(f"YOUR DATASETS {'(Validated Only)' if only_validated else '(All)'}")
print("=" * 80)

if not datasets_response.datasets:
    print("\nNo datasets found.")
else:
    for i, ds in enumerate(datasets_response.datasets[:10], 1):  # Show first 10
        print(f"\n#{i}")
        print(f"  ID: {ds.id}")
        print(f"  Name: {ds.name}")
        print(f"  Type: {ds.dataset_type}")
        print(f"  Validation Status: {ds.validation_status}")
        print(f"  Created: {ds.created_at}")
        if hasattr(ds, 'validation_error') and ds.validation_error:
            print(f"  Validation Error: {ds.validation_error}")

    total = len(datasets_response.datasets)
    print(f"\n(Showing {min(10, total)} of {total} datasets)")


Fetching your datasets...

YOUR DATASETS (Validated Only)

#1
  ID: product-embeddings-job-output-5d4qyd
  Name: product-embeddings-job-output
  Type: batch-embed-v2-output
  Validation Status: validated
  Created: 2025-10-16 00:54:03.625755+00:00

#2
  ID: product-embeddings-job-output-ay6j8w
  Name: product-embeddings-job-output
  Type: batch-embed-v2-output
  Validation Status: validated
  Created: 2025-10-16 00:53:43.777087+00:00

#3
  ID: product-embeddings-job-output-qajjgg
  Name: product-embeddings-job-output
  Type: batch-embed-v2-output
  Validation Status: validated
  Created: 2025-10-16 00:53:18.973449+00:00

#4
  ID: product-embeddings-batch-sv80yd
  Name: product-embeddings-batch
  Type: batch-embed-v2-input
  Validation Status: validated
  Created: 2025-10-16 00:48:32.344911+00:00

#5
  ID: product-embeddings-batch-qvtetd
  Name: product-embeddings-batch
  Type: batch-embed-v2-input
  Validation Status: validated
  Created: 2025-10-16 00:44:15.853256+00:00

#6
  ID: prod

### Listing All Batch Jobs

View all batch jobs to monitor their progress and status.

In [None]:
# List all batch jobs
print("\nFetching your batch jobs...\n")

batches_response = co.batches.list()

print("=" * 80)
print("YOUR BATCH JOBS")
print("=" * 80)

if not batches_response.batches:
    print("\nNo batch jobs found.")
else:
    for i, batch in enumerate(batches_response.batches[:10], 1):  # Show first 10
        print(f"\n#{i}")
        print(f"  ID: {batch.id}")
        print(f"  Name: {batch.name}")
        print(f"  Model: {batch.model}")
        print(f"  Status: {batch.status}")
        print(f"  Progress: {batch.num_successful_records}/{batch.num_records} completed")

        if batch.num_failed_records > 0:
            print(f"  Failed: {batch.num_failed_records}")

        print(f"  Input Dataset: {batch.input_dataset_id}")
        if batch.output_dataset_id:
            print(f"  Output Dataset: {batch.output_dataset_id}")

        print(f"  Created: {batch.created_at}")
        print(f"  Updated: {batch.updated_at}")

    total = len(batches_response.batches)
    print(f"\n(Showing {min(10, total)} of {total} batch jobs)")


Fetching your batch jobs...

YOUR BATCH JOBS

#1
  ID: a17e1e5f-e80e-471b-ada1-2750b47b432c
  Name: rit-embed-job-mixed-30k
  Model: batch-embed-v4
  Status: BATCH_STATUS_COMPLETED
  Progress: 29248/30000 completed
  Input Dataset: 
  Created: 2025-08-15 01:18:30.237654+00:00
  Updated: 2025-08-15 02:47:23.386329+00:00

#2
  ID: 7cc1a188-73a1-4709-84c0-66f6274482f7
  Name: product-embeddings-job
  Model: batch-embed-v4
  Status: BATCH_STATUS_COMPLETED
  Progress: 10/10 completed
  Input Dataset: product-embeddings-batch-fk0pfx
  Output Dataset: product-embeddings-job-output-ezn4ke
  Created: 2025-10-10 01:43:48.240205+00:00
  Updated: 2025-10-10 01:43:53.196834+00:00

#3
  ID: 9c922fd7-ccda-4dcb-b6f8-bdf9a9d4be86
  Name: product-embeddings-job
  Model: batch-embed-v4
  Status: BATCH_STATUS_COMPLETED
  Progress: 10/10 completed
  Input Dataset: product-embeddings-batch-fk0pfx
  Output Dataset: product-embeddings-job-output-bkxwq4
  Created: 2025-10-10 01:44:21.044806+00:00
  Updated: 2
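
When the list grows, a compact summary can be easier to scan than per-job printouts. The first job in the output above, for example, is marked completed with 29248 of 30000 records successful. The sketch below counts jobs per status and flags completed jobs whose successful count is below the total. `summarize_batches` is a hypothetical helper; it reads only the attributes already used in the listing cell (`id`, `status`, `num_records`, `num_successful_records`) and treats missing counts as zero, which is an assumption.

```python
from collections import Counter
from typing import Iterable, List, Tuple


def summarize_batches(batches: Iterable) -> Tuple[Counter, List[Tuple[str, int, int]]]:
    """Count jobs per status and list completed jobs with fewer successes than records."""
    status_counts = Counter()
    incomplete = []
    for batch in batches:
        status_counts[batch.status] += 1
        # Assumption: a missing count (None) is treated as zero
        total = batch.num_records or 0
        succeeded = batch.num_successful_records or 0
        if batch.status == "BATCH_STATUS_COMPLETED" and succeeded < total:
            incomplete.append((batch.id, succeeded, total))
    return status_counts, incomplete
```

It could be applied to the listing above with `summarize_batches(batches_response.batches)`.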

### Cancelling a Batch Job

If you need to stop a running or queued batch job, you can cancel it. This is useful if you've submitted a job with incorrect parameters or if you no longer need the results.

**Important Notes:**
- You can only cancel jobs with status `BATCH_STATUS_QUEUED` or `BATCH_STATUS_IN_PROGRESS`
- Completed, failed, or already cancelled jobs cannot be cancelled
- Cancellation cannot be undone; a job may report `BATCH_STATUS_CANCELING` before it reaches `BATCH_STATUS_CANCELLED`

In [None]:

# Example: Checking if a batch job can be cancelled
print("Checking if batch job can be cancelled...\n")

# Get current status
response = co.batches.retrieve(batch_job.batch.id)
current_status = response.batch.status

print(f"Batch ID: {batch_job.batch.id}")
print(f"Batch Name: {response.batch.name}")
print(f"Current Status: {current_status}")
print(f"Progress: {response.batch.num_successful_records}/{response.batch.num_records}")

# Define cancellable statuses
cancellable_statuses = ["BATCH_STATUS_QUEUED", "BATCH_STATUS_IN_PROGRESS"]
non_cancellable_statuses = [
    "BATCH_STATUS_COMPLETED",
    "BATCH_STATUS_FAILED",
    "BATCH_STATUS_CANCELLED",
    "BATCH_STATUS_CANCELING"
]

# Check if cancellation is possible
if current_status in cancellable_statuses:
    print(f"\n✓ This job can be cancelled (status: {current_status})")
    print("\nTo cancel this job, uncomment and run:")
    print(f"# co.batches.cancel('{batch_job.batch.id}')")
    print("# print('✓ Batch job cancelled')")
elif current_status in non_cancellable_statuses:
    print(f"\n⚠️ Cannot cancel - job status is: {current_status}")
    if current_status == "BATCH_STATUS_CANCELING":
        print("This job is already being cancelled.")
    elif current_status == "BATCH_STATUS_CANCELLED":
        print("This job has already been cancelled.")
    else:
        print(f"Only jobs with status in {cancellable_statuses} can be cancelled.")
else:
    print(f"\n⚠️ Unknown status: {current_status}")

Checking if batch job can be cancelled...

Batch ID: 8b58636e-7638-4710-940b-541fc85a2cf0
Batch Name: product-embeddings-job
Current Status: BATCH_STATUS_COMPLETED
Progress: 10/10

⚠️ Cannot cancel - job status is: BATCH_STATUS_COMPLETED
Only jobs with status in ['BATCH_STATUS_QUEUED', 'BATCH_STATUS_IN_PROGRESS'] can be cancelled.
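
The status check and the cancel call can be wrapped in a single function. This is a sketch under the same assumptions as the cell above: it uses `co.batches.retrieve` and the `co.batches.cancel` call referenced there, and `cancel_if_possible` is a hypothetical name. The job's status can change between the check and the cancel request, so the return value only indicates that a cancel was sent, not that the job ended up cancelled.

```python
# Statuses from which this notebook says a job can be cancelled.
CANCELLABLE_STATUSES = {"BATCH_STATUS_QUEUED", "BATCH_STATUS_IN_PROGRESS"}


def cancel_if_possible(co, batch_id: str) -> bool:
    """Send a cancel request if the job's current status allows it.

    Returns True if a cancel request was sent, False otherwise.
    """
    status = co.batches.retrieve(batch_id).batch.status
    if status not in CANCELLABLE_STATUSES:
        print(f"Not cancelling {batch_id}: status is {status}")
        return False
    co.batches.cancel(batch_id)
    print(f"Cancellation requested for {batch_id}")
    return True
```

For example, `cancel_if_possible(co, batch_job.batch.id)` would leave the completed job from this notebook untouched and return `False`.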


## Summary

In this cookbook, you learned how to:

1. ✅ Prepare input data in the correct JSONL format with unique `custom_id`s
2. ✅ Upload datasets to Cohere and wait for validation
3. ✅ Create and monitor batch embedding jobs
4. ✅ Download results and save them as JSONL, CSV, or Avro
5. ✅ List and inspect your datasets and batch jobs
6. ✅ Cancel batch jobs when needed

## Resources

- [Batch API Reference](https://docs.cohere.com/reference/create-batch)
- [Dataset API Reference](https://docs.cohere.com/docs/datasets)
- [Embed v2 API Reference](https://docs.cohere.com/reference/embed)
- [Get API Key](https://dashboard.cohere.com/api-keys)