In [1]:
import csv
import json
import requests
import re
from pathlib import Path

In [2]:
# define model, ollama API, input files
model_name = "deepseek-r1:70b" # model name here
ollama_url = "http://localhost:11434/api/generate"

input_files = [
    "relevance_210725_prompts_templ-1.csv"
    #"relevance_210725_prompts_templ-2.csv",
    #"relevance_210725_prompts_templ-3.csv",
    #"relevance_210725_prompts_templ-4.csv",
    #"relevance_210725_prompts_templ-5.csv",
]

In [None]:
import time
import json
import requests
import re
from pathlib import Path
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed

# How many concurrent requests
MAX_WORKERS = 10

# Helper: call Ollama for a single prompt
def call_ollama_single(prompt: str) -> str:
    payload = {
        "model": model_name,
        "prompt": prompt,
        "stream": False
    }
    resp = requests.post(ollama_url,
                         headers={"Content-Type": "application/json"},
                         data=json.dumps(payload), timeout=60)
    if resp.status_code != 200:
        return f"Error: HTTP {resp.status_code}"
    try:
        result = resp.json()
    except ValueError:
        text = resp.text.strip()
    else:
        # extract text from common fields
        if isinstance(result, dict):
            if "response" in result:
                text = result["response"]
            elif "output" in result:
                text = result["output"]
            elif "responses" in result and isinstance(result["responses"], list):
                # batch-style response
                return result["responses"][0].strip()
            else:
                # chat-style or fallback
                choices = result.get("choices")
                if isinstance(choices, list) and choices:
                    text = choices[0].get("message", {}).get("content", "")
                else:
                    text = str(result)
        elif isinstance(result, list):
            # pure list batch
            return result[0].strip()
        else:
            text = str(result)
    text = str(text).strip()
    # normalize yes/no
    low = text.lower()
    if low.startswith("yes"): return "Yes"
    if low.startswith("no"):  return "No"
    return text

# Process each file
for input_path in input_files:
    # detect template tag and output name
    match = re.search(r"templ-\d+", input_path)
    if not match:
        print(f"Skipping {input_path}, no template tag found.")
        continue
    template_tag = match.group()
    model_tag = model_name.replace(':', '-')
    output_path = f"relevance_210725_completions_{model_tag}-{template_tag}.csv"

    print(f"\nProcessing {input_path} -> {output_path}")
    df = pd.read_csv(input_path, encoding='utf-8')
    # ensure columns exist
    if 'eval_prompt' not in df.columns:
        # assume last column is prompt
        df.rename(columns={df.columns[-1]: 'eval_prompt'}, inplace=True)
    df['eval_completion'] = None
    df['model'] = None

    total = len(df)
    start_all = time.perf_counter()
    completed = 0
    timings = []

    # parallel calls
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {}
        for idx, prompt in df['eval_prompt'].items():
            futures[executor.submit(call_ollama_single, prompt)] = idx

        for fut in as_completed(futures):
            idx = futures[fut]
            t0 = time.perf_counter()
            try:
                completion = fut.result()
            except Exception as e:
                completion = f"Error: {e}"
            dt = time.perf_counter() - t0
            df.at[idx, 'eval_completion'] = completion
            df.at[idx, 'model'] = model_name

            completed += 1
            timings.append(time.perf_counter() - start_all)
            avg_t = sum(timings) / len(timings)
            remain = total - completed
            eta_sec = remain * avg_t / completed if completed > 0 else 0
            print(f"{completed}/{total} rows done, last={dt:.2f}s, ETA ~{eta_sec/60:.1f}min")

    total_time = time.perf_counter() - start_all
    print(f"Finished {total} rows in {total_time/60:.2f} minutes.")

    # write output
    df.to_csv(output_path, index=False, encoding='utf-8')
    print(f"Wrote results to {output_path}")


In [None]:
import time
import json
import requests
import re
from pathlib import Path
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging

# Configuration
MAX_WORKERS = 5  # Reduced for better stability
REQUEST_TIMEOUT = 120  # Increased timeout
RETRY_ATTEMPTS = 2
TEST_SUBSET = 10  # Set to None to process all rows

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def call_ollama_single(prompt: str, max_retries: int = RETRY_ATTEMPTS) -> str:
    """Call Ollama for a single prompt with retry logic and proper timeout handling."""
    
    payload = {
        "model": model_name,
        "prompt": prompt,
        "stream": False
    }
    
    for attempt in range(max_retries + 1):
        try:
            resp = requests.post(
                ollama_url,
                headers={"Content-Type": "application/json"},
                data=json.dumps(payload), 
                timeout=REQUEST_TIMEOUT
            )
            
            if resp.status_code != 200:
                if attempt < max_retries:
                    logger.warning(f"HTTP {resp.status_code} on attempt {attempt + 1}, retrying...")
                    time.sleep(1)
                    continue
                return f"HTTPError: {resp.status_code}"
            
            try:
                result = resp.json()
            except ValueError as e:
                if attempt < max_retries:
                    logger.warning(f"JSON decode error on attempt {attempt + 1}, retrying...")
                    time.sleep(1)
                    continue
                return f"JSONError: {e}"
            
            # Extract text from response
            text = extract_response_text(result)
            
            # Normalize yes/no responses
            low = text.lower().strip()
            if low.startswith("yes"): 
                return "Yes"
            if low.startswith("no"): 
                return "No"
            return text
            
        except requests.exceptions.Timeout:
            if attempt < max_retries:
                logger.warning(f"Timeout on attempt {attempt + 1}, retrying...")
                time.sleep(2)
                continue
            return "TimeoutError: Request timed out"
        except requests.exceptions.RequestException as e:
            if attempt < max_retries:
                logger.warning(f"Request error on attempt {attempt + 1}: {e}, retrying...")
                time.sleep(2)
                continue
            return f"RequestError: {e}"
        except Exception as e:
            if attempt < max_retries:
                logger.warning(f"Unexpected error on attempt {attempt + 1}: {e}, retrying...")
                time.sleep(2)
                continue
            return f"UnexpectedError: {e}"
    
    return "Error: All retry attempts failed"

def extract_response_text(result) -> str:
    """Extract text response from various Ollama response formats."""
    if isinstance(result, dict):
        if "response" in result:
            return str(result["response"]).strip()
        elif "output" in result:
            return str(result["output"]).strip()
        elif "responses" in result and isinstance(result["responses"], list):
            return str(result["responses"][0]).strip()
        else:
            # Chat-style response
            choices = result.get("choices")
            if isinstance(choices, list) and choices:
                return str(choices[0].get("message", {}).get("content", "")).strip()
            else:
                return str(result).strip()
    elif isinstance(result, list) and result:
        return str(result[0]).strip()
    else:
        return str(result).strip()

def process_file(input_path: str):
    """Process a single input file."""
    # Detect template tag and create output name
    match = re.search(r"templ-\d+", input_path)
    if not match:
        logger.warning(f"Skipping {input_path}, no template tag found.")
        return
    
    template_tag = match.group()
    model_tag = model_name.replace(':', '-')
    output_path = f"relevance_210725_completions_{model_tag}-{template_tag}.csv"
    
    logger.info(f"Processing {input_path} -> {output_path}")
    
    try:
        df = pd.read_csv(input_path, encoding='utf-8')
    except Exception as e:
        logger.error(f"Failed to read {input_path}: {e}")
        return
    
    """ # Ensure eval_prompt column exists
    if 'eval_prompt' not in df.columns:
        if df.empty:
            logger.error(f"Empty DataFrame in {input_path}")
            return
        # Assume last column is prompt
        df.rename(columns={df.columns[-1]: 'eval_prompt'}, inplace=True)
        logger.info(f"Renamed column '{df.columns[-1]}' to 'eval_prompt'") """
    
    # Apply test subset if configured
    original_total = len(df)
    if TEST_SUBSET is not None and len(df) > TEST_SUBSET:
        df = df.head(TEST_SUBSET).copy()
        logger.info(f"Processing subset: {len(df)} rows out of {original_total}")
    
    # Initialize result columns
    df['eval_completion'] = None
    df['model'] = model_name
    
    total = len(df)
    logger.info(f"Starting processing of {total} rows with {MAX_WORKERS} workers")
    
    start_time = time.perf_counter()
    completed = 0
    request_times = []
    
    # Process requests in parallel
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # Submit all tasks
        future_to_idx = {}
        for idx, prompt in df['eval_prompt'].items():
            if pd.isna(prompt) or prompt == "":
                df.at[idx, 'eval_completion'] = "Error: Empty prompt"
                completed += 1
                continue
            future = executor.submit(call_ollama_single, str(prompt))
            future_to_idx[future] = idx
        
        # Process completed requests
        for future in as_completed(future_to_idx):
            request_start = time.perf_counter()
            idx = future_to_idx[future]
            
            try:
                completion = future.result()
            except Exception as e:
                completion = f"FutureError: {e}"
                logger.error(f"Future error for row {idx}: {e}")
            
            request_time = time.perf_counter() - request_start
            request_times.append(request_time)
            
            df.at[idx, 'eval_completion'] = completion
            completed += 1
            
            # Calculate and display progress
            if completed % max(1, total // 20) == 0 or completed == total:  # Update every 5%
                elapsed_total = time.perf_counter() - start_time
                if request_times:
                    avg_request_time = sum(request_times) / len(request_times)
                    remaining = total - completed
                    eta_sec = remaining * avg_request_time
                    
                    logger.info(f"Progress: {completed}/{total} ({100*completed/total:.1f}%) | "
                              f"Last: {request_time:.1f}s | Avg: {avg_request_time:.1f}s | "
                              f"ETA: {eta_sec/60:.1f}min")
    
    total_time = time.perf_counter() - start_time
    successful = df['eval_completion'].notna().sum()
    errors = df['eval_completion'].str.contains('Error:', case=False, na=False).sum()
    
    logger.info(f"Completed {total} rows in {total_time/60:.2f} minutes")
    logger.info(f"Success rate: {successful-errors}/{total} ({100*(successful-errors)/total:.1f}%)")
    if errors > 0:
        logger.warning(f"Errors encountered: {errors} rows")
    
    # Write output
    try:
        df.to_csv(output_path, index=False, encoding='utf-8')
        logger.info(f"Results saved to {output_path}")
    except Exception as e:
        logger.error(f"Failed to save results: {e}")

def main():
    """Main execution function."""
    logger.info(f"Starting batch processing with {MAX_WORKERS} workers, timeout {REQUEST_TIMEOUT}s")
    if TEST_SUBSET:
        logger.info(f"TEST MODE: Processing only first {TEST_SUBSET} rows per file")
    
    for input_path in input_files:
        if not Path(input_path).exists():
            logger.error(f"Input file not found: {input_path}")
            continue
        process_file(input_path)
    
    logger.info("All files processed")

if __name__ == "__main__":
    main()

2025-07-22 16:33:20,303 - INFO - Starting batch processing with 5 workers, timeout 120s
2025-07-22 16:33:20,304 - INFO - TEST MODE: Processing only first 10 rows per file
2025-07-22 16:33:20,305 - INFO - Processing relevance_210725_prompts_templ-1.csv -> relevance_210725_completions_deepseek-r1-70b-templ-1.csv
2025-07-22 16:33:20,318 - INFO - Processing subset: 10 rows out of 1000
2025-07-22 16:33:20,319 - INFO - Starting processing of 10 rows with 5 workers
2025-07-22 16:39:14,229 - INFO - Progress: 1/10 (10.0%) | Last: 0.0s | Avg: 0.0s | ETA: 0.0min
2025-07-22 16:39:24,524 - INFO - Progress: 2/10 (20.0%) | Last: 0.0s | Avg: 0.0s | ETA: 0.0min
2025-07-22 16:39:24,529 - INFO - Progress: 3/10 (30.0%) | Last: 0.0s | Avg: 0.0s | ETA: 0.0min
2025-07-22 16:39:24,607 - INFO - Progress: 4/10 (40.0%) | Last: 0.0s | Avg: 0.0s | ETA: 0.0min
2025-07-22 16:39:24,633 - INFO - Progress: 5/10 (50.0%) | Last: 0.0s | Avg: 0.0s | ETA: 0.0min


KeyboardInterrupt: 

In [None]:
# runs pretty shit
import time
import json
import requests
import re
from pathlib import Path
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
from typing import List, Dict, Any, Optional
import asyncio
import aiohttp
import threading

# Configuration
MAX_WORKERS = 8  # Increased for better throughput
BATCH_SIZE = 15  # Reduced batch size for reasoning model
REQUEST_TIMEOUT = 90  # Increased timeout for reasoning models
RETRY_ATTEMPTS = 2
TEST_SUBSET = 50  # Set to None to process all rows
PROGRESS_UPDATE_INTERVAL = 5  # Update progress every N completions

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class OllamaProcessor:
    """Optimized Ollama API processor with batch capabilities and better error handling."""
    
    def __init__(self, url: str, model: str, max_workers: int = MAX_WORKERS):
        self.url = url
        self.model = model
        self.max_workers = max_workers
        self.session = None
        
    def create_payload(self, prompt: str) -> Dict[str, Any]:
        """Create standardized payload for Ollama API."""
        return {
            "model": self.model,
            "prompt": str(prompt).strip(),
            "stream": False,
            "options": {
                "temperature": 0.1,  # Lower temperature for more consistent responses
                "num_predict": -1    # No limit on response length (let model decide)
            }
        }
    
    def normalize_response(self, text: str) -> str:
        """Normalize and clean response text."""
        if not text:
            return "Error: Empty response"
        
        text = str(text).strip()
        if not text:
            return "Error: Empty response after cleaning"
        
        # Normalize yes/no responses
        lower_text = text.lower()
        if lower_text.startswith("yes") or lower_text == "y":
            return "Yes"
        if lower_text.startswith("no") or lower_text == "n":
            return "No"
        
        return text
    
    def extract_response_text(self, result: Any) -> str:
        """Extract text response from Ollama API response."""
        try:
            if isinstance(result, dict):
                if "response" in result:
                    return str(result["response"]).strip()
                elif "output" in result:
                    return str(result["output"]).strip()
                elif "message" in result and isinstance(result["message"], dict):
                    return str(result["message"].get("content", "")).strip()
                elif "choices" in result and isinstance(result["choices"], list) and result["choices"]:
                    return str(result["choices"][0].get("message", {}).get("content", "")).strip()
                else:
                    # Fallback: convert entire dict to string
                    return str(result).strip()
            elif isinstance(result, list) and result:
                return str(result[0]).strip()
            else:
                return str(result).strip()
        except Exception as e:
            return f"ExtractionError: {e}"
    
    def call_ollama_single(self, prompt: str) -> str:
        """Make a single API call to Ollama with improved error handling."""
        if not prompt or pd.isna(prompt):
            return "Error: Empty or null prompt"
        
        payload = self.create_payload(prompt)
        
        for attempt in range(RETRY_ATTEMPTS + 1):
            try:
                response = requests.post(
                    self.url,
                    headers={"Content-Type": "application/json"},
                    json=payload,  # Use json parameter instead of data + dumps
                    timeout=REQUEST_TIMEOUT
                )
                
                if response.status_code == 200:
                    try:
                        result = response.json()
                        text = self.extract_response_text(result)
                        return self.normalize_response(text)
                    except json.JSONDecodeError as e:
                        if attempt < RETRY_ATTEMPTS:
                            logger.warning(f"JSON decode error (attempt {attempt + 1}): {e}")
                            time.sleep(0.5)
                            continue
                        return f"JSONError: {e}"
                else:
                    if attempt < RETRY_ATTEMPTS:
                        logger.warning(f"HTTP {response.status_code} (attempt {attempt + 1})")
                        time.sleep(1)
                        continue
                    return f"HTTPError: {response.status_code}"
                    
            except requests.exceptions.Timeout:
                if attempt < RETRY_ATTEMPTS:
                    logger.warning(f"Timeout (attempt {attempt + 1})")
                    time.sleep(1)
                    continue
                return "TimeoutError: Request timed out"
            except requests.exceptions.ConnectionError as e:
                if attempt < RETRY_ATTEMPTS:
                    logger.warning(f"Connection error (attempt {attempt + 1}): {e}")
                    time.sleep(2)
                    continue
                return f"ConnectionError: {e}"
            except Exception as e:
                if attempt < RETRY_ATTEMPTS:
                    logger.warning(f"Unexpected error (attempt {attempt + 1}): {e}")
                    time.sleep(1)
                    continue
                return f"UnexpectedError: {e}"
        
        return "Error: All retry attempts failed"
    
    def process_batch(self, batch_data: List[tuple]) -> List[tuple]:
        """Process a batch of prompts efficiently."""
        results = []
        
        with ThreadPoolExecutor(max_workers=min(self.max_workers, len(batch_data))) as executor:
            # Submit all requests in the batch
            future_to_data = {
                executor.submit(self.call_ollama_single, prompt): (idx, prompt)
                for idx, prompt in batch_data
            }
            
            # Collect results as they complete
            for future in as_completed(future_to_data):
                idx, original_prompt = future_to_data[future]
                try:
                    completion = future.result()
                except Exception as e:
                    completion = f"FutureError: {e}"
                    logger.error(f"Future error for row {idx}: {e}")
                
                results.append((idx, completion))
        
        return results

def process_file_optimized(input_path: str, processor: OllamaProcessor):
    """Process a single input file with optimized batch processing."""
    # Extract template tag and create output name
    match = re.search(r"templ-\d+", input_path)
    if not match:
        logger.warning(f"Skipping {input_path}, no template tag found.")
        return
    
    template_tag = match.group()
    model_tag = processor.model.replace(':', '-')
    output_path = f"relevance_210725_completions_{model_tag}-{template_tag}.csv"
    
    logger.info(f"Processing {input_path} -> {output_path}")
    
    # Load and validate data
    try:
        df = pd.read_csv(input_path, encoding='utf-8')
        logger.info(f"Loaded {len(df)} rows from {input_path}")
    except Exception as e:
        logger.error(f"Failed to read {input_path}: {e}")
        return
    
    if df.empty:
        logger.error(f"Empty DataFrame in {input_path}")
        return
    
    # Ensure eval_prompt column exists
    if 'eval_prompt' not in df.columns:
        last_col = df.columns[-1]
        df.rename(columns={last_col: 'eval_prompt'}, inplace=True)
        logger.info(f"Renamed column '{last_col}' to 'eval_prompt'")
    
    # Apply test subset if configured
    original_total = len(df)
    if TEST_SUBSET is not None and len(df) > TEST_SUBSET:
        df = df.head(TEST_SUBSET).copy()
        logger.info(f"TEST MODE: Processing {len(df)} rows out of {original_total}")
    
    # Initialize result columns
    df['eval_completion'] = None
    df['model'] = processor.model
    
    # Filter out empty prompts
    valid_indices = df[df['eval_prompt'].notna() & (df['eval_prompt'] != "")].index.tolist()
    invalid_count = len(df) - len(valid_indices)
    
    if invalid_count > 0:
        logger.warning(f"Found {invalid_count} empty/null prompts, skipping those rows")
        df.loc[~df.index.isin(valid_indices), 'eval_completion'] = "Error: Empty prompt"
    
    if not valid_indices:
        logger.error("No valid prompts found in the dataset")
        return
    
    total_valid = len(valid_indices)
    logger.info(f"Processing {total_valid} valid prompts in batches of {BATCH_SIZE}")
    
    # Process in batches
    start_time = time.perf_counter()
    completed = 0
    all_request_times = []
    
    # Create batches
    batches = [valid_indices[i:i + BATCH_SIZE] for i in range(0, len(valid_indices), BATCH_SIZE)]
    logger.info(f"Created {len(batches)} batches")
    
    for batch_num, batch_indices in enumerate(batches, 1):
        batch_start = time.perf_counter()
        
        # Prepare batch data
        batch_data = [(idx, df.at[idx, 'eval_prompt']) for idx in batch_indices]
        
        logger.info(f"Processing batch {batch_num}/{len(batches)} ({len(batch_data)} items)")
        
        # Process the batch
        batch_results = processor.process_batch(batch_data)
        
        # Update DataFrame with results
        for idx, completion in batch_results:
            df.at[idx, 'eval_completion'] = completion
            completed += 1
        
        batch_time = time.perf_counter() - batch_start
        avg_time_per_request = batch_time / len(batch_data)
        all_request_times.append(avg_time_per_request)
        
        # Calculate ETA
        if completed > 0:
            elapsed_total = time.perf_counter() - start_time
            avg_batch_time = elapsed_total / batch_num
            remaining_batches = len(batches) - batch_num
            eta_seconds = remaining_batches * avg_batch_time
            
            logger.info(f"Batch {batch_num} completed in {batch_time:.1f}s "
                       f"(avg {avg_time_per_request:.2f}s/request)")
            logger.info(f"Progress: {completed}/{total_valid} ({100*completed/total_valid:.1f}%) | "
                       f"ETA: {eta_seconds/60:.1f} minutes")
        
        # Small delay between batches to prevent overwhelming the API
        if batch_num < len(batches):
            time.sleep(0.1)
    
    # Final statistics
    total_time = time.perf_counter() - start_time
    successful = df['eval_completion'].notna().sum()
    errors = df['eval_completion'].str.contains('Error:', case=False, na=False).sum()
    
    logger.info(f"\n{'='*60}")
    logger.info(f"PROCESSING COMPLETE")
    logger.info(f"Total time: {total_time/60:.2f} minutes")
    logger.info(f"Total rows processed: {len(df)}")
    logger.info(f"Valid prompts: {total_valid}")
    logger.info(f"Successful completions: {successful - errors}")
    logger.info(f"Errors: {errors}")
    logger.info(f"Success rate: {100*(successful-errors)/len(df):.1f}%")
    if all_request_times:
        logger.info(f"Average time per request: {sum(all_request_times)/len(all_request_times):.2f}s")
    logger.info(f"{'='*60}\n")
    
    # Save results
    try:
        df.to_csv(output_path, index=False, encoding='utf-8')
        logger.info(f"Results saved to {output_path}")
        
        # Quick validation of output
        saved_df = pd.read_csv(output_path)
        logger.info(f"Validation: Saved file has {len(saved_df)} rows")
        
    except Exception as e:
        logger.error(f"Failed to save results to {output_path}: {e}")

def main():
    """Main execution function with improved setup and error handling."""
    logger.info(f"Starting optimized batch processing")
    logger.info(f"Configuration:")
    logger.info(f"  - Max workers: {MAX_WORKERS}")
    logger.info(f"  - Batch size: {BATCH_SIZE}")
    logger.info(f"  - Request timeout: {REQUEST_TIMEOUT}s")
    logger.info(f"  - Retry attempts: {RETRY_ATTEMPTS}")
    if TEST_SUBSET:
        logger.info(f"  - TEST MODE: Processing only first {TEST_SUBSET} rows per file")
    logger.info(f"  - Model: {model_name}")
    logger.info(f"  - Ollama URL: {ollama_url}")
    
    # Initialize processor
    processor = OllamaProcessor(ollama_url, model_name, MAX_WORKERS)
    
    # Test connection
    """ logger.info("Testing connection to Ollama...")
    test_result = processor.call_ollama_single("Test connection. Please respond with 'OK'.")
    if "Error" in test_result:
        logger.error(f"Connection test failed: {test_result}")
        logger.error("Please check that Ollama is running and the URL is correct")
        return
    else:
        logger.info(f"Connection test successful: {test_result}") """
    
    # Process all input files
    total_files = len(input_files)
    logger.info(f"Found {total_files} input files to process")
    
    for file_num, input_path in enumerate(input_files, 1):
        logger.info(f"\n{'='*60}")
        logger.info(f"PROCESSING FILE {file_num}/{total_files}: {input_path}")
        logger.info(f"{'='*60}")
        
        if not Path(input_path).exists():
            logger.error(f"Input file not found: {input_path}")
            continue
        
        file_start = time.perf_counter()
        process_file_optimized(input_path, processor)
        file_time = time.perf_counter() - file_start
        
        logger.info(f"File {file_num} completed in {file_time/60:.2f} minutes")
    
    logger.info(f"\n{'='*60}")
    logger.info("ALL FILES PROCESSED SUCCESSFULLY")
    logger.info(f"{'='*60}")

if __name__ == "__main__":
    main()

"""
THIS IS OUTDATED. USE THE CODE BLOCK FOLLOWING THIS ONE!
"""

In [None]:
import time
import json
import requests
import re
from pathlib import Path
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
from typing import List, Dict, Any

# Configuration
MAX_WORKERS = 12  # Increased workers for slow model
TEST_SUBSET = None # None to process all rows
REQUEST_TIMEOUT = 150# Longer timeout but with aggressive parallelization
RETRY_ATTEMPTS = 1  # Reduced retries to fail fast

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class OllamaProcessor:
    """Optimized Ollama API processor with yes/no normalization."""
    
    def __init__(self, url: str, model: str, max_workers: int = MAX_WORKERS):
        self.url = url
        self.model = model
        self.max_workers = max_workers
        self.session = requests.Session()
        self.session.headers.update({"Content-Type": "application/json"})
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=max_workers,
            pool_maxsize=max_workers * 2,
            max_retries=0
        )
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        
    def create_payload(self, prompt: str) -> Dict[str, Any]:
        return {
            "model": self.model,
            "prompt": prompt.strip(),
            "stream": False,
            "options": {
                "temperature": 0.0,
                "top_p": 0.1,
                "num_predict": -1,
                "num_ctx": 4096,
                "repeat_penalty": 1.0
            }
        }
    
    def normalize_response(self, text: str) -> str:
        """Strip <think>…</think>, then normalize to Yes/No if response ends with them."""
        if not text:
            return "Error: Empty response"
        text = text.strip()
        # Remove any <think>...</think> block
        m = re.search(r'<think>.*?</think>\s*(.*)', text, re.IGNORECASE | re.DOTALL)
        if m:
            text = m.group(1).strip()

        low = text.lower().rstrip('.!?')  # drop any trailing punctuation
        if low.endswith("yes"):
            return "Yes"
        if low.endswith("no"):
            return "No"
        return text  # fallback to full text

    
    def extract_response_text(self, result: Any) -> str:
        """Extract text from various Ollama JSON shapes."""
        if isinstance(result, dict):
            for key in ("response", "output", "content"):
                if key in result and isinstance(result[key], str):
                    return result[key].strip()
            # Chat-style
            if "choices" in result and isinstance(result["choices"], list):
                msg = result["choices"][0].get("message", {})
                return msg.get("content", "").strip()
            if "message" in result and isinstance(result["message"], dict):
                return result["message"].get("content", "").strip()
            return str(result)
        if isinstance(result, list) and result:
            return str(result[0])
        return str(result)

    def call_ollama_single(self, prompt: str) -> str:
        if not prompt:
            return "Error: Empty prompt"
        payload = self.create_payload(prompt)
        for attempt in range(RETRY_ATTEMPTS + 1):
            try:
                resp = self.session.post(self.url, json=payload, timeout=REQUEST_TIMEOUT)
                if resp.status_code == 200:
                    try:
                        result = resp.json()
                        text = self.extract_response_text(result)
                        return self.normalize_response(text)
                    except json.JSONDecodeError as e:
                        return f"JSONError: {e}"
                else:
                    return f"HTTPError: {resp.status_code}"
            except requests.exceptions.Timeout:
                return "TimeoutError: Request timed out"
            except requests.exceptions.ConnectionError as e:
                return f"ConnectionError: {e}"
            except Exception as e:
                return f"UnexpectedError: {e}"
        return "Error: All retry attempts failed"
    
    def process_all_parallel(self, data_items: List[tuple]) -> List[tuple]:
        """Process items in parallel and report accurate ETA."""
        total = len(data_items)
        completed = 0
        results: List[tuple] = []
        start = time.perf_counter()
        
        logger.info(f"Starting processing of {total} items with {self.max_workers} workers")
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_idx = {executor.submit(self.call_ollama_single, prompt): idx
                             for idx, prompt in data_items}
            
            for future in as_completed(future_to_idx):
                idx = future_to_idx[future]
                try:
                    res = future.result()
                except Exception as e:
                    res = f"FutureError: {e}"
                    logger.error(f"Error in future for row {idx}: {e}")
                results.append((idx, res))
                completed += 1

                if completed % 5 == 0 or completed == total:
                    elapsed = time.perf_counter() - start
                    avg = elapsed / completed
                    remaining = total - completed
                    eta_sec = remaining * avg
                    eta_min = eta_sec / 60
                    logger.info(
                        f"Progress: {completed}/{total} ({100*completed/total:.1f}%) | "
                        f"Avg: {avg:.1f}s | ETA: {eta_min:.1f}min"
                    )

        return results

def process_file_optimized(input_path: str, processor: OllamaProcessor):
    match = re.search(r"templ-\d+", input_path)
    if not match:
        logger.warning(f"Skipping {input_path}, no template tag found.")
        return
    template_tag = match.group()
    model_tag = processor.model.replace(":", "-")
    output_path = f"relevance_210725_completions_{model_tag}-{template_tag}.csv"
    
    logger.info(f"Processing {input_path} -> {output_path}")
    try:
        df = pd.read_csv(input_path, encoding='utf-8')
    except Exception as e:
        logger.error(f"Failed to read {input_path}: {e}")
        return
    if df.empty:
        logger.error(f"No data in {input_path}")
        return
    
    # Ensure eval_prompt column
    if 'eval_prompt' not in df.columns:
        df.rename(columns={df.columns[-1]: 'eval_prompt'}, inplace=True)
    
    # changed to None to run through templ-1 file
    if TEST_SUBSET and len(df) > TEST_SUBSET:
        df = df.head(TEST_SUBSET).copy()
        logger.info(f"TEST MODE: Processing first {len(df)} rows")
    
    df['eval_completion'] = None
    df['model'] = processor.model
    
    valid = df['eval_prompt'].notna() & (df['eval_prompt'] != "")
    valid_indices = df[valid].index.tolist()
    if not valid_indices:
        logger.error("No valid prompts to process.")
        return
    
    # Parallel processing
    data = [(idx, df.at[idx, 'eval_prompt']) for idx in valid_indices]
    t0 = time.perf_counter()
    results = processor.process_all_parallel(data)
    total_time = time.perf_counter() - t0
    
    # Update DataFrame
    for idx, completion in results:
        df.at[idx, 'eval_completion'] = completion
    
    # Summary stats
    successful = df['eval_completion'].notna().sum()
    errors = df['eval_completion'].str.contains('Error:', na=False).sum()
    timeouts = df['eval_completion'].str.contains('TimeoutError:', na=False).sum()
    rate = 100 * (successful - errors) / len(df)
    
    logger.info("="*50)
    logger.info(f"Finished in {total_time/60:.2f} minutes")
    logger.info(f"Rows processed: {len(df)} (Valid: {len(valid_indices)})")
    logger.info(f"Successes: {successful - errors}, Errors: {errors} (Timeouts: {timeouts})")
    logger.info(f"Success rate: {rate:.1f}%")
    logger.info(f"Throughput: {len(valid_indices)/(total_time/60):.1f} req/min")
    logger.info("="*50)
    
    # Save
    try:
        df.to_csv(output_path, index=False, encoding='utf-8')
        logger.info(f"Saved to {output_path}")
    except Exception as e:
        logger.error(f"Failed to save {output_path}: {e}")

def main():
    logger.info("Starting optimized processing")
    logger.info(f"- Model: {model_name}")
    logger.info(f"- Ollama URL: {ollama_url}")
    logger.info(f"- Max workers: {MAX_WORKERS}")
    if TEST_SUBSET:
        logger.info(f"- TEST SUBSET: {TEST_SUBSET} rows")
    
    processor = OllamaProcessor(ollama_url, model_name, MAX_WORKERS)
    
    for input_path in input_files:
        if not Path(input_path).exists():
            logger.error(f"File not found: {input_path}")
            continue
        process_file_optimized(input_path, processor)

if __name__ == "__main__":
    main()


# one file took 184 minutes, only 4.6% timeout error


2025-07-23 10:36:59,014 - INFO - Starting optimized processing
2025-07-23 10:36:59,015 - INFO - - Model: deepseek-r1:70b
2025-07-23 10:36:59,015 - INFO - - Ollama URL: http://localhost:11434/api/generate
2025-07-23 10:36:59,016 - INFO - - Max workers: 12
2025-07-23 10:36:59,017 - INFO - Processing relevance_210725_prompts_templ-1.csv -> relevance_210725_completions_deepseek-r1-70b-templ-1.csv
2025-07-23 10:36:59,036 - INFO - Starting processing of 1000 items with 12 workers
2025-07-23 10:38:13,442 - INFO - Progress: 5/1000 (0.5%) | Avg: 14.9s | ETA: 246.8min
2025-07-23 10:39:12,808 - INFO - Progress: 10/1000 (1.0%) | Avg: 13.4s | ETA: 220.7min
2025-07-23 10:39:57,582 - INFO - Progress: 15/1000 (1.5%) | Avg: 11.9s | ETA: 195.4min
2025-07-23 10:40:49,065 - INFO - Progress: 20/1000 (2.0%) | Avg: 11.5s | ETA: 187.9min
2025-07-23 10:41:44,201 - INFO - Progress: 25/1000 (2.5%) | Avg: 11.4s | ETA: 185.4min
2025-07-23 10:42:37,193 - INFO - Progress: 30/1000 (3.0%) | Avg: 11.3s | ETA: 182.2min


In [None]:
# original, took to long

for input_path in input_files:
    # Determine output file name based on template number and model
    template_match = re.search(r"templ-\d+", input_path)
    if not template_match:
        continue
    template_tag = template_match.group() 
    # replace colon with hyphen in models
    model_tag = model_name.replace(":", "-")
    output_path = f"relevance_210725_completions_{model_tag}-{template_tag}.csv"
    
    with open(input_path, newline='', encoding='utf-8') as infile, \
         open(output_path, 'w', newline='', encoding='utf-8') as outfile:
        reader = csv.reader(infile)
        writer = csv.writer(outfile)
        
        # Read the header and append new column names
        header = next(reader)
        new_header = header + ["eval_completion", "model"]
        writer.writerow(new_header)
        
        # Iterate over each row in the input CSV
        for row in reader:
            if not row:  # skip empty lines if any
                continue
            prompt = row[-1]  # eval_prompt is the last col
            
            # Prepare the JSON payload for Ollama API
            payload = {
                "model": model_name,
                "prompt": prompt,
                "stream": False  # get a single JSON response instead of stream
            }
            
            try:
                response = requests.post(ollama_url, headers={"Content-Type": "application/json"},
                                         data=json.dumps(payload))
            except Exception as e:
                # If there's a connection error or similar, you might want to handle it
                print(f"Error calling Ollama API for prompt: {prompt[:30]}... \n{e}")
                continue
            
            eval_completion = ""
            if response.status_code == 200:
                # Parse JSON response from Ollama
                try:
                    result = response.json()
                except ValueError:
                    # If response is not a valid JSON (unexpected), use raw text
                    result_text = response.text.strip()
                    # Determine yes/no from text
                    if result_text.lower().startswith("yes"):
                        eval_completion = "Yes"
                    elif result_text.lower().startswith("no"):
                        eval_completion = "No"
                    else:
                        eval_completion = result_text  # fallback to whatever it is
                else:
                    # Ollama's response JSON might have the output text in a field.
                    # We attempt common possible keys.
                    if "response" in result:
                        result_text = result["response"]
                    elif "output" in result:
                        result_text = result["output"]
                    elif "content" in result:
                        # If using chat-style response, it might be nested:
                        # e.g., {"model": ..., "choices": [{"message": {"role": "assistant", "content": "Yes"}}], ...}
                        result_text = result.get("content", "") or result.get("message", {}).get("content", "")
                    else:
                        # If none of the known keys, use the full JSON string as fallback
                        result_text = str(result)
                    result_text = str(result_text).strip()
                    # Normalize to "Yes" or "No"
                    if result_text.lower().startswith("yes"):
                        eval_completion = "Yes"
                    elif result_text.lower().startswith("no"):
                        eval_completion = "No"
                    else:
                        eval_completion = result_text
            else:
                # If the API call failed (non-200 status), record the status or an error
                eval_completion = f"Error: HTTP {response.status_code}"
            
            # Append the new columns to the row
            row_with_output = row + [eval_completion, model_name]
            writer.writerow(row_with_output)

    print(f"Completed {input_path} -> {output_path}")