# Fake News Analysis - Batch Processing

This notebook processes posts from CSV files in the `input/` folder and saves results to `output/`.

**Model Configuration:**
- Entry: qwen2.5:1.5b
- Planner: llama3.1:8b
- Researcher: llama3.1:8b
- Analyst: llama3.1:8b

**Features:**
- Parallel processing with configurable workers
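- Incremental saving: each result is appended to the JSONL file as soon as it completes, and the CSV is updated every `BATCH_SIZE` completed results
- Resume capability if interrupted: re-run with the same `RUN_ID` (i.e. the same output files) and already-processed posts are skipped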

In [1]:
import os
import glob
import json
import pandas as pd
from datetime import datetime
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Any, List
from dotenv import load_dotenv
from tqdm.notebook import tqdm
import threading

from src.graphs.v1 import graph
from src.models.context import ModelsRegistry, ModelConfig
from tavily import TavilyClient
from src.utils.observability import (
    UsageMetadataCallbackHandler,
    set_callback_handler,
)

load_dotenv()  # load variables from a .env file (if present) into os.environ, e.g. TAVILY_API_KEY

# All result files are written below ./output, so make sure that folder exists up front
os.makedirs("output", exist_ok=True)

## Configuration

In [2]:
# Ollama configuration
OLLAMA_CONFIG = {
    "model_provider": "ollama",
    "base_url": "http://localhost:11434",
}

# Model configuration for each node
MODELS_CONFIG = {
    "entry": "qwen2.5:1.5b",
    "planner": "llama3.1:8b",
    "researcher": "llama3.1:8b",
    "analyst": "llama3.1:8b",
}

# Parallel processing settings
MAX_WORKERS = 4  # Adjust based on your system resources
BATCH_SIZE = 10  # Save results after every N completed items
TEMPERATURE = 0.0

# Run identifier embedded in the output file names. By default each execution
# of this cell gets a new timestamp, i.e. a brand-new set of output files, so
# nothing would ever be resumed. To resume an interrupted run, set the
# ANALYSIS_RUN_ID environment variable (e.g. in .env) to that run's id, such as
# "20260201_134524": the same files are then reused and, with resume=True,
# posts already recorded in the JSONL are skipped.
RUN_ID = os.environ.get("ANALYSIS_RUN_ID") or datetime.now().strftime("%Y%m%d_%H%M%S")
OUTPUT_CSV = f"output/analysis_results_{RUN_ID}.csv"
OUTPUT_JSON = f"output/analysis_results_{RUN_ID}.jsonl"  # JSON Lines format for incremental writes

print(f"Models configuration: {MODELS_CONFIG}")
print(f"Max workers: {MAX_WORKERS}")
print(f"Batch size (save every N): {BATCH_SIZE}")
print("Output files:")
print(f"  - CSV: {OUTPUT_CSV}")
print(f"  - JSONL: {OUTPUT_JSON}")

Models configuration: {'entry': 'qwen2.5:1.5b', 'planner': 'llama3.1:8b', 'researcher': 'llama3.1:8b', 'analyst': 'llama3.1:8b'}
Max workers: 4
Batch size (save every N): 10
Output files:
  - CSV: output/analysis_results_20260201_134524.csv
  - JSONL: output/analysis_results_20260201_134524.jsonl


## Load Input Data

In [3]:
# Find all CSV files in the input folder. glob's order is filesystem-dependent,
# so sort it to keep the row order reproducible across runs and machines.
input_files = sorted(glob.glob("input/*.csv"))
print(f"Found {len(input_files)} CSV file(s) in input folder:")
for f in input_files:
    print(f"  - {f}")

# Columns the analysis below reads from every input row
REQUIRED_COLUMNS = ["id_mention", "full_text"]

# Load and concatenate all CSV files, tagging each row with the file it came from
dfs = []
for file_path in input_files:
    df = pd.read_csv(file_path)
    missing = [c for c in REQUIRED_COLUMNS if c not in df.columns]
    if missing:
        raise ValueError(f"{file_path} is missing required column(s): {missing}")
    df["source_file"] = os.path.basename(file_path)
    dfs.append(df)
    print(f"  Loaded {len(df)} rows from {os.path.basename(file_path)}")

# Keep the expected columns even when no file was found, so later cells that
# index df_input["id_mention"] still work on the empty frame.
df_input = (
    pd.concat(dfs, ignore_index=True)
    if dfs
    else pd.DataFrame(columns=REQUIRED_COLUMNS + ["source_file"])
)

# Duplicate ids share a graph thread_id and are treated as one post on resume
n_duplicate_ids = int(df_input["id_mention"].duplicated().sum())
if n_duplicate_ids:
    print(f"WARNING: {n_duplicate_ids} duplicate id_mention value(s) in the input")

print(f"\nTotal posts to analyze: {len(df_input)}")
df_input.head()

Found 1 CSV file(s) in input folder:
  - input\data-1769622510591.csv
  Loaded 1000 rows from data-1769622510591.csv

Total posts to analyze: 1000


Unnamed: 0,id_mention,full_text,source_file
0,00006b826eb2064cd4c069df5e9bebac,"Gente pelo amor de Deus Boulos não , misericór...",data-1769622510591.csv
1,00014e37ef658664f02ab02c991d45f7,Estou com Boulos 50 🙏,data-1769622510591.csv
2,00018e4f326728431c1c97b775f31b67,Qual é sua opinião sobre isso? 👀🤔 . Já que o D...,data-1769622510591.csv
3,0001c9de9cf636d2b5faa31eb88a87cd,Um carro da funerária acaba de entrar no hospi...,data-1769622510591.csv
4,0001df78f4f1323c23cab956a749ac62,Estadão Conteúdoi Estadão Conteúdo https://ist...,data-1769622510591.csv


## Initialize Components

In [4]:
# A single Tavily client is created here and handed to every analysis call
# through the runtime context, so all workers share it.
tavily = TavilyClient(api_key=os.environ["TAVILY_API_KEY"])

def create_models_registry() -> ModelsRegistry:
    """Build a new ModelsRegistry from the notebook configuration.

    Every graph node (entry, planner, researcher, analyst) gets a ModelConfig
    with its model name taken from MODELS_CONFIG, the shared TEMPERATURE and
    the Ollama connection settings in OLLAMA_CONFIG.
    """

    def node_config(node: str) -> ModelConfig:
        # Identical settings for every node; only the model name differs
        return ModelConfig(
            model=MODELS_CONFIG[node],
            temperature=TEMPERATURE,
            **OLLAMA_CONFIG,
        )

    return ModelsRegistry(
        entry=node_config("entry"),
        planner=node_config("planner"),
        researcher=node_config("researcher"),
        analyst=node_config("analyst"),
        default_temperature=TEMPERATURE,
    )

print("Components initialized successfully!")

Components initialized successfully!


## Incremental Save Functions

In [5]:
# Thread-safe file writing: one lock shared by every writer below, so results
# coming back from different worker threads never interleave in the files.
file_lock = threading.Lock()

def save_result_jsonl(result: Dict[str, Any], filepath: str):
    """Append one result as a single JSON line to ``filepath`` (thread-safe).

    Values the json module cannot encode natively (e.g. objects returned by
    the graph) are written as ``str(value)`` instead of raising, so one odd
    field does not cost the whole record.
    """
    # Serialize outside the lock; only the file append needs to be exclusive
    line = json.dumps(result, ensure_ascii=False, default=str)
    with file_lock:
        with open(filepath, "a", encoding="utf-8") as f:
            f.write(line + "\n")


def save_batch_csv(results: List[Dict[str, Any]], filepath: str, is_first_batch: bool = False):
    """Write a batch of results to the CSV file (thread-safe).

    Args:
        results: Result dicts as produced by the analysis. Keys outside the
            CSV schema (e.g. ``metrics``, ``references``, ``models_config``)
            are not written.
        filepath: Destination CSV path.
        is_first_batch: If True the file is (re)created with a header row;
            otherwise rows are appended without a header.
    """
    csv_columns = [
        "id_mention", "full_text", "success", "error",
        "relevant", "relevance_reasoning",
        "score", "justification", "plan",
        "processing_time_s", "timestamp",
    ]

    # reindex rather than keeping only the columns present in this batch: every
    # batch then has exactly the header's columns in the header's order, and a
    # record missing a field (e.g. a minimal error record) gets an empty cell
    # instead of shifting its remaining values into the wrong columns.
    df_batch = pd.DataFrame(results).reindex(columns=csv_columns)

    with file_lock:
        df_batch.to_csv(
            filepath,
            mode="w" if is_first_batch else "a",
            header=is_first_batch,  # header only once, at the top of the file
            index=False,
            encoding="utf-8",
        )


def load_existing_results(jsonl_path: str, only_successful: bool = False) -> set:
    """Return the ``id_mention`` values already recorded in a results JSONL file.

    Used for resume: posts whose ids are returned are skipped on the next run.

    Args:
        jsonl_path: Path to the JSONL results file. A missing file yields an
            empty set.
        only_successful: If True, only ids whose record has a truthy
            ``success`` field are returned, so failed posts are retried on
            resume. Defaults to False (every recorded id counts as processed).

    Returns:
        Set of already processed ``id_mention`` values.
    """
    processed_ids = set()
    if not os.path.exists(jsonl_path):
        return processed_ids

    unreadable = 0
    with open(jsonl_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                # e.g. a line cut short by an interrupted write; that post is
                # simply not counted as processed, but report it below
                unreadable += 1
                continue
            if not isinstance(record, dict) or "id_mention" not in record:
                continue
            if only_successful and not record.get("success"):
                continue
            processed_ids.add(record["id_mention"])

    if unreadable:
        print(f"Warning: skipped {unreadable} unreadable line(s) in {jsonl_path}")
    return processed_ids


print("Save functions initialized!")

Save functions initialized!


## Analysis Function

In [6]:
def _as_dict(value: Any) -> Any:
    """Return ``value.model_dump()`` when ``value`` has that method (pydantic-style), else ``value`` unchanged."""
    return value.model_dump() if hasattr(value, "model_dump") else value


def analyze_post(post_id: str, post_text: str, max_revisions: int = 3) -> Dict[str, Any]:
    """
    Analyze a single post with the graph and return a flat result record.

    Args:
        post_id: Unique identifier for the post (also used to build the graph thread_id)
        post_text: The text content to analyze
        max_revisions: Value passed to the graph as ``max_revisions`` in its
            initial state (default 3, the previously hard-coded value)

    Returns:
        Dictionary with the keys id_mention, full_text, success, error,
        timestamp, processing_time_s, models_config, relevant,
        relevance_reasoning, plan, score, justification, references and
        metrics. The same keys are present for successful and failed posts;
        fields that do not apply are None (references: [], metrics: {}).
        Exceptions raised while running the graph or reading its output are
        caught and reported through ``success=False`` and ``error``.
    """
    # Fresh callback handler and model registry per call, so worker threads do
    # not share these objects.
    # NOTE(review): if set_callback_handler stores the handler in process-global
    # state, concurrent workers can overwrite each other's handler — confirm in
    # src.utils.observability that it is thread/context-local.
    callback = UsageMetadataCallbackHandler()
    set_callback_handler(callback)
    models_registry = create_models_registry()

    config = {
        "configurable": {"thread_id": f"analysis_{post_id}"},
        "callbacks": [callback],
    }
    runtime_context = {"models_registry": models_registry, "tavily": tavily}
    initial_state = {
        "post": post_text,
        "max_revisions": max_revisions,
    }

    start_time = datetime.now()

    try:
        resp = graph.invoke(initial_state, context=runtime_context, config=config)
        processing_time_s = (datetime.now() - start_time).total_seconds()

        # Relevance analysis (handled whether it is a model object or a plain dict)
        rel_analysis = _as_dict(resp["relevance_analysis"])
        relevant = rel_analysis.get("relevant", False)

        # Defaults for non-relevant posts, and for relevant posts whose graph
        # produced no response, so every record has the same keys.
        plan, score, justification, references = None, None, None, []
        if relevant:
            plan = resp.get("plan", "")
            references = resp.get("references", [])
            response = resp.get("response")
            if response:
                response = _as_dict(response)
                score = response.get("score")
                justification = response.get("justification", "")

        # Per-node metrics, converted to plain dicts where possible
        metrics = {
            node_name: _as_dict(node_metrics)
            for node_name, node_metrics in (resp.get("metrics") or {}).items()
        }

        return {
            "id_mention": post_id,
            "full_text": post_text,
            "success": True,
            "error": None,
            "timestamp": start_time.isoformat(),
            "processing_time_s": processing_time_s,
            # Copy so each record keeps the config it ran with, independent of the global dict
            "models_config": dict(MODELS_CONFIG),
            "relevant": relevant,
            "relevance_reasoning": rel_analysis.get("reasoning", ""),
            "plan": plan,
            "score": score,
            "justification": justification,
            "references": references,
            "metrics": metrics,
        }

    except Exception as e:
        return {
            "id_mention": post_id,
            "full_text": post_text,
            "success": False,
            "error": str(e),
            "timestamp": start_time.isoformat(),
            "processing_time_s": (datetime.now() - start_time).total_seconds(),
            "models_config": dict(MODELS_CONFIG),
            "relevant": None,
            "relevance_reasoning": None,
            "plan": None,
            "score": None,
            "justification": None,
            "references": [],
            "metrics": {},
        }

## Run Parallel Analysis with Incremental Saving

In [7]:
def _print_run_summary(results: List[Dict[str, Any]], output_csv: str, output_jsonl: str) -> None:
    """Print success/relevance counts for one run's results and where they were saved."""
    successful = sum(1 for r in results if r.get("success"))
    relevant = sum(1 for r in results if r.get("relevant"))
    success_pct = 100 * successful / len(results) if results else 0.0
    print("\nSummary:")
    print(f"  - Total processed this run: {len(results)}")
    print(f"  - Successful: {successful} ({success_pct:.1f}%)")
    print(f"  - Failed: {len(results) - successful}")
    print(f"  - Relevant posts: {relevant}")
    print("\nResults saved to:")
    print(f"  - {output_csv}")
    print(f"  - {output_jsonl}")


def run_parallel_analysis_incremental(
    df: pd.DataFrame,
    output_csv: str,
    output_jsonl: str,
    max_workers: int = MAX_WORKERS,
    batch_size: int = BATCH_SIZE,
    resume: bool = True,
) -> List[Dict[str, Any]]:
    """
    Run analysis on all posts in parallel with incremental saving.

    Each finished result is appended to ``output_jsonl`` immediately. Results
    are also written to ``output_csv`` every ``batch_size`` completions, plus
    once more for any remainder at the end.

    Args:
        df: DataFrame with id_mention and full_text columns
        output_csv: Path to output CSV file
        output_jsonl: Path to output JSONL file
        max_workers: Maximum number of parallel workers
        batch_size: Save to CSV after every N completed items
        resume: If True, skip posts whose id_mention is already recorded in
            ``output_jsonl``. If False, start fresh: an existing
            ``output_jsonl`` is emptied, and the CSV is overwritten by the
            first batch.

    Returns:
        List of the analysis results produced in this run (posts skipped
        because of resume are not included).
    """
    all_results: List[Dict[str, Any]] = []
    batch_results: List[Dict[str, Any]] = []
    batches_saved = 0

    # Check for existing results (resume capability)
    processed_ids = set()
    if os.path.exists(output_jsonl):
        if resume:
            processed_ids = load_existing_results(output_jsonl)
            print(f"Found {len(processed_ids)} already processed posts, resuming...")
        else:
            # Starting fresh: otherwise new records would be appended after the
            # old ones and counted twice when the JSONL is loaded back.
            with open(output_jsonl, "w", encoding="utf-8"):
                pass
            print(f"resume=False: cleared previous results in {output_jsonl}")

    # Filter out already processed posts
    df_to_process = df[~df["id_mention"].isin(processed_ids)]
    total = len(df_to_process)

    if total == 0:
        print("All posts already processed!")
        return all_results

    print(f"Starting parallel analysis of {total} posts with {max_workers} workers...")
    print(f"Saving every {batch_size} completed items")
    print(f"Start time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    # Write a CSV header with the first batch, unless we are appending to the
    # CSV of a run that is being resumed
    is_first_batch = not os.path.exists(output_csv) or len(processed_ids) == 0

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks; keep id and text so a failed future can still be reported in full
        future_to_post = {
            executor.submit(analyze_post, post_id, post_text): (post_id, post_text)
            for post_id, post_text in zip(df_to_process["id_mention"], df_to_process["full_text"])
        }

        # Process completed tasks with progress bar
        with tqdm(total=total, desc="Analyzing posts") as pbar:
            for future in as_completed(future_to_post):
                post_id, post_text = future_to_post[future]

                # Guard only the worker call: an error in the bookkeeping below
                # must not turn an already-recorded result into a second record.
                try:
                    result = future.result()
                except Exception as e:
                    result = {
                        "id_mention": post_id,
                        "full_text": post_text,
                        "success": False,
                        "error": f"Future error: {str(e)}",
                        "timestamp": datetime.now().isoformat(),
                    }
                all_results.append(result)
                batch_results.append(result)

                # Save to JSONL immediately (one line per result)
                try:
                    save_result_jsonl(result, output_jsonl)
                except (TypeError, ValueError) as e:
                    # json could not encode the record. Keep going: the result
                    # still reaches the CSV, and because it is missing from the
                    # JSONL a resumed run will retry it.
                    tqdm.write(f"Could not write {post_id} to {output_jsonl}: {e}")

                # Update progress bar with status (str() so non-string ids work too)
                status = "✓" if result.get("success") else "✗"
                relevant = "R" if result.get("relevant") else "-"
                pbar.set_postfix_str(f"Last: {str(post_id)[:8]}... [{status}{relevant}]")
                pbar.update(1)

                # Save batch to CSV when batch_size reached
                if len(batch_results) >= batch_size:
                    save_batch_csv(batch_results, output_csv, is_first_batch and batches_saved == 0)
                    batches_saved += 1
                    pbar.set_description(f"Analyzing posts (saved batch {batches_saved})")
                    batch_results = []  # Reset batch

        # Save any remaining results
        if batch_results:
            save_batch_csv(batch_results, output_csv, is_first_batch and batches_saved == 0)
            batches_saved += 1

    print(f"\nEnd time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Total batches saved: {batches_saved}")

    _print_run_summary(all_results, output_csv, output_jsonl)
    return all_results

In [None]:
# Launch the batch run over every input post. With resume=True, posts already
# recorded in OUTPUT_JSON are skipped; resume=False skips nothing.
results = run_parallel_analysis_incremental(
    df=df_input,
    output_csv=OUTPUT_CSV,
    output_jsonl=OUTPUT_JSON,
    max_workers=MAX_WORKERS,
    batch_size=BATCH_SIZE,
    resume=True,
)

Starting parallel analysis of 1000 posts with 4 workers...
Saving every 10 completed items
Start time: 2026-02-01 13:45:24


Analyzing posts:   0%|          | 0/1000 [00:00<?, ?it/s]

## Load and Analyze All Results

In [None]:
# Load all results from JSONL (includes any previous runs if resumed)
all_results = []
unreadable_lines = 0
with open(OUTPUT_JSON, "r", encoding="utf-8") as f:
    for line in f:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        try:
            all_results.append(json.loads(line))
        except json.JSONDecodeError:
            # e.g. a final line left incomplete when a run was interrupted mid-write
            unreadable_lines += 1
if unreadable_lines:
    print(f"Skipped {unreadable_lines} unreadable line(s) in {OUTPUT_JSON}")

df_results = pd.DataFrame(all_results)
print(f"Total results loaded: {len(df_results)}")

# Results are appended to this file, so the same post can appear more than once
# (e.g. records from several runs); flag it, because the counts below include repeats.
if "id_mention" in df_results.columns:
    n_repeated = int(df_results["id_mention"].duplicated().sum())
    if n_repeated:
        print(f"Warning: {n_repeated} record(s) repeat an id_mention already seen in this file")

df_results.head()

Total results loaded: 1000


Unnamed: 0,id_mention,full_text,success,error,timestamp,processing_time_s,models_config,relevant,relevance_reasoning,plan,score,justification,references,metrics
0,00018e4f326728431c1c97b775f31b67,Qual é sua opinião sobre isso? 👀🤔 . Já que o D...,False,Initializing ChatOllama requires the langchain...,2026-01-31T14:30:28.044473,0.074004,"{'entry': 'qwen2.5:1.5b', 'planner': 'llama3.1...",,,,,,[],{}
1,0001c9de9cf636d2b5faa31eb88a87cd,Um carro da funerária acaba de entrar no hospi...,False,Initializing ChatOllama requires the langchain...,2026-01-31T14:30:28.045491,0.075088,"{'entry': 'qwen2.5:1.5b', 'planner': 'llama3.1...",,,,,,[],{}
2,00014e37ef658664f02ab02c991d45f7,Estou com Boulos 50 🙏,False,Initializing ChatOllama requires the langchain...,2026-01-31T14:30:28.043418,0.073206,"{'entry': 'qwen2.5:1.5b', 'planner': 'llama3.1...",,,,,,[],{}
3,00006b826eb2064cd4c069df5e9bebac,"Gente pelo amor de Deus Boulos não , misericór...",False,Initializing ChatOllama requires the langchain...,2026-01-31T14:30:28.042142,0.072423,"{'entry': 'qwen2.5:1.5b', 'planner': 'llama3.1...",,,,,,[],{}
4,0001df78f4f1323c23cab956a749ac62,Estadão Conteúdoi Estadão Conteúdo https://ist...,False,Initializing ChatOllama requires the langchain...,2026-01-31T14:30:28.114637,0.013544,"{'entry': 'qwen2.5:1.5b', 'planner': 'llama3.1...",,,,,,[],{}


## Analysis Summary

In [None]:
print("=" * 60)
print("ANALYSIS SUMMARY")
print("=" * 60)


def _true_mask(frame: pd.DataFrame, column: str) -> pd.Series:
    """Boolean mask that is True only where ``column`` equals True.

    Missing values (None/NaN, e.g. ``relevant`` on failed posts) and a missing
    column both count as False, so the mask can be summed and negated safely.
    """
    if column not in frame.columns:
        return pd.Series(False, index=frame.index)
    return frame[column].eq(True)


n_total = len(df_results)
success_mask = _true_mask(df_results, "success")
n_success = int(success_mask.sum())
success_pct = 100 * n_success / n_total if n_total else 0.0

# Success rate
print("\n1. PROCESSING STATISTICS:")
print(f"   Total posts: {n_total}")
print(f"   Successful: {n_success} ({success_pct:.1f}%)")
print(f"   Failed: {n_total - n_success}")

# Relevance distribution, among successfully analysed posts only
print("\n2. RELEVANCE DISTRIBUTION:")
successful_df = df_results[success_mask]
if len(successful_df) > 0:
    relevant_count = int(_true_mask(successful_df, "relevant").sum())
    not_relevant_count = len(successful_df) - relevant_count
    print(f"   Relevant: {relevant_count} ({100*relevant_count/len(successful_df):.1f}%)")
    print(f"   Not relevant: {not_relevant_count} ({100*not_relevant_count/len(successful_df):.1f}%)")
else:
    print("   No successful results")

# Score distribution (for relevant posts)
print("\n3. SCORE DISTRIBUTION (relevant posts only):")
if "score" in df_results.columns:
    # The score comes from the model's response; coerce defensively so a
    # non-numeric value is dropped instead of breaking the statistics below.
    scores = pd.to_numeric(
        df_results.loc[_true_mask(df_results, "relevant"), "score"], errors="coerce"
    ).dropna()
else:
    scores = pd.Series(dtype=float)
if len(scores) > 0:
    print(f"   Count: {len(scores)}")
    print(f"   Mean: {scores.mean():.2f}")
    print(f"   Std: {scores.std():.2f}")
    print(f"   Min: {scores.min():.2f}")
    print(f"   Max: {scores.max():.2f}")
    print(f"   Median: {scores.median():.2f}")
else:
    print("   No scores available")

# Processing time
print("\n4. PROCESSING TIME:")
if "processing_time_s" in df_results.columns:
    times = df_results["processing_time_s"].dropna()
    if len(times) > 0:
        print(f"   Mean: {times.mean():.2f}s")
        print(f"   Min: {times.min():.2f}s")
        print(f"   Max: {times.max():.2f}s")
        print(f"   Total: {times.sum():.2f}s ({times.sum()/60:.1f} min)")

ANALYSIS SUMMARY

1. PROCESSING STATISTICS:
   Total posts: 1000
   Successful: 0 (0.0%)
   Failed: 1000

2. RELEVANCE DISTRIBUTION:

3. SCORE DISTRIBUTION (relevant posts only):
   No scores available

4. PROCESSING TIME:
   Mean: 0.01s
   Min: 0.00s
   Max: 0.08s
   Total: 10.87s (0.2 min)


In [None]:
# Show sample of relevant posts with high scores (top 10)
print("\nSAMPLE OF RELEVANT POSTS (sorted by score):")
print("=" * 60)


def _truncate(value: Any, limit: int) -> str:
    """Return ``str(value)``, cut to ``limit`` characters plus "..." when longer."""
    text = str(value)
    return text[:limit] + "..." if len(text) > limit else text


if {"relevant", "score"} <= set(df_results.columns):
    relevant_posts = (
        df_results[df_results["relevant"].eq(True)]
        # score comes from the model response; non-numeric values are dropped
        .assign(score=lambda d: pd.to_numeric(d["score"], errors="coerce"))
        .dropna(subset=["score"])
        .sort_values("score", ascending=False)
    )
else:
    relevant_posts = df_results.iloc[0:0]

if relevant_posts.empty:
    print("No relevant posts with a score to show.")

for _, row in relevant_posts.head(10).iterrows():
    print(f"\nID: {row['id_mention']}")
    print(f"Score: {row['score']}")
    print(f"Text: {_truncate(row['full_text'], 150)}")
    print(f"Justification: {_truncate(row.get('justification'), 200)}")
    print("-" * 40)


SAMPLE OF RELEVANT POSTS (sorted by score):


In [None]:
# List (up to 10) posts whose analysis failed, with their error message
failed = df_results.loc[df_results['success'] == False]
if failed.empty:
    print("\nNo failed posts!")
else:
    print(f"\nFAILED POSTS ({len(failed)} total):")
    print("=" * 60)
    for _, row in failed.head(10).iterrows():
        print(f"ID: {row['id_mention']}")
        print(f"Error: {row.get('error', 'Unknown')}")
        print("-" * 40)


FAILED POSTS (1000 total):
ID: 00018e4f326728431c1c97b775f31b67
Error: Initializing ChatOllama requires the langchain-ollama package. Please install it with `pip install langchain-ollama`
----------------------------------------
ID: 0001c9de9cf636d2b5faa31eb88a87cd
Error: Initializing ChatOllama requires the langchain-ollama package. Please install it with `pip install langchain-ollama`
----------------------------------------
ID: 00014e37ef658664f02ab02c991d45f7
Error: Initializing ChatOllama requires the langchain-ollama package. Please install it with `pip install langchain-ollama`
----------------------------------------
ID: 00006b826eb2064cd4c069df5e9bebac
Error: Initializing ChatOllama requires the langchain-ollama package. Please install it with `pip install langchain-ollama`
----------------------------------------
ID: 0001df78f4f1323c23cab956a749ac62
Error: Initializing ChatOllama requires the langchain-ollama package. Please install it with `pip install langchain-ollama`
---

## Export Final Consolidated JSON (Optional)

In [None]:
# Write every loaded result into one pretty-printed JSON document next to the JSONL/CSV outputs
consolidated_json = f"output/analysis_results_{RUN_ID}_final.json"
with Path(consolidated_json).open("w", encoding="utf-8") as out_file:
    json.dump(all_results, out_file, ensure_ascii=False, indent=2)
print(f"Consolidated JSON saved to: {consolidated_json}")

Consolidated JSON saved to: output/analysis_results_20260131_143027_final.json
