In [36]:
import os
import csv
from typing import List, Dict, Optional
from pathlib import Path
from pydantic import BaseModel, Field
from tqdm.auto import tqdm

from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain_community.document_loaders import TextLoader, DirectoryLoader
from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.runnables import RunnablePassthrough

In [42]:
# Define the output model using Pydantic v2
class MICClassificationResult(BaseModel):
    """Schema for the Militarized Interstate Confrontation classification result."""

    # NOTE(review): pydantic normally exposes the class docstring and the Field
    # descriptions in the model's JSON schema, and the output parser presumably
    # derives its format instructions from that schema -- so rewording any of
    # these strings may change the prompt sent to the model. Confirm before editing.

    # Boolean verdict; later written to the CSV as 1/0.
    is_mic: bool = Field(
        description="True if the article describes a Militarized Interstate Confrontation (MIC), False otherwise",
    )

    # Short free-text justification for the verdict.
    explanation: str = Field(
        description="A brief explanation of why the article was classified as MIC or not",
    )

def init_llm(
    base_url: str = "http://localhost:1234/v1",
    api_key: str = "LMStudio",
    model_name: str = "qwen2.5-7b-instruct-1m",
    temperature: float = 0.1,
) -> ChatOpenAI:
    """Initialize and return the chat model client used for classification.

    All settings default to the values this notebook was written with, so
    calling ``init_llm()`` with no arguments behaves exactly as before; pass
    overrides to target a different endpoint or model without editing code.

    Args:
        base_url: Base URL of the OpenAI-compatible HTTP API.
        api_key: API key sent with requests.
            NOTE(review): the default looks like a placeholder for a local
            LM Studio server -- confirm the server does not validate it.
        model_name: Identifier of the model to request from the server.
        temperature: Sampling temperature passed to the model.

    Returns:
        A configured ``ChatOpenAI`` instance.
    """
    return ChatOpenAI(
        base_url=base_url,
        api_key=api_key,
        model_name=model_name,
        temperature=temperature,
    )

def create_classification_chain():
    """Assemble the prompt -> LLM -> Pydantic-parser pipeline for MIC classification.

    The returned chain expects a mapping with a single ``article`` key; the
    ``format_instructions`` placeholder is pre-filled from the output parser.
    The final stage parses the model output into ``MICClassificationResult``.
    """
    # Few-shot prompt. Doubled braces ({{ }}) are literal braces in the examples;
    # single-brace names are template variables.
    template_text = """You are an expert analyst of international relations and military conflicts.
        Your task is to determine whether a news article describes a Militarized Interstate Confrontation (MIC).

        A Militarized Interstate Confrontation (MIC) is defined as:
        - A direct confrontation between two or more countries
        - Involving military forces (army, navy, air force, etc.)
        - Where there is a threat, display, or use of military force

        The article must describe an actual military interaction, not just diplomatic tensions or discussions about potential conflicts.

        Analyze the following article carefully and determine if it describes a MIC.
        Output your answer in the specified JSON format with two fields:
        1. is_mic: true if it's a MIC, false if it's not
        2. explanation: A short explanation of your reasoning

        Here are some examples to guide you:
        Example 1:
        Article: Russian troops opened fire on Ukrainian soldiers near the border, killing three and wounding seven others. The Ukrainian government condemned the attack as a violation of its sovereignty.
        Output: {{"is_mic": true, "explanation": "This article describes a direct military confrontation between Russian and Ukrainian forces with fatalities, which is a clear case of a Militarized Interstate Confrontation."}}

        Example 2:
        Article: China and Taiwan held diplomatic talks aimed at easing tensions in the region. Both sides agreed to maintain open lines of communication to prevent misunderstandings.
        Output: {{"is_mic": false, "explanation": "This article describes diplomatic talks rather than a military confrontation. No military forces were involved, and there was no threat or use of force."}}

        Example 3:
        Article: North Korean forces fired artillery shells into South Korean waters as a show of force during joint US-South Korean military exercises. No casualties were reported.
        Output: {{"is_mic": true, "explanation": "This article describes a militarized action (artillery fire) by North Korea directed at South Korea, which constitutes a Militarized Interstate Confrontation even without casualties."}}

        Example 4:
        Article: The United Nations Security Council met to discuss increasing tensions between India and Pakistan but no military actions were reported.
        Output: {{"is_mic": false, "explanation": "This article only mentions diplomatic discussions about tensions. It does not describe any actual military confrontation, threat, or use of force between countries."}}

        Article:
        {article}

        {format_instructions}
        """

    # The parser both supplies the format instructions and validates the reply.
    output_parser = PydanticOutputParser(pydantic_object=MICClassificationResult)

    classification_prompt = PromptTemplate(
        template=template_text,
        input_variables=['article'],
        partial_variables={'format_instructions': output_parser.get_format_instructions()},
    )

    model = init_llm()

    # LCEL composition: render prompt, call the model, parse the structured reply.
    return classification_prompt | model | output_parser

def classify_article(chain, article_text: str) -> MICClassificationResult:
    """Run the classification chain on one article and return its verdict.

    Any exception raised while invoking the chain is converted into a
    negative result (``is_mic=False``) whose explanation carries the error
    text, so this function does not raise for chain failures.
    """
    payload = {"article": article_text}
    try:
        verdict = chain.invoke(payload)
    except Exception as exc:
        # Fall back to a non-MIC result that records what went wrong.
        failure_note = f"Error in classification: {str(exc)}"
        return MICClassificationResult(is_mic=False, explanation=failure_note)
    return verdict

def read_article_file(file_path: Path) -> Optional[str]:
    """Read an article file as text, trying UTF-8 first and then Latin-1.

    Args:
        file_path: Path of the text file to read.

    Returns:
        The decoded file contents, or ``None`` when the file cannot be read
        (for example it is missing or unreadable). Failures are reported with
        a printed message that names the file.
    """
    try:
        # Try UTF-8 first
        return file_path.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        # Not valid UTF-8; fall through to the Latin-1 attempt below.
        pass
    except OSError as e:
        # Missing/unreadable file: previously this escaped and aborted the
        # whole run instead of producing the documented None result.
        print(f"Error reading {file_path.name}: {e}")
        return None

    try:
        # Fall back to Latin-1
        return file_path.read_text(encoding="latin-1")
    except Exception as e:
        print(f"Error reading {file_path.name}: {e}")
        return None

def _load_existing_indices(csv_file: Path) -> set[str]:
    """Return the first-column (Index) values already stored in ``csv_file``."""
    indices: set[str] = set()
    with open(csv_file, mode="r", newline="", encoding="utf-8") as file:
        reader = csv.reader(file)
        # Skip the header; the default avoids StopIteration on an empty file
        # (e.g. one left behind by a run that died before writing anything).
        next(reader, None)
        for row in reader:
            if row:  # Skip empty rows
                indices.add(row[0])
    return indices


def process_directory(base_dir: Path, output_dir: Path, years_to_process: list[str]) -> None:
    """Classify every article under the selected year folders and save results as CSV.

    For each sub-directory of ``base_dir`` whose name is in ``years_to_process``,
    all ``*.txt`` files (searched recursively) are classified and appended to
    ``<output_dir>/<year>_classification.csv`` with columns
    ``Index, Label, Explanation``. Articles whose index is already present in
    that CSV are skipped, so an interrupted run can be resumed.

    Args:
        base_dir: Directory containing one sub-directory per year.
        output_dir: Directory for the result CSVs; created if missing.
        years_to_process: Names of the year sub-directories to process.
    """
    # Create output directory if it doesn't exist
    output_dir.mkdir(parents=True, exist_ok=True)

    # Initialize the classification chain once and reuse it for every article
    chain = create_classification_chain()

    # Get year folders
    year_folders = [f for f in base_dir.iterdir() if f.is_dir() and f.name in years_to_process]
    print(f"Processing {len(year_folders)} specified year folders: {years_to_process}")

    # Process each year folder with tqdm
    for year_folder in tqdm(year_folders, desc="Processing years", position=0):
        year_name = year_folder.name
        csv_file = output_dir / f"{year_name}_classification.csv"

        # Get all article files and sort them by name
        article_files = sorted(year_folder.glob("**/*.txt"), key=lambda x: x.name)
        total_articles = len(article_files)
        print(f"Found {total_articles} articles in {year_name}")

        # Load indices already present in the CSV so they are not classified again
        existing_entries: set[str] = set()
        if csv_file.exists():
            existing_entries = _load_existing_indices(csv_file)
            print(f"Found {len(existing_entries)} existing entries in CSV")

        # A header is needed for a brand-new file and also for an existing but
        # empty one, which would otherwise stay header-less forever.
        needs_header = not csv_file.exists() or csv_file.stat().st_size == 0

        processed = 0

        # Append mode creates the file when it does not exist yet.
        with open(csv_file, mode="a", newline="", encoding="utf-8") as file:
            writer = csv.writer(file)

            if needs_header:
                writer.writerow(["Index", "Label", "Explanation"])
                file.flush()

            # Process articles with a nested progress bar
            with tqdm(total=total_articles, desc=f"Articles in {year_name}", position=1, leave=False) as pbar:
                # Process files in sorted order
                for article_file in article_files:
                    # NOTE(review): the index uses only the file name, so two files
                    # with the same name in different sub-folders share an index.
                    file_index = f"{year_name}_{article_file.name}"

                    # Skip if already processed
                    if file_index in existing_entries:
                        pbar.update(1)
                        continue

                    # Read the article
                    content = read_article_file(article_file)
                    if content is None:
                        pbar.update(1)
                        continue

                    # NOTE(review): classify_article reports failures as a label-0
                    # row; such rows are treated as done on resume and never retried.
                    result = classify_article(chain, content)
                    writer.writerow([
                        file_index,                         # Index
                        int(result.is_mic),                 # Label (1 for True, 0 for False)
                        result.explanation                  # Explanation
                    ])
                    # Persist each row immediately so an interruption during a
                    # long run does not lose already-classified articles.
                    file.flush()

                    processed += 1

                    pbar.update(1)  # Update progress bar after each article

        print(f"Completed processing for year {year_name}. Added {processed} new entries to {csv_file}")

In [None]:
# Input articles and output CSVs live in sibling folders next to the
# notebook's working directory.
base_dir, output_dir = (
    Path.cwd().parent / folder_name
    for folder_name in ("processed_files", "classified_files")
)

print(f"Starting classification of articles from {base_dir}")
process_directory(base_dir, output_dir, years_to_process=["2008"])
print("Classification and saving complete.")