# Facebook Post Stance Classifier

Classify Facebook posts using the trained Ministral-8B stance classifier.

This notebook loads the trained adapter from `best_adapter/` and applies it to a CSV file containing Facebook posts. The adapter loaded below is a 2-class classifier, so every post is assigned one of:
- **Pro-Palestinian**
- **Pro-Israeli**

The **Neutral** class (merged from Other, Off-topic, Anti-War_Pro-Peace) of the 3-class system is not predicted by this adapter.


In [None]:
from __future__ import annotations

import os
import json
import logging
import math
import re
from pathlib import Path
from typing import List, Sequence
from concurrent.futures import ProcessPoolExecutor


import warnings
import pandas as pd
import torch
from tqdm.auto import tqdm
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    AutoConfig,
    BitsAndBytesConfig,
)
from peft import PeftModel, PeftConfig

# Mount Google Drive; the model and data paths configured later in this
# notebook all live under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

# Optional: for language filtering
# HAS_LANGDETECT records whether langdetect could be imported; the language
# filtering helpers check it and skip filtering when it is False.
try:
    from langdetect import detect, LangDetectException
    HAS_LANGDETECT = True
except ImportError:
    HAS_LANGDETECT = False
    # NOTE(review): this runs before logging.basicConfig() in the configuration
    # cell, so it is emitted with whatever root-logger setup exists at this point.
    logging.warning("langdetect not installed. Language filtering disabled.")

# Silence pandas' mixed-dtype warnings when reading large CSV files.
warnings.filterwarnings("ignore", category=pd.errors.DtypeWarning)


In [None]:
# Install required packages
#
# pip is driven through subprocess instead of `!pip` so that a failed install
# can actually be detected: shell escapes never raise a Python exception, so a
# try/except around them reports success even when the install failed.
import subprocess
import sys


def _pip_install(*args: str) -> bool:
    """Run ``pip install -q <args>`` with the kernel's interpreter.

    Returns True when pip exits with status 0. On failure the tail of pip's
    error output is printed so the cause is visible in the cell output.
    """
    result = subprocess.run(
        [sys.executable, "-m", "pip", "install", "-q", *args],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        print(result.stderr.strip()[-2000:])
    return result.returncode == 0


required_ok = True

if _pip_install(
    "transformers", "accelerate", "peft", "langdetect", "tqdm", "pandas", "huggingface_hub"
):
    print("✓ core packages installed")
else:
    required_ok = False
    print("⚠ Warning: installation of core packages failed (see pip output above)")

# Install bitsandbytes (required for 4-bit quantization)
print("Installing bitsandbytes...")
if _pip_install("bitsandbytes"):
    print("✓ bitsandbytes installed")
else:
    required_ok = False
    print("⚠ Warning: bitsandbytes installation failed (see pip output above)")

# Install flash-attn (optional but recommended for faster inference)
# Note: This may take a few minutes to compile
print("\nInstalling flash-attn (this may take a few minutes)...")
if _pip_install("flash-attn", "--no-build-isolation"):
    print("✓ flash-attn installed")
else:
    print("⚠ flash-attn installation failed (will use default attention)")
    print("This is optional - the model will still work with default attention.")

# Verify installations
try:
    import bitsandbytes
    print("✓ bitsandbytes imported successfully")
except Exception as e:
    print(f"⚠ Warning: bitsandbytes import failed: {e}")
    print("You may need to restart the runtime after installation.")

try:
    import flash_attn
    print("✓ flash-attn imported successfully")
except ImportError:
    print("⚠ flash-attn not available (will use default attention)")

if required_ok:
    print("\n✓ All dependencies installed!")
else:
    print("\n⚠ Some required dependencies failed to install - fix the errors above before continuing.")
print("\n⚠️ NEXT STEP: Restart runtime (Runtime → Restart runtime), then run all cells from the beginning.")


## Hugging Face Authentication

Set your Hugging Face token to access gated models or private repositories:


In [None]:
# Three ways to provide the Hugging Face token; only Option 2 is active.

# Option 1: Enter your token directly (not recommended for sharing - token will be visible)
# HF_TOKEN = "your_huggingface_token_here"

# Option 2: Use getpass to securely input token (recommended)
from getpass import getpass
HF_TOKEN = getpass("Enter your Hugging Face token: ")

# Option 3: Read from environment variable (if already set)
# HF_TOKEN = os.environ.get("HF_TOKEN")

# Log in to the Hugging Face Hub only when a token was actually supplied.
if not HF_TOKEN:
    print("⚠ No Hugging Face token provided. This may fail if accessing gated/private models.")
else:
    from huggingface_hub import login
    login(token=HF_TOKEN)
    print("✓ Hugging Face authentication successful!")


## Configuration

Set your paths and parameters here:


In [None]:
# Model and data paths
# Update these paths to point to your files in Google Drive
MODEL_DIR = "/content/drive/MyDrive/UTJ2/Mémoire/Mémoire Scripts/Stance/best_adapter"  # Path to trained model directory (update this!)
TRAIN_CSV = "/content/drive/MyDrive/UTJ2/Mémoire/Mémoire Scripts/Stance/gaza_stance_sampled_classified.csv"  # For label mapping fallback (update this!)

# Input/Output paths
INPUT_CSV = "/content/drive/MyDrive/UTJ2/Mémoire/Data/Final data to use/finaldataSample20251127.csv"  # Set your input CSV path here (update this!)
OUTPUT_CSV = None  # Will be auto-generated if None (input_name_classified.csv)

# Processing settings
BATCH_SIZE = 32  # Number of posts per forward pass
BATCH_SAVE_EVERY = 100  # Save every N batches
MAX_LENGTH = 512  # Tokenizer truncation length
SEP = " - "  # Separator used when joining the text fields of a post
CONSTRUCTED_COL = "constructed_text"  # Column holding the reconstructed post text
PRED_COL = "predicted_category"  # Column holding the predicted label

# Language filtering (optional)
FILTER_ENGLISH = True
TARGET_LANG = "en"
NUM_WORDS_SAMPLE = 100  # Only the first N words of a post are used for language detection
NUM_WORKERS = None  # None = use all CPU cores

# Setup logging
# force=True replaces any handlers already attached to the root logger.
# Without it basicConfig() is a silent no-op whenever the root logger is
# already configured (the notebook environment may do this, and so does any
# earlier root-level logging call), and the INFO progress messages are lost.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)-8s | %(message)s",
    force=True,
)
log = logging.getLogger("stance_classifier")


## Helper Functions


In [None]:
# Compiled once at definition time instead of on every call.
_ZERO_WIDTH_RE = re.compile(r"[\u200B-\u200F\u202A-\u202E\u2060-\u206F\uFEFF]")


def strip_invisible(text: str) -> str:
    """Remove zero-width and other invisible formatting characters.

    Characters in the ranges U+200B-U+200F, U+202A-U+202E, U+2060-U+206F and
    U+FEFF are deleted; everything else is left untouched.

    Non-string input (e.g. the NaN pandas produces for an empty CSV cell, or
    None) yields an empty string instead of raising, so the function can be
    applied directly to a DataFrame column that contains missing values.
    """
    if not isinstance(text, str):
        return ""
    return _ZERO_WIDTH_RE.sub("", text)


def concatenate_fields(values: Sequence[str | float | None], *, sep: str = SEP) -> str:
    """Join the non-empty string fields with ``sep``, skipping repeated text.

    Non-string entries and strings that are blank after stripping are ignored.
    A field is also dropped when its text already occurs (case-insensitively,
    as a substring) in what has been collected so far.
    """
    kept: List[str] = []
    for raw in values:
        candidate = raw.strip() if isinstance(raw, str) else ""
        # Compare against the joined text so far, exactly as it would be output.
        if candidate and candidate.lower() not in sep.join(kept).lower():
            kept.append(candidate)
    return sep.join(kept)


def process_facebook(df: pd.DataFrame) -> pd.DataFrame:
    """Add the reconstructed post text to ``df`` and return the same frame.

    For each row the "Message", "Description", "Image Text" and "Link Text"
    values are merged with ``concatenate_fields`` and stored in the
    ``CONSTRUCTED_COL`` column. The frame is modified in place.
    """
    source_columns = ("Message", "Description", "Image Text", "Link Text")

    def _merge_row(row: pd.Series) -> str:
        # row.get returns None for a column that is absent from the frame.
        return concatenate_fields([row.get(name) for name in source_columns])

    df[CONSTRUCTED_COL] = df.apply(_merge_row, axis=1)
    return df


def safe_read_csv(path: str | Path) -> pd.DataFrame:
    """Read a CSV, retrying with the Python engine if the default parser fails.

    The first attempt uses pandas' default parser with ``low_memory=False``.
    On ``ParserError`` a warning is logged and the file is read again with
    ``engine="python"``, skipping lines that cannot be parsed.
    """
    try:
        frame = pd.read_csv(path, low_memory=False)
    except pd.errors.ParserError:
        log.warning(f"Standard parser failed for {path}. Retrying with engine='python'...")
        frame = pd.read_csv(path, engine="python", on_bad_lines="skip")
    return frame


## Language Filtering (Optional)


In [None]:
def _detect_lang_worker(args):
    """Worker for parallel language detection.

    Args:
        args: an ``(idx, txt)`` pair, where ``idx`` is the position of the
            text in the caller's list and ``txt`` is the text to inspect.

    Returns:
        ``(idx, keep)`` where ``keep`` is True when the text is detected as
        ``TARGET_LANG``. When langdetect is not available every text is kept.
        Non-string input (e.g. NaN from an empty CSV cell), empty text and
        any detection failure all yield ``keep=False``.
    """
    idx, txt = args
    if not HAS_LANGDETECT:
        return idx, True  # Skip filtering if langdetect not available
    if not isinstance(txt, str):
        # Missing values cannot be split or detected; treat them as non-matching.
        return idx, False
    # Only the first NUM_WORDS_SAMPLE words are passed to the detector.
    sample = " ".join(txt.split()[:NUM_WORDS_SAMPLE])
    if not sample:
        return idx, False
    try:
        return idx, detect(sample) == TARGET_LANG
    except Exception:
        # Best effort: any detection failure counts as "not the target language".
        return idx, False


def filter_english(texts: list[str], *, workers: int | None = NUM_WORKERS) -> list[bool]:
    """Return one keep-flag per text, True where the target language was detected.

    Detection is delegated to ``_detect_lang_worker`` in a process pool. When
    langdetect is not installed a warning is logged and every text is kept.
    ``workers`` falls back to the CPU count (or 4 when that is unknown).
    """
    if not HAS_LANGDETECT:
        log.warning("langdetect not available. Skipping language filtering.")
        return [True] * len(texts)

    workers = workers or os.cpu_count() or 4
    log.info(f"Detecting language on {len(texts)} texts with {workers} workers...")

    keep = [False] * len(texts)
    with ProcessPoolExecutor(max_workers=workers) as pool:
        results = pool.map(_detect_lang_worker, enumerate(texts), chunksize=512)
        progress = tqdm(results, total=len(texts), desc="Lang-detect", unit="post")
        # Each result carries the position it belongs to.
        for position, is_target in progress:
            keep[position] = is_target
    return keep


## Model Loading


In [None]:
def load_classifier(model_dir: str):
    """
    Load the trained 2-class stance classifier (Pro-Palestinian vs Pro-Israeli).

    The adapter was trained with 2 classes:
    - 0: Pro-Palestinian
    - 1: Pro-Israeli

    All checks on the adapter directory run before anything is downloaded or
    loaded, so a wrong or unmounted path fails immediately instead of after
    the base model has been loaded.

    Args:
        model_dir: Path to the local PEFT adapter directory. It must contain
            ``adapter_config.json`` with a ``base_model_name_or_path`` entry.

    Returns:
        A tuple ``(tok, model, build_prompt, id2label)``:
        the base-model tokenizer (left padding), the PEFT-wrapped model in
        eval mode, a function turning a raw text into the chat-formatted
        prompt, and the ``{class id: label}`` mapping.

    Raises:
        FileNotFoundError: if the adapter directory or its
            ``adapter_config.json`` does not exist.
        ValueError: if ``adapter_config.json`` has no
            ``base_model_name_or_path``.
    """
    log.info(f"Loading 2-class classifier from {model_dir}...")

    # Convert to Path and ensure absolute path for local files
    model_dir_path = Path(model_dir).absolute()
    model_dir_str = str(model_dir_path)

    # --- Validate the adapter directory up front -------------------------
    # If this path is wrong / not mounted, PEFT will treat it like a Hub repo id and crash.
    if not model_dir_path.exists():
        raise FileNotFoundError(
            f"Adapter folder does not exist: {model_dir_str}\n"
            "Make sure Drive is mounted and MODEL_DIR points to the adapter directory."
        )
    adapter_config_path = model_dir_path / "adapter_config.json"
    if not adapter_config_path.exists():
        raise FileNotFoundError(
            f"adapter_config.json not found in: {model_dir_str}\n"
            "MODEL_DIR must point to the PEFT adapter folder."
        )
    weight_files = ("adapter_model.safetensors", "adapter_model.bin", "adapter_model.pt")
    if not any((model_dir_path / name).exists() for name in weight_files):
        log.warning(
            "No adapter_model.(safetensors|bin|pt) found in %s. "
            "If your adapter files are named differently, PEFT may not load them.",
            model_dir_str,
        )

    # Load PEFT config by reading adapter_config.json directly
    with open(adapter_config_path, encoding="utf-8") as f:
        adapter_config = json.load(f)

    base_name = adapter_config.get("base_model_name_or_path")
    if not base_name:
        raise ValueError("base_model_name_or_path not found in adapter_config.json")

    # Hard-code 2-class system based on the trained adapter
    # The adapter has shape [2, 4096] which confirms 2 classes
    id2label = {
        0: "Pro-Palestinian",
        1: "Pro-Israeli",
    }
    label2id = {v: k for k, v in id2label.items()}
    num_labels = 2

    log.info(f"Using 2-class mapping: {id2label}")
    log.info(f"Base model: {base_name}")

    # Base-model config carrying the classification head size and label names
    cfg = AutoConfig.from_pretrained(
        base_name,
        num_labels=num_labels,
        id2label=id2label,
        label2id=label2id,
    )

    # Setup quantization (same as training)
    # is_bf16_supported() is only queried when CUDA is actually available.
    use_bf16 = torch.cuda.is_available() and torch.cuda.is_bf16_supported()
    bnb_cfg = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_use_double_quant=True,
        bnb_4bit_compute_dtype=torch.bfloat16 if use_bf16 else torch.float16,
    )

    # Determine attention implementation (use flash_attention_2 if available)
    try:
        import flash_attn  # noqa: F401  (imported only to test availability)
        attn_impl = "flash_attention_2"
        log.info("Using Flash Attention 2 for faster inference")
    except ImportError:
        attn_impl = None  # Use default attention
        log.info("Flash Attention 2 not available, using default attention")

    # Load base model
    log.info(f"Loading base model: {base_name}")
    model_kwargs = {
        "config": cfg,
        "device_map": "auto",
        "quantization_config": bnb_cfg,
    }
    if attn_impl:
        model_kwargs["attn_implementation"] = attn_impl

    base = AutoModelForSequenceClassification.from_pretrained(
        base_name,
        **model_kwargs
    )

    # Load tokenizer from base model (NOT from adapter directory)
    # PEFT/LoRA adapters don't contain tokenizers - always use base model tokenizer
    log.info(f"Loading tokenizer from base model: {base_name}")
    tok = AutoTokenizer.from_pretrained(base_name, padding_side="left")
    if tok.pad_token_id is None:
        # No pad token defined: add one and grow the embedding matrix to match.
        tok.add_special_tokens({'pad_token': '<pad>'})
        tok.pad_token = '<pad>'
        base.resize_token_embeddings(len(tok))
    base.config.pad_token_id = tok.pad_token_id

    # Load PEFT adapter from local directory
    log.info(f"Loading PEFT adapter from local path: {model_dir_str}")
    try:
        model = PeftModel.from_pretrained(base, model_dir_str, local_files_only=True)
    except RuntimeError as e:
        # Retry tolerating shape mismatches between the adapter and the base model.
        log.warning(f"LoRA head incompatible ({e}) → using ignore_mismatched_sizes=True")
        model = PeftModel.from_pretrained(
            base,
            model_dir_str,
            ignore_mismatched_sizes=True,
            local_files_only=True,
        )

    model.eval()
    model.config.pad_token_id = tok.pad_token_id

    # Build prompt function (same as training)
    cats_str = ", ".join(id2label.values())

    def build_prompt(txt: str) -> str:
        """Wrap ``txt`` in the system/user chat template used for classification."""
        messages = [
            {
                "role": "system",
                "content": (
                    "You are an expert assistant. "
                    "Classify the following text into one of these "
                    f"categories: {cats_str}. "
                    "Respond with the category label only."
                ),
            },
            {"role": "user", "content": txt},
        ]
        return tok.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=False
        ).strip()

    return tok, model, build_prompt, id2label


In [None]:
def incremental_predict(
    df: pd.DataFrame,
    tok,
    model,
    build_prompt,
    id2label: dict,
    *,
    text_col: str = CONSTRUCTED_COL,
    batch_size: int = BATCH_SIZE,
    save_every: int = BATCH_SAVE_EVERY,
    out_path: str | Path | None = None,
) -> None:
    """
    Predict categories for texts with 2-class scores (Pro-Palestinian and Pro-Israeli).
    Saves both predicted category and individual class scores.

    Only rows whose ``PRED_COL`` value is missing or empty are processed, so
    the function can resume a partially classified frame. ``df`` is modified
    in place.

    Args:
        df: Frame holding the texts; receives ``PRED_COL``,
            ``score_pro_palestinian`` and ``score_pro_israeli``.
        tok: Tokenizer used to encode the prompts.
        model: Classifier whose output exposes ``.logits``.
        build_prompt: Function mapping a raw text to the model prompt.
        id2label: Mapping from predicted class id to label.
        text_col: Column containing the text to classify.
        batch_size: Number of rows per forward pass (must be >= 1).
        save_every: Write ``df`` to ``out_path`` every N batches; values
            <= 0 disable interim saves.
        out_path: CSV path for interim and final saves; nothing is written
            when None.

    Raises:
        ValueError: if ``batch_size`` is smaller than 1.
    """
    import torch.nn.functional as F

    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    device = next(model.parameters()).device

    # A frame that has never been classified may not have the column yet.
    if PRED_COL not in df.columns:
        df[PRED_COL] = pd.NA

    # Find rows to process
    to_process = df.index[df[PRED_COL].isna() | (df[PRED_COL] == "")].tolist()
    if not to_process:
        log.info("No rows to categorize (already complete).")
        return

    total_batches = math.ceil(len(to_process) / batch_size)
    batch_counter = 0

    # Ensure score columns exist
    if "score_pro_palestinian" not in df.columns:
        df["score_pro_palestinian"] = pd.NA
    if "score_pro_israeli" not in df.columns:
        df["score_pro_israeli"] = pd.NA

    log.info(f"Processing {len(to_process)} rows in {total_batches} batches...")

    for start in tqdm(
        range(0, len(to_process), batch_size),
        desc="Batch-predict",
        total=total_batches,
        unit="batch",
    ):
        batch_idx = to_process[start : start + batch_size]
        # Texts re-read from CSV can be NaN (empty cell); treat those as "".
        batch_texts = [
            strip_invisible(text) if isinstance(text, str) else ""
            for text in df.loc[batch_idx, text_col].tolist()
        ]
        prompts = [build_prompt(text) for text in batch_texts]

        # Tokenize
        enc = tok(
            prompts,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=MAX_LENGTH,
            add_special_tokens=False,  # Important: same as training
        )
        enc = {k: v.to(device) for k, v in enc.items()}

        # Predict
        with torch.no_grad():
            logits = model(**enc).logits
            # Convert logits to probabilities using softmax. Cast to float32
            # first: NumPy has no bfloat16, so .numpy() would fail on bf16
            # logits, and the softmax is computed at full precision.
            probs = F.softmax(logits.float(), dim=-1).cpu().numpy()

        # Get predicted class (argmax)
        class_ids = probs.argmax(axis=1).tolist()
        df.loc[batch_idx, PRED_COL] = [id2label[class_id] for class_id in class_ids]

        # Store individual class scores
        df.loc[batch_idx, "score_pro_palestinian"] = probs[:, 0]  # Class 0
        df.loc[batch_idx, "score_pro_israeli"] = probs[:, 1]  # Class 1

        batch_counter += 1
        if out_path and save_every > 0 and batch_counter % save_every == 0:
            log.info(f"Interim save → {out_path}")
            df.to_csv(out_path, index=False)

    if out_path:
        log.info(f"Final save → {out_path}")
        df.to_csv(out_path, index=False)


## Main Classification Pipeline

Run the classification by executing the cells below:


In [None]:
# Resolve the input path and derive the output path.
input_csv = Path(INPUT_CSV)
if not input_csv.exists():
    raise FileNotFoundError(f"Input CSV not found: {input_csv}")

# Default output: "<input stem>_classified<input suffix>" next to the input file.
output_csv = (
    Path(OUTPUT_CSV)
    if OUTPUT_CSV is not None
    else input_csv.parent / f"{input_csv.stem}_classified{input_csv.suffix}"
)

print(f"Input CSV: {input_csv}")
print(f"Output CSV: {output_csv}")


In [None]:
# Load or create dataframe
if output_csv.exists():
    log.info(f"Found existing output ({output_csv}) – resume mode.")
    df = safe_read_csv(output_csv)
    if CONSTRUCTED_COL not in df.columns:
        # Rebuild the text from the output file's own columns when possible.
        # Joining against the raw input by index is only valid when both files
        # have the same rows in the same order, which no longer holds once
        # language filtering has dropped rows and reset the index.
        source_cols = {"Message", "Description", "Image Text", "Link Text"}
        if source_cols & set(df.columns):
            df = process_facebook(df)
        else:
            raw = process_facebook(safe_read_csv(input_csv))
            if len(raw) != len(df):
                raise ValueError(
                    f"Cannot rebuild '{CONSTRUCTED_COL}': {output_csv} has {len(df)} rows "
                    f"but {input_csv} has {len(raw)}, so rows cannot be matched."
                )
            df[CONSTRUCTED_COL] = raw[CONSTRUCTED_COL].to_numpy()
    # Empty texts come back from CSV as NaN; normalise them to "" so the
    # downstream string helpers always receive strings.
    df[CONSTRUCTED_COL] = df[CONSTRUCTED_COL].fillna("").astype(str)
    if PRED_COL not in df.columns:
        df[PRED_COL] = pd.NA
else:
    log.info(f"Loading raw CSV: {input_csv}")
    df = process_facebook(safe_read_csv(input_csv))
    df[PRED_COL] = pd.NA

print(f"DataFrame shape: {df.shape}")
print(f"Columns: {list(df.columns)}")


In [None]:
# Language filtering (if enabled)
# Only rows that are not yet classified are checked; rows that already have a
# prediction are always kept.
if FILTER_ENGLISH:
    mask_uncat = df[PRED_COL].isna() | (df[PRED_COL] == "")
    # Missing texts (NaN after a CSV round trip) are passed on as "".
    to_check = df.loc[mask_uncat, CONSTRUCTED_COL].fillna("").astype(str).tolist()
    if to_check:
        log.info(f"Language filtering ({len(to_check)} rows)...")
        flags = filter_english(to_check)
        # Build the keep-mask positionally rather than OR-ing two Series with
        # different indexes, which depends on implicit index alignment.
        uncat_positions = mask_uncat.to_numpy(dtype=bool, copy=True)
        keep = ~uncat_positions
        keep[uncat_positions] = flags
        df = df.loc[keep].reset_index(drop=True)
        print(f"After language filtering: {len(df)} rows")


In [None]:
# Load model and tokenizer
# load_classifier returns the tokenizer, the adapter-wrapped model, the prompt
# builder and the id -> label mapping; all four are passed to incremental_predict.
tok, model, build_prompt, id2label = load_classifier(MODEL_DIR)


In [None]:
# Run predictions
# Fills PRED_COL and the two score columns of `df` in place, writing the frame
# to output_csv every BATCH_SAVE_EVERY batches and once more at the end.
incremental_predict(
    df,
    tok,
    model,
    build_prompt,
    id2label,
    batch_size=BATCH_SIZE,
    save_every=BATCH_SAVE_EVERY,
    out_path=output_csv,
)


In [None]:
# Summarise the predictions: log the per-category counts and show them as a table.
n_rows = len(df)
log.info(f"Completed! Results saved to {output_csv}")
log.info(f"Total rows: {n_rows}")

if PRED_COL in df.columns:
    counts = df[PRED_COL].value_counts()
    log.info("Classification summary:")
    for cat, count in counts.items():
        pct = count / n_rows * 100  # share of all rows, classified or not
        log.info(f"  {cat}: {count} ({pct:.1f}%)")

    # Same numbers rendered as a table
    banner = "=" * 50
    print("\n" + banner)
    print("Classification Summary")
    print(banner)
    summary_table = counts.to_frame("Count")
    summary_table["Percentage"] = (summary_table["Count"] / n_rows * 100).round(1)
    display(summary_table)
