## Introduction: Scaling Up with Batch Processing 🚀

In this notebook, we'll explore how to handle large-scale text extraction using batch processing on the Kluster platform. Instead of sending articles one at a time, we'll submit a whole batch of requests as a single job, which is more efficient when you have hundreds or even thousands of articles to analyze.

## Setup and Configuration 🛠️

Let's get our environment ready. We'll install necessary packages, set up logging, and define the core components.

In [None]:
# Install necessary packages (uncomment if needed)
# !pip install openai pandas python-dotenv pydantic tqdm

import json
import logging  # For detailed logging
from pathlib import Path  # For file path handling
from openai import OpenAI  # OpenAI client for Kluster API
from datetime import datetime  # For timestamping
import os  # For environment variables
from dotenv import load_dotenv  # To load environment variables
import time  # For pausing execution
import pandas as pd  # For DataFrame manipulation

We need to handle progress bars differently depending on whether we're in a Jupyter Notebook or a regular script.

In [None]:
# Conditional import for tqdm based on environment
try:
    from tqdm.notebook import tqdm  # Nicer progress bars in Jupyter
    from IPython.display import clear_output, display  # For displaying DataFrames
    IN_NOTEBOOK = True
except ImportError:
    from tqdm import tqdm  # Standard progress bars
    IN_NOTEBOOK = False

Next, we load environment variables and set up logging. Events are written to a timestamped log file, which helps when debugging and monitoring batch jobs.

In [None]:
# Load environment variables
load_dotenv()

# Set up logging to file
logging.basicConfig(
    filename=f'article_extraction_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log', # Timestamped log file
    level=logging.INFO,  # Log INFO and higher level events
    format='%(asctime)s - %(levelname)s - %(message)s' # Log format
)
logger = logging.getLogger(__name__) # Get logger instance

## Defining the Extraction Schema 🏗️

Just like in the previous tutorial, we'll use Pydantic to define the structure of the data we want to extract.

In [None]:
# Define the extraction schema
from pydantic import BaseModel, Field # Import pydantic classes
from typing import List, Optional # For type hinting

class ExtractScheme(BaseModel):
    real_article: str = Field(description="Real article or scraping problem/artifact/copyright issue? - Select YES/NO only.")
    main_event: str = Field(description="Primary event or topic discussed in the article")
    event_summary: str = Field(description="A brief summary of the event or article's main points")
    entities_involved: List[str] = Field(description="Organizations, countries, or key entities involved in the event")
    key_people: List[str] = Field(description="Key people or figures mentioned in relation to the event")
    # default=None makes these fields truly optional; in Pydantic v2, Optional[...] without a default is still required
    relevant_locations: Optional[List[str]] = Field(default=None, description="Locations that are central to the event, if any")
    key_developments: Optional[List[str]] = Field(default=None, description="Key developments or actions that have occurred or are expected")
    potential_impact: Optional[List[str]] = Field(default=None, description="Potential impacts or consequences of the event")
    keywords: List[str] = Field(description="Key terms or phrases that are central to the article")
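
Because `real_article` is declared as a plain `str`, the model may answer with variants such as `yes` or `No.` rather than a strict `YES`/`NO`. Here is a minimal sketch of a normaliser for that field. The helper name `is_real_article` is hypothetical and not part of the original notebook.

```python
def is_real_article(value):
    """Map the model's free-text YES/NO answer to a bool, or None if unclear."""
    if not isinstance(value, str):
        return None
    # Drop surrounding whitespace and common trailing punctuation or quotes
    answer = value.strip().strip('.!"\'').upper()
    if answer == "YES":
        return True
    if answer == "NO":
        return False
    return None


print(is_real_article(" yes "))  # True
print(is_real_article("No."))    # False
print(is_real_article("maybe"))  # None
```

Returning `None` for unclear answers keeps those rows visible so they can be reviewed rather than silently counted as real articles.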

## Core Functions ⚙️

Now, let's define the key functions that will handle loading articles, creating inference requests, processing results, and interacting with the Kluster API.

### Loading Articles

This function loads articles from a JSONL file.

In [None]:
def load_articles_from_jsonl(file_path):
    """Load articles from a JSONL file."""
    articles = []
    try:
        with open(file_path, 'r', encoding='utf-8') as f: # Open file for reading in UTF-8
            for line in f: # Read line by line
                try:
                    article_json = json.loads(line.strip()) # Load each line as JSON
                    articles.append(article_json) # Add to articles list
                except json.JSONDecodeError as e:
                    logger.warning(f"Skipping invalid JSON line: {e}") # Log invalid JSON
                    continue
        logger.info(f"Loaded {len(articles)} articles from {file_path}") # Log loading info
        return articles
    except Exception as e:
        logger.error(f"Error loading articles: {str(e)}") # Log errors
        raise
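
To make the expected input concrete, here is a small offline sketch of the JSONL format: one JSON object per line, with invalid lines skipped. The sample records are made up for illustration. Only the field names (`title`, `date`, `text`) come from the code in this notebook.

```python
import json
import tempfile
from pathlib import Path

# Hypothetical sample records; the second line is deliberately corrupt
sample_lines = [
    json.dumps({"title": "Example A", "date": "2025-01-01", "text": "First article."}),
    "{not valid json",
    json.dumps({"text": "Second article without title or date."}),
]

with tempfile.TemporaryDirectory() as tmp_dir:
    sample_path = Path(tmp_dir) / "sample_articles.jsonl"
    sample_path.write_text("\n".join(sample_lines) + "\n", encoding="utf-8")

    loaded, skipped = [], 0
    with open(sample_path, "r", encoding="utf-8") as f:
        for line in f:
            try:
                loaded.append(json.loads(line))  # one JSON object per line
            except json.JSONDecodeError:
                skipped += 1  # count the line instead of aborting the whole load

print(len(loaded), skipped)  # 2 1
```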

### Creating Inference Requests

This function takes the loaded articles and transforms them into the format required for the Kluster batch API.

In [None]:
def create_inference_file(articles_data, limit=100):
    """Create inference requests for batch processing."""
    try:
        json_schema = json.dumps(ExtractScheme.model_json_schema()) # Serialize the schema as valid JSON text for the prompt
        inference_list = []

        # Limit to specified number of articles
        subset_articles = articles_data[:limit] # Slicing already stops at the end of the list

        for i, article in enumerate(subset_articles):
            article_text = article['text'] # Extract text
            original_title = article.get('title', 'No title') # Extract title (with default)
            original_date = article.get('date', 'No date') # Extract date (with default)

            request = {
                "custom_id": f"article_extraction-{i}",  # Unique ID for each request
                "method": "POST", # HTTP method
                "url": "/v1/chat/completions", # Kluster API endpoint
                "body": {
                    "model": "klusterai/Meta-Llama-3.3-70B-Instruct-Turbo", # Specify model - note the updated model name.
                    "temperature": 0.2, # Control randomness of output
                    "messages": [
                        {
                            "role": "system",
                            "content": "You are an AI model tasked with extracting structured information from a news article. Follow the schema provided below to extract the relevant details. You do not invent information that is not in the provided text. You output JSON only in English. Nothing else."
                        },
                        {
                            "role": "user",
                            "content": f"Extract article information from the following text and output in English JSON format: {article_text} Use following JSON schema:" + json_schema
                        }
                    ],
                    "response_format": {"type": "json_object", "schema": ExtractScheme.model_json_schema()} # Enforce JSON output
                },
                "metadata": { # Add metadata for later reference
                    "original_title": original_title,
                    "original_date": original_date
                }
            }
            inference_list.append(request) # Add to list

        logger.info(f"Created {len(inference_list)} extraction requests") # Log number of requests
        return inference_list
    except Exception as e:
        logger.error(f"Error creating inference requests: {str(e)}") # Log errors
        raise
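
When a schema is embedded in a prompt as text, the choice between `str()` and `json.dumps()` matters: `str()` on a dict gives a Python repr with single quotes, while `json.dumps()` gives real JSON. The sketch below illustrates the difference with a small, hypothetical stand-in for the output of `ExtractScheme.model_json_schema()`.

```python
import json

# Hypothetical, heavily reduced stand-in for ExtractScheme.model_json_schema()
schema = {
    "type": "object",
    "properties": {"main_event": {"type": "string"}},
    "required": ["main_event"],
}

as_repr = str(schema)         # Python repr: single-quoted keys, not valid JSON
as_json = json.dumps(schema)  # JSON text: double-quoted keys

print(as_repr[:20])
print(as_json[:20])

# Only the json.dumps version can be parsed back as JSON
try:
    json.loads(as_repr)
    repr_is_json = True
except json.JSONDecodeError:
    repr_is_json = False

print(repr_is_json)                    # False
print(json.loads(as_json) == schema)   # True
```

A model will often cope with either form, but a prompt that asks for JSON output reads more consistently when the schema it shows is itself valid JSON.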

### Saving Inference Requests

This function saves the generated inference requests to a JSONL file, ready for upload to Kluster.

In [None]:
def save_inference_file(inference_list):
    """Save inference requests to a JSONL file."""
    try:
        filename = f"article_extraction_requests_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jsonl" # Timestamped filename
        with open(filename, 'w', encoding='utf-8') as file: # Open file for writing in UTF-8
            for request in inference_list:
                file.write(json.dumps(request) + '\n') # Write each request as JSON line

        logger.info(f"Saved inference requests to {filename}") # Log file saving
        return filename
    except Exception as e:
        logger.error(f"Error saving inference file: {str(e)}") # Log errors
        raise
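
Before uploading, it can be worth checking the request file locally, since a malformed line or a duplicate `custom_id` is easier to fix now than after the job has run. The following is a minimal sketch. `check_request_lines` is a hypothetical helper, and the set of required keys is taken from the request structure built earlier in this notebook, not from an official specification.

```python
import json


def check_request_lines(lines):
    """Return a list of problems found in batch request lines (empty if none)."""
    problems, seen_ids = [], set()
    for number, line in enumerate(lines, start=1):
        try:
            request = json.loads(line)
        except json.JSONDecodeError:
            problems.append(f"line {number}: not valid JSON")
            continue
        if not isinstance(request, dict):
            problems.append(f"line {number}: not a JSON object")
            continue
        custom_id = request.get("custom_id")
        if not custom_id:
            problems.append(f"line {number}: missing custom_id")
        elif custom_id in seen_ids:
            problems.append(f"line {number}: duplicate custom_id {custom_id}")
        else:
            seen_ids.add(custom_id)
        for key in ("method", "url", "body"):
            if key not in request:
                problems.append(f"line {number}: missing {key}")
    return problems


# Hypothetical demo lines: one duplicate ID and one incomplete request
complete = {"custom_id": "article_extraction-0", "method": "POST",
            "url": "/v1/chat/completions", "body": {}}
demo = [
    json.dumps(complete),
    json.dumps(complete),
    json.dumps({"custom_id": "article_extraction-1", "method": "POST"}),
]
demo_problems = check_request_lines(demo)
for problem in demo_problems:
    print(problem)
```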

### Parsing JSON Objects

This function parses the downloaded results, which contain one JSON object per line. Lines that fail to parse are logged and skipped rather than aborting the run.

In [None]:
def parse_json_objects(data_string):
    """Parse JSON objects from string data."""
    try:
        if isinstance(data_string, bytes): # Decode if bytes
            data_string = data_string.decode('utf-8')

        json_strings = data_string.strip().split('\n') # Split into individual JSON strings
        json_objects = []

        for json_str in json_strings: # Loop through
            if not json_str.strip(): # Skip blank lines instead of reporting them as errors
                continue
            try:
                json_obj = json.loads(json_str) # Parse JSON
                json_objects.append(json_obj) # Add to list
            except json.JSONDecodeError as e:
                logger.error(f"Error parsing JSON: {e}") # Log parsing errors
                print(f"Error parsing JSON: {e}") # Print parsing errors

        return json_objects
    except Exception as e:
        logger.error(f"Error in parse_json_objects: {str(e)}") # Log errors
        raise

### Flattening List Columns

This function flattens list columns in the DataFrame for easier viewing and CSV export.

In [None]:
def flatten_list_columns(df):
    """Flatten list columns to comma-separated strings."""
    try:
        flattened_df = df.copy() # Copy to avoid modifying original DataFrame
        list_columns = [col for col in df.columns if df[col].apply(lambda x: isinstance(x, list)).any()] # Find list columns

        for col in list_columns:
            flattened_df[col] = flattened_df[col].apply(
                lambda x: ', '.join(x) if isinstance(x, list) and x else '') # Join list elements

        return flattened_df
    except Exception as e:
        logger.error(f"Error flattening list columns: {str(e)}") # Log errors
        raise
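
One design point to keep in mind: joining list items with `', '` is convenient for reading a CSV, but it is lossy when an item itself contains a comma. The sketch below uses made-up values to show the problem, and shows JSON-encoding the list as one possible alternative when the column needs to be split back into items later.

```python
import json

people = ["Doe, Jane", "John Smith"]  # hypothetical values; the first contains a comma

# Comma-joined: readable, but the original item boundaries are lost
joined = ", ".join(people)
pieces = joined.split(", ")
print(pieces)  # ['Doe', 'Jane', 'John Smith']

# JSON-encoded: slightly noisier in the CSV cell, but fully recoverable
encoded = json.dumps(people, ensure_ascii=False)
print(json.loads(encoded) == people)  # True
```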

### Waiting for Job Completion

This function monitors the status of a batch job on Kluster, providing a progress indicator and handling completion or failure.

In [None]:
def wait_for_job_completion(client, job_id, check_interval=10):
    """Wait for batch job to complete with progress indicator."""
    try:
        print("Waiting for batch job to complete...")
        spinner = tqdm(desc="Processing", unit="checks") # Initialize progress bar

        while True:
            job_status = client.batches.retrieve(job_id) # Get job status
            spinner.set_description(f"Status: {job_status.status}") # Update progress bar

            if job_status.status == "completed":
                spinner.close() # Close progress bar
                print(f"Job completed! Output file ID: {job_status.output_file_id}")
                return job_status
            elif job_status.status in ("failed", "expired", "cancelled"): # Other terminal states, as named in the OpenAI Batch API
                spinner.close() # Close progress bar
                error_msg = f"Job ended with status: {job_status.status}" # Construct error message
                logger.error(error_msg) # Log error
                raise RuntimeError(error_msg) # Raise exception

            spinner.update(1) # Update progress bar
            time.sleep(check_interval) # Wait for next check
    except Exception as e:
        logger.error(f"Error waiting for job completion: {str(e)}") # Log errors
        raise
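
A polling loop like this is easier to reason about, and to test offline, when the status lookup is passed in as a function and an overall timeout is enforced. The sketch below makes two assumptions: the set of terminal statuses follows the OpenAI Batch API that the client mirrors, and `poll_until_terminal` is a hypothetical helper, not part of the Kluster or OpenAI SDK.

```python
import time

# Assumed terminal states, following the OpenAI Batch API
TERMINAL_STATUSES = {"completed", "failed", "expired", "cancelled"}


def poll_until_terminal(get_status, check_interval=10, timeout=3600, sleep=time.sleep):
    """Call get_status() until it returns a terminal status or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        # Give up if the next check would land past the deadline
        if time.monotonic() + check_interval > deadline:
            raise TimeoutError(f"Job still '{status}' after {timeout} seconds")
        sleep(check_interval)


# Stand-in for client.batches.retrieve(job_id).status, so the sketch runs offline
fake_statuses = iter(["validating", "in_progress", "in_progress", "completed"])
final = poll_until_terminal(lambda: next(fake_statuses), check_interval=0, timeout=5)
print(final)  # completed
```

With a real client, `get_status` could be `lambda: client.batches.retrieve(job_id).status`, with `check_interval` and `timeout` chosen to fit the completion window of the job.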

### Processing Results

This function downloads the results of a completed batch job, parses the JSON output, extracts the relevant data, and saves it to a CSV file.

In [None]:
def process_results(client, job_status):
    """Process the results from a completed batch job."""
    try:
        # Download results
        output_file = client.files.retrieve(job_status.output_file_id) # Get output file
        download_path = f"results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jsonl" # Timestamped filename
        with open(download_path, 'wb') as f:  # Open file for writing binary data
            f.write(client.files.content(output_file.id).read()) # Download file content

        print(f"Results downloaded to {download_path}") # Print download location
        logger.info(f"Results downloaded to {download_path}") # Log download location

        # Parse results
        with open(download_path, 'r', encoding='utf-8') as f: # Open downloaded file as UTF-8
            results_data = f.read() # Read file content

        results = parse_json_objects(results_data) # Parse JSON objects

        # Extract and process the data
        processed_data = []
        for result in results:
            try:
                # Extract the custom_id to match back to original data
                custom_id = result.get("custom_id", "") # Get custom ID

                # Get the completion content
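                # OpenAI-style batch output nests the completion under response["body"]; fall back to response itself
                response = result.get("response") or {}
                body = response.get("body") or response
                completion = body.get("choices", [{}])[0].get("message", {}).get("content", "{}") # Extract AI response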

                # Parse the completion JSON
                try:
                    completion_data = json.loads(completion) # Parse JSON response

                    # Add metadata from the original request
                    metadata = result.get("metadata", {}) # Get metadata
                    completion_data.update(metadata) # Add metadata to extracted data

                    processed_data.append(completion_data) # Append to processed data
                except json.JSONDecodeError:
                    logger.warning(f"Could not parse completion JSON for {custom_id}") # Log parsing errors
            except Exception as e:
                logger.warning(f"Error processing result: {str(e)}") # Log errors

        # Convert to DataFrame and save
        if processed_data:
            results_df = pd.DataFrame(processed_data) # Create DataFrame
            results_df = flatten_list_columns(results_df) # Flatten list columns

            csv_path = f"extracted_articles_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" # Timestamped CSV path
            results_df.to_csv(csv_path, index=False) # Save to CSV

            print(f"Processed {len(results_df)} articles and saved to {csv_path}") # Print summary
            logger.info(f"Processed {len(results_df)} articles and saved to {csv_path}") # Log summary

            return results_df
        else:
            logger.warning("No valid results to process") # Log no results
            print("No valid results to process")
            return None
    except Exception as e:
        logger.error(f"Error processing results: {str(e)}") # Log errors
        raise
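
The shape of each output line determines where the model's answer is found. In the OpenAI batch output format, which this workflow appears to follow, the chat completion sits under `response["body"]`. Whether Kluster's output matches exactly is an assumption worth checking against a downloaded file. The sketch below uses a hypothetical helper, `extract_completion_text`, and a made-up sample line to show a lookup that tolerates both the nested and the flat shape.

```python
import json


def extract_completion_text(result):
    """Return the assistant message text from one batch output record, or None."""
    response = result.get("response") or {}
    body = response.get("body") or response  # assumed nesting; falls back to the flat shape
    choices = body.get("choices") or []
    if not choices:
        return None
    return choices[0].get("message", {}).get("content")


# Hypothetical output line, shaped like an OpenAI-style batch result
sample_line = json.dumps({
    "custom_id": "article_extraction-0",
    "response": {"status_code": 200, "body": {"choices": [
        {"message": {"role": "assistant", "content": "{\"main_event\": \"Example\"}"}}
    ]}},
})

record = json.loads(sample_line)
extracted = json.loads(extract_completion_text(record))
print(record["custom_id"], extracted["main_event"])  # article_extraction-0 Example
```

Returning `None` when no choices are present makes failed requests easy to count separately from answers that are present but not valid JSON.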

## Main Execution Function 🎬

This function orchestrates the entire process: loading articles, creating inference requests, submitting a batch job to Kluster, waiting for completion, and processing the results. The first cell below downloads the sample dataset, and the second defines and runs `main()`.

In [None]:
!wget https://rjuro.com/unistra-nlp2025/data/paraphrased_articles.jsonl

In [None]:
def main():
    """Main function to run the article extraction process."""
    try:
        # Configuration
        jsonl_file_path = 'paraphrased_articles.jsonl'  # Path to input data
        api_key = os.getenv('KLUSTER_API_KEY')  # Get API key from environment
        
        if not api_key:
            raise ValueError("KLUSTER_API_KEY not found in environment variables")
        
        logger.info("Starting article extraction process")  # Log start
        print("Starting article extraction process") # Print start
        
        # Initialize Kluster client
        client = OpenAI(
            base_url="https://api.kluster.ai/v1", # Kluster API base URL
            api_key=api_key, # API key
        )
        
        # Load articles
        articles_data = load_articles_from_jsonl(jsonl_file_path) # Load articles
        
        # Create and save inference requests
        inference_list = create_inference_file(articles_data, limit=100) # Create requests, limit to 100
        filename = save_inference_file(inference_list) # Save requests to file
        
        # Preview the request file
        print("\nPreview of request file:")  # Print preview message
        with open(filename, 'r') as f: # Open file for reading
            print(f.readline())  # Print first line

        # Upload the file to Kluster
        print("\nUploading inference file to Kluster...")  # Print upload message
        with open(filename, "rb") as request_file: # Open file for binary reading; closed once the upload finishes
            inference_input_file = client.files.create( # Upload file
                file=request_file,
                purpose="batch" # Specify file purpose
            )
        print(f"File uploaded with ID: {inference_input_file.id}") # Print file ID
        
        # Start the batch job
        print("\nStarting batch job...")  # Print start message
        inference_job = client.batches.create( # Create batch job
            input_file_id=inference_input_file.id, # Input file ID
            endpoint="/v1/chat/completions", # API endpoint
            completion_window="24h" # Completion window
        )
        print(f"Batch job created with ID: {inference_job.id}")  # Print job ID
        
        # Wait for job completion
        job_status = wait_for_job_completion(client, inference_job.id) # Wait for completion
        
        # Process and save results
        results_df = process_results(client, job_status) # Process results
        
        logger.info("Article extraction process completed successfully") # Log completion
        print("\nArticle extraction process completed successfully") # Print completion
        
        # Display sample in notebook if applicable
        if IN_NOTEBOOK and results_df is not None: # Check if in notebook and results exist
            display(results_df.head()) # Display first few rows of DataFrame
        
        return results_df # Return DataFrame
        
    except Exception as e:
        logger.error(f"Script failed: {str(e)}") # Log script failure
        print(f"Error: {str(e)}")  # Print error message
        raise

if __name__ == "__main__": # Run main function if script is executed
    main()
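
If the output lines turn out not to carry the `metadata` block from the request file, the title and date can still be re-attached, because each `custom_id` encodes the article's position in the input list. Here is a minimal sketch of that join. `index_from_custom_id` is a hypothetical helper, and the articles and extractions are made-up stand-ins.

```python
def index_from_custom_id(custom_id, prefix="article_extraction-"):
    """Recover the article index encoded in a custom_id, or None if it does not match."""
    if not custom_id.startswith(prefix):
        return None
    suffix = custom_id[len(prefix):]
    return int(suffix) if suffix.isdigit() else None


# Hypothetical stand-ins for the loaded articles and the parsed extractions
articles = [{"title": "First", "date": "2025-01-01"}, {"title": "Second"}]
extractions = {
    "article_extraction-1": {"main_event": "B"},
    "article_extraction-0": {"main_event": "A"},
}

rows = []
for custom_id, data in extractions.items():
    idx = index_from_custom_id(custom_id)
    if idx is None or idx >= len(articles):
        continue  # skip IDs that do not map to a loaded article
    source = articles[idx]
    rows.append({
        **data,
        "original_title": source.get("title", "No title"),
        "original_date": source.get("date", "No date"),
    })

for row in rows:
    print(row)
```

Joining on `custom_id` also means the results do not have to arrive in the same order as the requests.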

## Running the Script and Conclusion 🏁

To run the script, make sure `KLUSTER_API_KEY` is set in your environment variables (or in a `.env` file) and that `paraphrased_articles.jsonl` is in the working directory. Then run the notebook cells in order, or execute the exported Python file.

This batch processing approach lets you extract structured information from a large number of articles in a single job through Kluster's API. Check the log file for skipped or unparseable results, and adjust parameters such as `model`, `temperature`, and `limit` as needed to improve your results.