In [1]:
#Configuration and dataset selection

import sys
import math
import re
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd

import spacy
from spacy.language import Language
from spacy.tokens import Doc

import nltk
from nltk.corpus import cmudict

from g2p_en import G2p


# Reproducibility
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

# Paths to the three samples
SAMPLE_PATHS = {
    "small":  "raid_sample_small.csv",
    "medium": "raid_sample_medium.csv",
    "large":  "raid_sample_large.csv",
}

# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
# Set this to one of: "small", "medium", "large"
SELECTED_DATASET = "large"
# <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

TEXT_COL = "generation"

df = pd.read_csv(SAMPLE_PATHS[SELECTED_DATASET])
print(f"Loaded {SELECTED_DATASET} dataset with {len(df)} rows.")
print(f"df_columns: {df.columns}")
df.head(3)


Loaded large dataset with 60000 rows.
df_columns: Index(['id', 'adv_source_id', 'source_id', 'model', 'decoding',
       'repetition_penalty', 'attack', 'domain', 'title', 'prompt',
       'generation', 'is_ai', 'source_type', 'generation_raw', 'had_urls',
       'had_html', 'had_code', 'had_table', 'n_chars', 'n_tok', 'alpha_ratio',
       'digit_ratio', 'punct_ratio', 'avg_word_length', 'std_word_length',
       'entropy_bits', 'entropy_norm', 'is_text_like', 'not_text_reason',
       'n_tokens_ws', 'length_bin'],
      dtype='object')


Unnamed: 0,id,adv_source_id,source_id,model,decoding,repetition_penalty,attack,domain,title,prompt,...,digit_ratio,punct_ratio,avg_word_length,std_word_length,entropy_bits,entropy_norm,is_text_like,not_text_reason,n_tokens_ws,length_bin
0,8a274811-4809-4a88-96a2-9af63c06bae9,e4f35f18-1a33-4cb3-bf63-d4402ded0dbc,79c87b70-fb77-4a80-881a-2a77db2b17bc,mistral,greedy,no,zero_width_space,books,Each Man's Son,The following is the full text of a plot summa...,...,0.023848,0.03523,4.400621,2.422283,4.525679,0.782805,True,,322,long
1,3273ddc0-2211-4116-90bb-1ce99bf01e6c,2d7d0873-d4f7-47d1-b971-cbdaff88d5bc,2d7d0873-d4f7-47d1-b971-cbdaff88d5bc,human,,,article_deletion,recipes,Salad of Roasted Beets and Arugula with Blue C...,,...,0.015239,0.034833,4.523404,2.000661,4.417504,0.78271,True,,248,long
2,46c83315-fb96-4c22-91ac-b77e7edb42bc,4f1cf565-8c27-4c20-ac22-824146821c03,143a1df5-a01a-476d-a3fb-c59f2ec78fff,gpt3,greedy,no,whitespace,recipes,Guiltfree Chocolate Cheesecake,The following is the full text of a recipe for...,...,0.045175,0.055441,5.0,2.089545,4.555896,0.824811,True,,84,short


In [2]:
# Initialize spaCy, CMUdict (NLTK), and g2p_en fallback

# Ensure CMUdict is available
nltk.download('cmudict', quiet=True)
nltk.download('averaged_perceptron_tagger_eng')
CMU = cmudict.dict()  # key: lowercase word, value: list of pronunciations (list of ARPAbet tokens)

# Load spaCy English model (use 'en_core_web_sm' unless you already have md/lg installed)
try:
    nlp: Language = spacy.load("en_core_web_lg")
except OSError:
    from spacy.cli import download
    download("en_core_web_sm")
    nlp = spacy.load("en_core_web_sm")

# Ensure sentence boundaries are available (parser usually handles this; add sentencizer if needed)
if "sentencizer" not in nlp.pipe_names and "parser" not in nlp.pipe_names:
    nlp.add_pipe("sentencizer")

# g2p_en for OOV coverage
G2P = G2p()

# ARPAbet vowel bases (stress digits removed when checking)
ARPA_VOWELS = {
    "AA", "AE", "AH", "AO", "AW", "AY",
    "EH", "ER", "EY",
    "IH", "IY",
    "OW", "OY",
    "UH", "UW"
}

# Cache syllable counts for speed
_SYLL_CACHE: Dict[str, int] = {}

def cmu_syllables(word: str) -> int | None:
    """
    Returns syllable count using CMUdict if available; else None.
    Policy: use the first pronunciation variant.
    """
    w = word.lower()
    if w not in CMU:
        return None
    phones = CMU[w][0]
    count = 0
    for ph in phones:
        base = re.sub(r"\d", "", ph)
        if base in ARPA_VOWELS:
            count += 1
    return max(count, 1)  # at least one for non-empty alphabetic words

def g2p_syllables(word: str) -> int:
    """
    Returns syllable count using neural g2p_en; counts vowel phonemes.
    """
    w = word.lower()
    if w in _SYLL_CACHE:
        return _SYLL_CACHE[w]
    phones = G2P(w)
    count = 0
    for ph in phones:
        base = re.sub(r"\d", "", ph)
        if base in ARPA_VOWELS:
            count += 1
    # Guard: ensure >=1 for alphabetic tokens
    if count == 0 and re.search(r"[A-Za-z]", w):
        count = 1
    _SYLL_CACHE[w] = count
    return count

def syllables_hybrid(word: str) -> int:
    """
    Hybrid policy: try CMUdict first; if OOV, fall back to g2p_en.
    """
    c = cmu_syllables(word)
    if c is not None:
        return c
    return g2p_syllables(word)


[nltk_data] Downloading package averaged_perceptron_tagger_eng to
[nltk_data]     C:\Users\marco\AppData\Roaming\nltk_data...
[nltk_data]   Package averaged_perceptron_tagger_eng is already up-to-
[nltk_data]       date!


In [3]:
# Feature computation utilities using spaCy + CMUdict with g2p_en fallback

import zlib

# permissive fallback segmentation for pathological cases
_FALLBACK_SPLIT = re.compile(r'(?<=[\.!?])\s+|[\r\n]+|(?<=;)\s+|(?<=:)\s+')

def resegmentize_if_needed(text: str, nlp: Language, asl_hi: int = 100, min_tokens: int = 120, doc=None):
    """Return fallback sentence strings when avg sentence length is extreme."""
    doc = doc or nlp(text)
    sents = list(doc.sents)
    n_sent = len(sents)
    n_tok = len(doc)
    avg_len = (n_tok / max(n_sent, 1)) if n_tok else 0

    if (n_tok >= min_tokens) and (n_sent <= 2 or avg_len >= asl_hi):
        parts = [s.strip() for s in _FALLBACK_SPLIT.split(text) if s and not s.isspace()]
        return parts
    return None

def prepare_doc(text: str, nlp: Language, skip_reseg: bool = False) -> Tuple[Doc, bool]:
    """
    Parse text with spaCy and optionally resegment when heuristics trigger.
    Set skip_reseg=True when you need accurate dependency trees.
    """
    primary_doc = nlp(text)
    
    if skip_reseg:
        return primary_doc, False
    
    fallback = resegmentize_if_needed(text, nlp, doc=primary_doc)
    if fallback is None:
        return primary_doc, False
    
    fixed_text = ". ".join(fallback)
    return nlp(fixed_text), True

def can_compute_readability(n_tokens: int, n_sents: int) -> bool:
    return (n_tokens >= 100) and (n_sents >= 3)

def safe_readability(tokens: int, sentences: int, syllable_counts: List[int], chars_per_word: float, complex_words: int, polysyllables: int) -> Dict[str, float]:
    """Safely compute readability metrics, falling back to NaN when unstable."""
    out = {
        "flesch_reading_ease": np.nan,
        "gunning_fog": np.nan,
        "smog_index": np.nan,
        "automated_readability_index": np.nan,
    }
    if not can_compute_readability(tokens, sentences):
        return out

    words = max(tokens, 1)
    sents = max(sentences, 1)
    syllables = max(int(np.sum(syllable_counts)), 1)
    chars_per_word = float(chars_per_word) if chars_per_word else 0.0
    complex_words = max(complex_words, 0)
    polysyllables = max(polysyllables, 0)

    out["flesch_reading_ease"] = 206.835 - 1.015 * (words / sents) - 84.6 * (syllables / words)
    out["gunning_fog"] = 0.4 * ((words / sents) + 100.0 * (complex_words / words))
    out["smog_index"] = (1.043 * math.sqrt(30.0 * (polysyllables / sents)) + 3.1291) if polysyllables > 0 else np.nan
    out["automated_readability_index"] = 4.71 * chars_per_word + 0.5 * (words / sents) - 21.43
    return out

def _word_like(tok) -> bool:
    """Select lexical tokens (alphabetic, not space)."""
    return tok.is_alpha and not tok.is_space

def _alnum_char_count(token_text: str) -> int:
    """Count alphanumeric characters for ARI; excludes whitespace and punctuation."""
    return sum(ch.isalnum() for ch in token_text)

def features_from_doc(doc: Doc, text: str, *, resegmented: bool = False) -> Dict[str, float]:
    """Compute core lexical and readability-driven metrics with guards."""
    if doc.has_annotation("SENT_START"):
        sents = list(doc.sents)
    else:
        sents = list(doc.sents) if hasattr(doc, "sents") else [doc]
    if not sents:
        sents = [doc]
    n_sents = len(sents)

    word_toks = [t for t in doc if _word_like(t)]
    punct_toks = [t for t in doc if t.is_punct]
    nonspace_toks = [t for t in doc if not t.is_space]

    n_tokens = len(word_toks)
    sent_word_counts = [sum(1 for t in sent if _word_like(t)) for sent in sents]
    avg_sentence_length = float(np.mean(sent_word_counts)) if sent_word_counts else np.nan
    sentence_length_std = float(np.std(sent_word_counts, ddof=0)) if len(sent_word_counts) > 1 else np.nan

    word_lengths = [len(t.text) for t in word_toks] 
    avg_word_length = float(np.mean(word_lengths)) if word_lengths else np.nan

    vocab = {t.text.lower() for t in word_toks}
    type_token_ratio = (len(vocab) / n_tokens) if n_tokens > 0 else np.nan

    stop_count = sum(1 for t in word_toks if t.is_stop)
    stopword_ratio = (stop_count / n_tokens) if n_tokens > 0 else np.nan

    chars_alnum = sum(_alnum_char_count(t.text) for t in nonspace_toks)
    punct_chars = sum(len(t.text) for t in punct_toks)
    nonspace_chars = sum(len(t.text) for t in nonspace_toks)
    punctuation_ratio = (punct_chars / nonspace_chars) if nonspace_chars > 0 else np.nan

    syllable_counts = [syllables_hybrid(t.text) for t in word_toks]
    polysyllables = sum(1 for syl in syllable_counts if syl >= 3)
    complex_words = polysyllables
    chars_per_word = (chars_alnum / n_tokens) if n_tokens > 0 else 0.0

    readability = safe_readability(
        tokens=n_tokens,
        sentences=n_sents,
        syllable_counts=syllable_counts,
        chars_per_word=chars_per_word,
        complex_words=complex_words,
        polysyllables=polysyllables,
    )

    features = {
        "avg_word_length": avg_word_length,
        "type_token_ratio": type_token_ratio,
        "stopword_ratio": stopword_ratio,
        "punctuation_ratio": punctuation_ratio,
        "avg_sentence_length": avg_sentence_length,
        "sentence_length_std": sentence_length_std,
        "n_tokens_doc": float(n_tokens),
        "n_sentences_doc": float(n_sents),
        "resegmented": bool(resegmented),
    }
    features.update(readability)
    return features


In [4]:
# N-gram feature extraction utilities

from collections import Counter
from typing import List, Tuple
import math

def extract_ngrams(tokens: List[str], n: int) -> List[Tuple[str, ...]]:
    """Extract n-grams from a list of tokens."""
    if len(tokens) < n:
        return []
    return [tuple(tokens[i:i+n]) for i in range(len(tokens) - n + 1)]

def ngram_diversity(tokens: List[str], n: int) -> float:
    """
    Calculate n-gram diversity (unique n-grams / total n-grams).
    Returns 0 if no n-grams possible.
    """
    ngrams = extract_ngrams(tokens, n)
    if not ngrams:
        return 0.0
    return len(set(ngrams)) / len(ngrams)

def ngram_entropy(tokens: List[str], n: int) -> float:
    """
    Calculate Shannon entropy of n-gram distribution.
    Returns 0 if no n-grams possible.
    """
    ngrams = extract_ngrams(tokens, n)
    if not ngrams:
        return 0.0

    counts = Counter(ngrams)
    total = len(ngrams)
    entropy = 0.0

    for count in counts.values():
        prob = count / total
        entropy -= prob * math.log2(prob)

    return entropy

def calculate_burstiness(tokens: List[str]) -> float:
    """
    Calculate burstiness coefficient based on word frequency distribution.
    Burstiness = (sigma - mu) / (sigma + mu)
    where mu is mean frequency and sigma is standard deviation.
    Returns NaN if statistics are undefined.
    """
    if not tokens:
        return np.nan

    word_counts = Counter(tokens)
    frequencies = list(word_counts.values())

    if len(frequencies) < 2:
        return np.nan

    mu = np.mean(frequencies)
    sigma = np.std(frequencies, ddof=0)

    if mu + sigma == 0:
        return np.nan

    return (sigma - mu) / (sigma + mu)

def safe_ngram_stats(tokens: List[str], n: int = 2, min_ngrams: int = 100) -> Dict[str, float]:
    """Return diversity and entropy for n-grams when sample size is sufficient."""
    if len(tokens) < n:
        return {"diversity": np.nan, "entropy": np.nan}

    grams = extract_ngrams(tokens, n)
    if len(grams) < min_ngrams:
        return {"diversity": np.nan, "entropy": np.nan}

    counts = Counter(grams)
    diversity = len(counts) / len(grams)
    probs = np.array(list(counts.values()), dtype=float) / len(grams)
    entropy = float(-np.sum(probs * np.log2(probs)))
    return {"diversity": diversity, "entropy": entropy}

print("N-gram and burstiness utility functions loaded.")


N-gram and burstiness utility functions loaded.


In [5]:
# Character-level feature extraction

from collections import Counter

def character_ngram_features(text: str, n: int = 3) -> Tuple[float, float]:
    """
    Extract character n-gram diversity and entropy.
    Returns diversity ratio and entropy for character n-grams.
    """
    if len(text) < n:
        return np.nan, np.nan

    char_ngrams = [text[i:i+n] for i in range(len(text) - n + 1)]

    if not char_ngrams:
        return np.nan, np.nan

    diversity = len(set(char_ngrams)) / len(char_ngrams)

    counts = Counter(char_ngrams)
    total = len(char_ngrams)
    entropy = 0.0
    for count in counts.values():
        prob = count / total
        entropy -= prob * math.log2(prob)

    return diversity, entropy

def compression_features(text: str) -> Dict[str, float]:
    """Compute compression ratio and bits-per-character using zlib."""
    encoded = text.encode("utf-8")
    raw_len = len(encoded)
    if raw_len == 0:
        return {"compression_ratio": np.nan, "bits_per_char": np.nan}

    compressed = zlib.compress(encoded, level=6)
    ratio = len(compressed) / raw_len
    bits_per_char = 8.0 * len(compressed) / raw_len
    return {"compression_ratio": ratio, "bits_per_char": bits_per_char}

def compression_ratio(text: str) -> float:
    """Backward-compatible helper returning only the compression ratio."""
    return compression_features(text)["compression_ratio"]

def character_statistics(text: str) -> Dict[str, float]:
    """
    Extract surface-level character statistics.
    """
    if not text:
        return {
            "uppercase_ratio": np.nan,
            "digit_ratio": np.nan,
            "whitespace_ratio": np.nan,
            "unique_char_count": np.nan,
        }

    total_chars = len(text)

    return {
        "uppercase_ratio": sum(1 for c in text if c.isupper()) / total_chars,
        "digit_ratio": sum(1 for c in text if c.isdigit()) / total_chars,
        "whitespace_ratio": sum(1 for c in text if c.isspace()) / total_chars,
        "unique_char_count": float(len(set(text))),
    }

print("Character-level feature functions loaded.")


Character-level feature functions loaded.


In [6]:
from typing import Tuple, Dict
import numpy as np
from spacy.tokens import Doc, Token

def _root_chain_depth(token: Token, max_steps: int) -> int:
    """
    Length of the head chain from `token` up to the sentence/doc root.
    Robust to cycles and malformed heads by bounding steps and tracking indices.
    """
    depth = 0
    cur = token
    visited_idx = set()

    # Use token indices (doc-relative) for identity; this is stable.
    while cur.head.i != cur.i:
        if cur.i in visited_idx:
            # Cycle detected: bail out with 0 depth for this token
            return 0
        visited_idx.add(cur.i)

        depth += 1
        if depth > max_steps:
            # Malformed graph (excessive chain): cap and exit
            return max_steps

        cur = cur.head
    return depth

def sent_max_depth(sent) -> int:
    """
    Longest head-chain root->leaf depth within a sentence span.
    Traversal is over the full doc-level heads; we only *measure* per sentence.
    """
    if len(sent) == 0:
        return 0

    # A conservative upper bound: number of tokens in the *doc*
    # (not just in the sentence), to safely handle cross-sentence heads.
    max_steps = len(sent.doc) + 5

    return max((_root_chain_depth(tok, max_steps) for tok in sent), default=0)

def doc_depth_stats(doc: Doc) -> Tuple[float, float]:
    """
    Average and maximum tree depth across sentences in the original Doc.
    Falls back gracefully for empty docs.
    """
    sentences = list(doc.sents) if doc.has_annotation("SENT_START") or hasattr(doc, "sents") else [doc]
    if not sentences:
        return 0.0, 0.0

    depths = [sent_max_depth(sent) for sent in sentences if len(sent) > 0]
    if not depths:
        return 0.0, 0.0

    return float(np.mean(depths)), float(np.max(depths))

def dependency_tree_features(doc: Doc) -> Dict[str, float]:
    """
    Extract robust dependency-tree structural features from the original Doc.
    Avoids Span.as_doc(), uses doc-level indices and bounds traversal.
    """
    sentences = list(doc.sents) if doc.has_annotation("SENT_START") or hasattr(doc, "sents") else [doc]
    if not sentences:
        sentences = [doc]

    avg_depth, max_depth = doc_depth_stats(doc)

    per_sentence_distances = []
    left_deps = 0
    right_deps = 0

    for sent in sentences:
        distances = []
        for token in sent:
            # Skip roots
            if token.head.i == token.i:
                continue
            # Distance computed in doc coordinates is fine:
            d = abs(token.i - token.head.i)
            distances.append(d)
            if token.i < token.head.i:
                left_deps += 1
            else:
                right_deps += 1
        if distances:
            per_sentence_distances.append(float(np.mean(distances)))

    avg_dep_distance = float(np.mean(per_sentence_distances)) if per_sentence_distances else 0.0
    total_deps = left_deps + right_deps
    left_ratio = (left_deps / total_deps) if total_deps else 0.0
    right_ratio = (right_deps / total_deps) if total_deps else 0.0

    return {
        "avg_tree_depth": avg_depth,
        "max_tree_depth": max_depth,
        "avg_dependency_distance": avg_dep_distance,
        "left_dependency_ratio": left_ratio,
        "right_dependency_ratio": right_ratio,
    }


In [7]:
from typing import List, Dict, Iterable, Tuple, Optional
import numpy as np
from collections import Counter
import math
import re

# ---------------------------
# Token normalization helpers
# ---------------------------

def _normalize_tokens(
    tokens: Iterable,
    *,
    lower: bool = True,
    alpha_only: bool = True,
    min_len: int = 2,
    use_lemma: bool = False
) -> List[str]:
    """
    Normalize a sequence of tokens (strings or spaCy tokens).
    - lower: lowercases
    - alpha_only: keep tokens with .isalpha() (falls back to regex if string)
    - min_len: drop tokens shorter than this after normalization
    - use_lemma: if spaCy tokens are provided, use token.lemma_
    """
    out = []
    for t in tokens:
        s = t
        # spaCy Token support
        if hasattr(t, "lemma_") and use_lemma:
            s = t.lemma_
        elif hasattr(t, "text"):
            s = t.text

        if not isinstance(s, str):
            s = str(s)

        if lower:
            s = s.lower()

        if alpha_only:
            # use spaCy attribute if available, else a regex fallback
            if hasattr(t, "is_alpha"):
                if not t.is_alpha:
                    continue
            else:
                if not re.match(r"^[a-zA-Z]+$", s):
                    continue

        if len(s) < min_len:
            continue

        out.append(s)
    return out


# ---------------------------
# Hapax ratio (stable variant)
# ---------------------------

def hapax_legomena_ratio(tokens: List[str]) -> float:
    """
    Ratio of hapax tokens to TOTAL TOKENS (your original definition).
    More common in lexicography is hapax / types; we keep your denominator,
    but you may prefer hapax / unique_types for length-robustness.
    """
    if not tokens:
        return float("nan")
    cnt = Counter(tokens)
    hapax = sum(1 for c in cnt.values() if c == 1)
    return hapax / float(len(tokens))


def hapax_type_ratio(tokens: List[str]) -> float:
    """
    Optional: hapax to TYPES ratio (often more stable than hapax/token).
    """
    if not tokens:
        return float("nan")
    cnt = Counter(tokens)
    types = len(cnt)
    if types == 0:
        return float("nan")
    hapax = sum(1 for c in cnt.values() if c == 1)
    return hapax / float(types)


# ---------------------------
# Yule's K (guarded)
# ---------------------------

def yules_k(tokens: List[str], *, min_tokens: int = 100) -> float:
    """
    Yule's K = 10^4 * (sum v^2 * V_v - N) / N^2
    Guarded against short texts; returns NaN if N < min_tokens.
    """
    N = len(tokens)
    if N < min_tokens:
        return float("nan")

    cnt = Counter(tokens)
    spectrum = Counter(cnt.values())  # V_v
    # sum v^2 * V_v
    s2 = sum((v * v) * Vv for v, Vv in spectrum.items())

    # use float64
    Nf = float(N)
    K = 10000.0 * (s2 - Nf) / (Nf * Nf)
    return float(K)


# ---------------------------
# MTLD (standard, robust)
# ---------------------------

def _mtld_one_pass(tokens: List[str], threshold: float, min_segment: int) -> float:
    """
    One-direction MTLD pass (standard algorithm):
    Accumulate a segment until TTR falls below threshold; count a factor and reset.
    The final partial segment contributes a fractional factor.
    Returns the number of factors observed.
    """
    types = set()
    token_count = 0
    factor_count = 0.0

    for tok in tokens:
        token_count += 1
        types.add(tok)
        ttr = len(types) / float(token_count)

        # Only allow a factor to close if we have a minimally meaningful segment
        if (ttr < threshold) and (token_count >= min_segment):
            factor_count += 1.0
            types.clear()
            token_count = 0

    # partial segment contribution
    if token_count > 0:
        # If ttr is already above threshold, this adds <1 factor,
        # otherwise adds a smaller fraction.
        ttr = len(types) / float(token_count)
        if ttr != 1.0:  # avoid division by zero in degenerate case
            factor_count += (1.0 - ttr) / (1.0 - threshold)
        else:
            # maximally diverse partial segment: count a tiny fraction
            factor_count += 0.0

    return factor_count


def mtld(
    tokens: List[str],
    threshold: float = 0.72,
    *,
    min_tokens: int = 200,
    min_segment: int = 50
) -> float:
    """
    Measure of Textual Lexical Diversity (MTLD), forward/backward average.
    Guard-rails:
      - require at least `min_tokens` tokens, else NaN
      - clamp threshold to [0.60, 0.80]
      - enforce `min_segment` for factor completion
    """
    n = len(tokens)
    if n < min_tokens:
        return float("nan")

    threshold = max(0.60, min(0.80, float(threshold)))

    f = _mtld_one_pass(tokens, threshold, min_segment)
    b = _mtld_one_pass(list(reversed(tokens)), threshold, min_segment)

    # If both are zero (pathological), return NaN rather than n/0
    vals = [x for x in (f, b) if x > 0.0]
    if not vals:
        return float("nan")

    mean_factors = float(np.mean(vals))
    return n / mean_factors


# ---------------------------
# Aggregator with normalization
# ---------------------------

def vocabulary_sophistication_features(
    tokens: List,
    *,
    normalize: str = "lower",   # {"none","lower","lemma"}
    alpha_only: bool = True,
    min_len: int = 2,
    use_lemma_if_spacy: Optional[bool] = None
) -> Dict[str, float]:
    """
    - Accepts raw strings or spaCy Tokens.
    - Normalizes then computes robust metrics with NaN for undersized texts.
    - Adds hapax/type as a more length-stable complement (does not replace your original).
    """
    if normalize not in {"none", "lower", "lemma"}:
        raise ValueError("normalize must be one of {'none','lower','lemma'}")

    if use_lemma_if_spacy is None:
        use_lemma_if_spacy = (normalize == "lemma")

    toks = _normalize_tokens(
        tokens,
        lower=(normalize == "lower"),
        alpha_only=alpha_only,
        min_len=min_len,
        use_lemma=use_lemma_if_spacy
    )

    return {
        "hapax_legomena_ratio": hapax_legomena_ratio(toks),
        "hapax_type_ratio":     hapax_type_ratio(toks),
        "yules_k":              yules_k(toks, min_tokens=100),
        "mtld":                 mtld(toks, threshold=0.72, min_tokens=200, min_segment=50),
    }


In [8]:
# Punctuation pattern analysis

def punctuation_patterns(doc: Doc) -> Dict[str, float]:
    """
    Detailed punctuation pattern features beyond simple ratio.
    """
    all_tokens = [t for t in doc if not t.is_space]
    punct_tokens = [t for t in doc if t.is_punct]
    
    if not all_tokens:
        return {
            "comma_ratio": 0.0,
            "period_ratio": 0.0,
            "question_ratio": 0.0,
            "exclamation_ratio": 0.0,
            "semicolon_ratio": 0.0,
            "colon_ratio": 0.0,
            "quote_ratio": 0.0,
        }
    
    total = len(all_tokens)
    
    # Count specific punctuation marks
    punct_text = ''.join([t.text for t in punct_tokens])
    
    return {
        "comma_ratio": punct_text.count(',') / total,
        "period_ratio": punct_text.count('.') / total,
        "question_ratio": punct_text.count('?') / total,
        "exclamation_ratio": punct_text.count('!') / total,
        "semicolon_ratio": punct_text.count(';') / total,
        "colon_ratio": punct_text.count(':') / total,
        "quote_ratio": (punct_text.count('"') + punct_text.count("'")) / total,
    }

print("Punctuation pattern functions loaded.")

Punctuation pattern functions loaded.


In [9]:
from textblob import TextBlob

def sentiment_features(text: str, doc: Doc) -> Dict[str, float]:
    """
    Extract sentiment and emotional tone features.
    Uses TextBlob for polarity and subjectivity.
    """
    if not text or not text.strip():
        return {
            "sentiment_polarity": 0.0,
            "sentiment_subjectivity": 0.0,
            "sentiment_polarity_variance": 0.0,
            "positive_word_ratio": 0.0,
            "negative_word_ratio": 0.0,
            "neutral_sentence_ratio": 0.0,
        }
    
    # Overall document sentiment
    blob = TextBlob(text)
    features = {
        "sentiment_polarity": blob.sentiment.polarity,  # -1 (negative) to 1 (positive)
        "sentiment_subjectivity": blob.sentiment.subjectivity,  # 0 (objective) to 1 (subjective)
    }
    
    # Sentence-level sentiment variance
    sents = list(doc.sents) if doc.has_annotation("SENT_START") else [doc]
    sent_polarities = []
    neutral_count = 0
    
    for sent in sents:
        sent_blob = TextBlob(sent.text)
        polarity = sent_blob.sentiment.polarity
        sent_polarities.append(polarity)
        
        # Count neutral sentences (polarity close to 0)
        if abs(polarity) < 0.1:
            neutral_count += 1
    
    features["sentiment_polarity_variance"] = float(np.var(sent_polarities)) if len(sent_polarities) > 1 else 0.0
    features["neutral_sentence_ratio"] = neutral_count / len(sents) if sents else 0.0
    
    # Positive/negative word ratios using spaCy tokens
    word_toks = [t for t in doc if _word_like(t)]
    if word_toks:
        positive_count = 0
        negative_count = 0
        
        for token in word_toks:
            word_blob = TextBlob(token.text.lower())
            polarity = word_blob.sentiment.polarity
            
            if polarity > 0.1:
                positive_count += 1
            elif polarity < -0.1:
                negative_count += 1
        
        features["positive_word_ratio"] = positive_count / len(word_toks)
        features["negative_word_ratio"] = negative_count / len(word_toks)
    else:
        features["positive_word_ratio"] = 0.0
        features["negative_word_ratio"] = 0.0
    
    return features

In [10]:
# Application doc processing with segmentation safeguards

def window_tokens(doc: Doc, max_tokens: int = 500) -> List[str]:
    """Lowercase alphabetic tokens clipped to a comparison window."""
    return [t.text.lower() for t in doc if _word_like(t)][:max_tokens]

feature_rows: List[Dict[str, float]] = []
for idx, row in df.reset_index(drop=True).iterrows():
    text = str(row.get(TEXT_COL, ""))
    
    # Use resegmented doc for readability and most features
    doc, resegmented = prepare_doc(text, nlp)
    doc_features = dict(features_from_doc(doc, text, resegmented=resegmented))

    tok_win = window_tokens(doc, max_tokens=500)
    unigram = safe_ngram_stats(tok_win, n=1, min_ngrams=100)
    bigram = safe_ngram_stats(tok_win, n=2, min_ngrams=100)
    trigram = safe_ngram_stats(tok_win, n=3, min_ngrams=100)
    doc_features["unigram_diversity"] = unigram["diversity"]
    doc_features["bigram_diversity"] = bigram["diversity"]
    doc_features["trigram_diversity"] = trigram["diversity"]
    doc_features["bigram_entropy"] = bigram["entropy"]
    doc_features["trigram_entropy"] = trigram["entropy"]
    doc_features["token_burstiness"] = calculate_burstiness(tok_win) if len(tok_win) >= 2 else np.nan

    char_diversity, char_entropy = character_ngram_features(text, n=3)
    doc_features["char_trigram_diversity"] = char_diversity
    doc_features["char_trigram_entropy"] = char_entropy
    doc_features.update(character_statistics(text))
    doc_features.update(compression_features(text))

    # CRITICAL: Use clean parse for dependency features to avoid resegmentation artifacts
    doc_clean, _ = prepare_doc(text, nlp, skip_reseg=True)
    dep_feats = dependency_tree_features(doc_clean)
    doc_features.update(dep_feats)
    max_tree_depth = dep_feats.get("max_tree_depth")
    try:
        depth_nan = math.isnan(max_tree_depth)
    except (TypeError, ValueError):
        depth_nan = False
    depth_ok = depth_nan or max_tree_depth is None
    if not depth_ok:
        try:
            depth_ok = max_tree_depth <= 50
        except TypeError:
            depth_ok = False
    doc_features["depth_check_passed"] = bool(depth_ok)

    vocab_feats = vocabulary_sophistication_features(tok_win)
    doc_features.update(vocab_feats)

    doc_features.update(punctuation_patterns(doc))
    doc_features.update(sentiment_features(text, doc))

    for flag in ["had_urls", "had_html", "had_code", "had_table"]:
        if flag in row.index:
            doc_features[flag] = row[flag]

    feature_rows.append(doc_features)

feat_df = pd.DataFrame(feature_rows)
df_with_features = pd.concat([df.reset_index(drop=True), feat_df.reset_index(drop=True)], axis=1)

print("Computed feature columns:")
print(list(feat_df.columns))
df_with_features.head(3)

Computed feature columns:
['avg_word_length', 'type_token_ratio', 'stopword_ratio', 'punctuation_ratio', 'avg_sentence_length', 'sentence_length_std', 'n_tokens_doc', 'n_sentences_doc', 'resegmented', 'flesch_reading_ease', 'gunning_fog', 'smog_index', 'automated_readability_index', 'unigram_diversity', 'bigram_diversity', 'trigram_diversity', 'bigram_entropy', 'trigram_entropy', 'token_burstiness', 'char_trigram_diversity', 'char_trigram_entropy', 'uppercase_ratio', 'digit_ratio', 'whitespace_ratio', 'unique_char_count', 'compression_ratio', 'bits_per_char', 'avg_tree_depth', 'max_tree_depth', 'avg_dependency_distance', 'left_dependency_ratio', 'right_dependency_ratio', 'depth_check_passed', 'hapax_legomena_ratio', 'hapax_type_ratio', 'yules_k', 'mtld', 'comma_ratio', 'period_ratio', 'question_ratio', 'exclamation_ratio', 'semicolon_ratio', 'colon_ratio', 'quote_ratio', 'sentiment_polarity', 'sentiment_subjectivity', 'sentiment_polarity_variance', 'neutral_sentence_ratio', 'positive_w

Unnamed: 0,id,adv_source_id,source_id,model,decoding,repetition_penalty,attack,domain,title,prompt,...,sentiment_polarity,sentiment_subjectivity,sentiment_polarity_variance,neutral_sentence_ratio,positive_word_ratio,negative_word_ratio,had_urls,had_html,had_code,had_table
0,8a274811-4809-4a88-96a2-9af63c06bae9,e4f35f18-1a33-4cb3-bf63-d4402ded0dbc,79c87b70-fb77-4a80-881a-2a77db2b17bc,mistral,greedy,no,zero_width_space,books,Each Man's Son,The following is the full text of a plot summa...,...,0.101742,0.260947,0.008746,0.538462,0.035714,0.012987,False,False,False,False
1,3273ddc0-2211-4116-90bb-1ce99bf01e6c,2d7d0873-d4f7-47d1-b971-cbdaff88d5bc,2d7d0873-d4f7-47d1-b971-cbdaff88d5bc,human,,,article_deletion,recipes,Salad of Roasted Beets and Arugula with Blue C...,,...,0.048611,0.367063,0.013839,0.545455,0.042735,0.038462,False,False,False,False
2,46c83315-fb96-4c22-91ac-b77e7edb42bc,4f1cf565-8c27-4c20-ac22-824146821c03,143a1df5-a01a-476d-a3fb-c59f2ec78fff,gpt3,greedy,no,whitespace,recipes,Guiltfree Chocolate Cheesecake,The following is the full text of a recipe for...,...,0.132857,0.355714,0.024519,0.727273,0.070423,0.028169,False,False,False,False


In [11]:
#  Save enriched dataset and basic descriptive statistics

OUT_WITH_FEATS = f"raid_sample_{SELECTED_DATASET}_with_features_PREPOS.csv"
df_with_features.to_csv(OUT_WITH_FEATS, index=False)
print(f"Saved enriched dataset: {OUT_WITH_FEATS}  (rows: {len(df_with_features)})")

feature_cols = [col for col in df_with_features.columns if col not in df.columns]
print(f"Feature columns saved ({len(feature_cols)} total):")
print(feature_cols)

display(df_with_features[feature_cols].describe(percentiles=[0.1, 0.5, 0.9]).T)


Saved enriched dataset: raid_sample_large_with_features_PREPOS.csv  (rows: 60000)
Feature columns saved (48 total):
['type_token_ratio', 'stopword_ratio', 'punctuation_ratio', 'avg_sentence_length', 'sentence_length_std', 'n_tokens_doc', 'n_sentences_doc', 'resegmented', 'flesch_reading_ease', 'gunning_fog', 'smog_index', 'automated_readability_index', 'unigram_diversity', 'bigram_diversity', 'trigram_diversity', 'bigram_entropy', 'trigram_entropy', 'token_burstiness', 'char_trigram_diversity', 'char_trigram_entropy', 'uppercase_ratio', 'whitespace_ratio', 'unique_char_count', 'compression_ratio', 'bits_per_char', 'avg_tree_depth', 'max_tree_depth', 'avg_dependency_distance', 'left_dependency_ratio', 'right_dependency_ratio', 'depth_check_passed', 'hapax_legomena_ratio', 'hapax_type_ratio', 'yules_k', 'mtld', 'comma_ratio', 'period_ratio', 'question_ratio', 'exclamation_ratio', 'semicolon_ratio', 'colon_ratio', 'quote_ratio', 'sentiment_polarity', 'sentiment_subjectivity', 'sentiment_p

Unnamed: 0,count,mean,std,min,10%,50%,90%,max
type_token_ratio,59999.0,0.58722,0.143939,0.002092,0.449405,0.580556,0.75,1.0
stopword_ratio,59999.0,0.442409,0.165985,0.0,0.231809,0.47432,0.608303,0.989848
punctuation_ratio,60000.0,0.029878,0.012764,0.0,0.016949,0.028169,0.045172,0.313095
avg_sentence_length,60000.0,22.519281,20.385584,0.0,12.0,19.596875,30.0,465.0
sentence_length_std,59337.0,10.254223,10.978179,0.0,4.192881,7.925103,16.10242,233.0
n_tokens_doc,60000.0,254.7591,242.829985,0.0,111.0,222.0,397.0,12615.0
n_sentences_doc,60000.0,13.6661,12.448411,1.0,5.0,11.0,24.0,805.0
flesch_reading_ease,55123.0,54.750313,24.305058,-167.910708,21.496652,58.315472,82.195726,194.240294
gunning_fog,55123.0,13.681865,5.659952,1.152766,7.406159,13.087584,20.405993,69.365657
smog_index,54886.0,12.16522,3.967437,3.66651,7.44753,11.855464,17.122413,49.068485


In [12]:
df = df_with_features.copy()

In [13]:
for col in ["avg_sentence_length","flesch_reading_ease","gunning_fog",
            "automated_readability_index","mtld","yules_k","max_tree_depth"]:
    if col in df.columns:
        print(col, "min:", df[col].min(), "max:", df[col].max())


avg_sentence_length min: 0.0 max: 465.0
flesch_reading_ease min: -167.9107078507078 max: 194.24029411764707
gunning_fog min: 1.1527659574468085 max: 69.36565656565656
automated_readability_index min: -9.797680491551457 max: 82.17217701641684
mtld min: 32.30769230769231 max: 47528.32000000002
yules_k min: 0.0 max: 9979.07949790795
max_tree_depth min: 1.0 max: 470.0


In [14]:
import numpy as np
import pandas as pd

# -------------------------------
# Helpers
# -------------------------------

NUMERIC_COLS = [
    "avg_sentence_length","sentence_length_std","flesch_reading_ease",
    "gunning_fog","automated_readability_index","mtld","yules_k",
    "bigram_entropy","trigram_entropy","bigram_diversity","trigram_diversity",
    "avg_tree_depth","max_tree_depth","avg_dependency_distance",
    "compression_ratio","uppercase_ratio","unique_char_count","whitespace_ratio"
]

def ensure_numeric(df: pd.DataFrame, cols=NUMERIC_COLS) -> pd.DataFrame:
    """Coerce known feature columns to numeric (if present)."""
    df = df.copy()
    for c in cols:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")
    return df

def _q(df, col, q):
    return float(np.nanquantile(df[col].values, q)) if col in df else np.nan

# -------------------------------
# Thresholds and Diagnostics
# -------------------------------
def compute_diagnostic_thresholds(df: pd.DataFrame) -> dict:
    """Data-driven thresholds using quantiles only."""
    thr = {}
    thr["asl_hi"]       = _q(df, "avg_sentence_length", 0.99)
    thr["sls_hi"]       = _q(df, "sentence_length_std", 0.99)
    thr["fog_hi"]       = _q(df, "gunning_fog", 0.995)
    thr["ari_hi"]       = _q(df, "automated_readability_index", 0.995)
    thr["fre_lo"]       = _q(df, "flesch_reading_ease", 0.005)
    thr["mtld_hi"]      = _q(df, "mtld", 0.995)
    thr["yk_hi"]        = _q(df, "yules_k", 0.995)
    thr["depth_max_hi"] = _q(df, "max_tree_depth", 0.995)
    thr["depth_avg_hi"] = _q(df, "avg_tree_depth", 0.995)
    thr["depdist_hi"]   = _q(df, "avg_dependency_distance", 0.995)
    thr["comp_hi"]      = 1.0  # Compression > 1.0 = expansion
    thr["upper_hi"]     = _q(df, "uppercase_ratio", 0.995)
    thr["uniq_hi"]      = _q(df, "unique_char_count", 0.995)
    thr["ws_lo"]        = _q(df, "whitespace_ratio", 0.005)
    thr["ws_hi"]        = _q(df, "whitespace_ratio", 0.995)
    return thr

def diagnose_feature_outliers(df: pd.DataFrame) -> pd.DataFrame:
    """Return diagnostics dataframe with boolean flags and suspected causes list."""
    # Ensure numeric so comparisons fire correctly
    df = ensure_numeric(df)

    thr = compute_diagnostic_thresholds(df)
    D = pd.DataFrame(index=df.index)

    # Flag definitions
    D["seg_len_extreme"] = (
        (df.get("avg_sentence_length", np.nan) > thr["asl_hi"]) |
        (df.get("sentence_length_std", np.nan) > thr["sls_hi"])
    )
    D["readability_outlier"] = (
        (df.get("flesch_reading_ease", np.nan) < thr["fre_lo"]) |
        (df.get("gunning_fog", np.nan) > thr["fog_hi"]) |
        (df.get("automated_readability_index", np.nan) > thr["ari_hi"])
    )
    D["lexical_instability"] = (
        (df.get("mtld", np.nan) > thr["mtld_hi"]) |
        (df.get("yules_k", np.nan) > thr["yk_hi"])
    )
    D["ngram_edge_effects"] = (
    (df.get("bigram_entropy", np.nan) == 0) |
    (df.get("trigram_entropy", np.nan) == 0) |
    ((df.get("n_tokens_ws", 1000) < 50) & (df.get("trigram_diversity", 0) == 1.0)) |
    ((df.get("n_tokens_ws", 1000) < 50) & (df.get("bigram_diversity", 0) == 1.0))
    )
    D["depth_implausible"] = (
        (df.get("max_tree_depth", np.nan) > thr["depth_max_hi"]) |
        (df.get("avg_tree_depth", np.nan) > thr["depth_avg_hi"])
    )
    D["dep_distance_implausible"] = (df.get("avg_dependency_distance", np.nan) > thr["depdist_hi"])
    D["compression_overhead"] = (df.get("compression_ratio", np.nan) > thr["comp_hi"])
    D["markup_noise"] = (
    (df.get("uppercase_ratio", np.nan) > thr["upper_hi"]) |
    (df.get("unique_char_count", np.nan) > thr["uniq_hi"]) |
    (df.get("whitespace_ratio", np.nan) < thr["ws_lo"]) |
    (df.get("whitespace_ratio", np.nan) > thr["ws_hi"])
    )

    # Suspected causes column (pre-allocate + .at)
    D["suspected_causes"] = pd.Series([[] for _ in range(len(D))], index=D.index, dtype=object)
    for i in D.index:
        c = []
        if bool(D.at[i, "depth_implausible"]) or bool(D.at[i, "dep_distance_implausible"]):
            c.append("dependency_depth_computation_bug")
        if bool(D.at[i, "seg_len_extreme"]) and bool(D.at[i, "readability_outlier"]):
            c.append("sentence_segmentation_failure")
        if bool(D.at[i, "lexical_instability"]) or bool(D.at[i, "ngram_edge_effects"]):
            c.append("lexical_metric_instability_or_length_effects")
        if bool(D.at[i, "compression_overhead"]) or bool(D.at[i, "markup_noise"]):
            c.append("markup_or_code_noise")
        D.at[i, "suspected_causes"] = c

    # Severity score
    flag_cols = [c for c in D.columns if c != "suspected_causes"]
    for c in flag_cols:
        D[c] = D[c].fillna(False).astype(bool)
    D["severity"] = D[flag_cols].sum(axis=1).astype(int)

    # Attach thresholds for later inspection
    D.attrs["thresholds"] = thr
    return D

def build_diagnostic_report(diag: pd.DataFrame, top_k: int = 15):
    """Summarize counts by cause and return top offending rows by severity."""
    cause_counts = (
        diag["suspected_causes"].explode().value_counts(dropna=True)
        .rename_axis("cause").to_frame("count")
    )
    flag_counts = (
        diag.drop(columns=["suspected_causes"])
            .sum()
            .sort_values(ascending=False)
            .to_frame("count")
    )
    top_offenders = diag.sort_values("severity", ascending=False).head(top_k)
    return {"cause_counts": cause_counts, "flag_counts": flag_counts, "top_offenders": top_offenders}

# -------------------------------
# Runner / Example usage
# -------------------------------

# Assume you already have `df` with your features
# If your features were just computed and may be strings, the coercion inside
# diagnose_feature_outliers will handle them; coercing here is optional:
# df = ensure_numeric(df)

diag = diagnose_feature_outliers(df)
rep  = build_diagnostic_report(diag, top_k=20)

print("Computed thresholds:")
for k, v in diag.attrs.get("thresholds", {}).items():
    print(f"  {k}: {v}")

from IPython.display import display

print("\nCause counts:")
display(rep["cause_counts"])

print("\nFlag counts:")
display(rep["flag_counts"])

print("\nTop offenders (with original feature values):")
cols_to_show = [
    "avg_sentence_length","sentence_length_std","flesch_reading_ease",
    "gunning_fog","automated_readability_index","mtld","yules_k",
    "avg_tree_depth","max_tree_depth","avg_dependency_distance",
    "compression_ratio","uppercase_ratio","unique_char_count",
    "whitespace_ratio","bigram_entropy","trigram_entropy",
    "bigram_diversity","trigram_diversity"
]
existing_cols = [c for c in cols_to_show if c in df.columns]
display(rep["top_offenders"].join(df[existing_cols], how="left"))





Computed thresholds:
  asl_hi: 94.66666666666667
  sls_hi: 61.5
  fog_hi: 37.22068402366864
  ari_hi: 41.545073474497
  fre_lo: -29.083826048329737
  mtld_hi: 4733.439999999991
  yk_hi: 1754.3724034324473
  depth_max_hi: 27.0
  depth_avg_hi: 17.0
  depdist_hi: 5.721523089248039
  comp_hi: 1.0
  upper_hi: 0.0942173674928059
  uniq_hi: 73.0
  ws_lo: 0.1233140655105973
  ws_hi: 0.21633103545652826

Cause counts:


Unnamed: 0_level_0,count
cause,Unnamed: 1_level_1
markup_or_code_noise,1141
dependency_depth_computation_bug,654
lexical_metric_instability_or_length_effects,405
sentence_segmentation_failure,229



Flag counts:


Unnamed: 0,count
severity,3614
markup_noise,1115
seg_len_extreme,1022
lexical_instability,405
depth_implausible,372
readability_outlier,357
dep_distance_implausible,300
compression_overhead,41
ngram_edge_effects,2



Top offenders (with original feature values):


Unnamed: 0,seg_len_extreme,readability_outlier,lexical_instability,ngram_edge_effects,depth_implausible,dep_distance_implausible,compression_overhead,markup_noise,suspected_causes,severity,...,max_tree_depth,avg_dependency_distance,compression_ratio,uppercase_ratio,unique_char_count,whitespace_ratio,bigram_entropy,trigram_entropy,bigram_diversity,trigram_diversity
3183,True,True,True,False,False,True,False,True,"[dependency_depth_computation_bug, sentence_se...",5,...,8.0,8.322482,0.065202,0.046671,17.0,0.258751,2.520852,2.807304,0.015915,0.018617
15858,True,False,True,False,True,True,False,True,"[dependency_depth_computation_bug, lexical_met...",5,...,40.0,6.865823,0.51735,0.003471,41.0,0.122121,8.599913,8.59619,1.0,1.0
36435,True,True,True,False,True,False,False,True,"[dependency_depth_computation_bug, sentence_se...",5,...,470.0,1.118143,0.01709,0.2005,11.0,0.200083,0.086196,0.086351,0.010417,0.010438
38321,True,False,True,False,True,True,False,True,"[dependency_depth_computation_bug, lexical_met...",5,...,19.0,10.571429,0.094817,0.160556,15.0,0.027813,0.263122,0.264832,0.039062,0.03937
45103,True,False,True,True,True,False,False,True,"[dependency_depth_computation_bug, lexical_met...",5,...,122.0,1.0,0.028494,0.166893,6.0,0.165536,-0.0,-0.0,0.008197,0.008264
46486,True,True,False,False,True,True,False,True,"[dependency_depth_computation_bug, sentence_se...",5,...,43.0,5.724209,0.419024,0.003368,40.0,0.112421,8.066089,8.060696,1.0,1.0
49582,True,True,False,False,True,True,False,True,"[dependency_depth_computation_bug, sentence_se...",5,...,92.0,5.825203,0.483241,0.002121,35.0,0.109037,8.022368,8.016808,1.0,1.0
51083,True,True,True,False,False,True,False,True,"[dependency_depth_computation_bug, sentence_se...",5,...,7.0,20.534674,0.162883,0.114018,42.0,0.163376,2.716205,2.727452,0.15444,0.158915
7115,True,False,True,False,True,False,False,True,"[dependency_depth_computation_bug, lexical_met...",4,...,30.0,2.697481,0.18515,0.119831,47.0,0.172932,4.417189,5.048976,0.332226,0.396667
13451,True,False,True,False,True,False,False,True,"[dependency_depth_computation_bug, lexical_met...",4,...,42.0,3.398577,0.03956,0.118681,18.0,0.176557,2.613044,2.613073,0.029046,0.029167


In [15]:
import numpy as np
import pandas as pd

FEATURES_TO_CAP = [
    'avg_sentence_length','sentence_length_std','flesch_reading_ease',
    'gunning_fog','automated_readability_index','mtld','yules_k',
    'avg_dependency_distance','max_tree_depth','compression_ratio',
    'bigram_entropy','trigram_entropy','bigram_diversity','trigram_diversity'
]

# Natural/practical bounds to prevent absurd caps
BOUNDED = {
    'bigram_diversity': (0.0, 1.0),
    'trigram_diversity': (0.0, 1.0),
    'compression_ratio': (0.0, np.inf),
    'flesch_reading_ease': (-100.0, 150.0),   # practical working range
}

def _finite(s: pd.Series) -> pd.Series:
    return s[np.isfinite(s.values)]

def calculate_percentile_caps(df: pd.DataFrame, lower_pct=1, upper_pct=99) -> dict:
    caps = {}
    for feat in FEATURES_TO_CAP:
        if feat not in df.columns: 
            continue
        s = _finite(df[feat].dropna())
        if s.empty:
            continue
        lo = float(np.percentile(s, lower_pct))
        hi = float(np.percentile(s, upper_pct))
        if feat in BOUNDED:
            blo, bhi = BOUNDED[feat]
            lo = max(lo, blo)
            hi = min(hi, bhi)
        if lo > hi:  # degenerate case
            lo, hi = hi, lo
        caps[feat] = (lo, hi)
        print(f"{feat}: [{lo:.2f}, {hi:.2f}]")
    return caps

def cap_extreme_features(df: pd.DataFrame, caps: dict) -> pd.DataFrame:
    df = df.copy()
    df['features_capped'] = 0  # keep your counter if you like

    for feat, (lo, hi) in caps.items():
        if feat not in df.columns:
            continue
        col = df[feat]
        before = col.copy()
        # clip in-place while preserving NaNs
        df[feat] = col.clip(lower=lo, upper=hi)
        changed = (before != df[feat]) & df[feat].notna() & before.notna()
        if changed.any():
            print(f"Capped {feat}: {int(changed.sum())} values")
            df['features_capped'] += changed.astype(int)
    return df


In [16]:
actual_caps = calculate_percentile_caps(df_with_features, lower_pct=1, upper_pct=99)
df_with_features = cap_extreme_features(df_with_features, actual_caps)

avg_sentence_length: [7.81, 94.67]
sentence_length_std: [1.49, 61.50]
flesch_reading_ease: [-11.71, 96.66]
gunning_fog: [4.67, 32.09]
automated_readability_index: [2.06, 35.25]
mtld: [42.88, 2619.86]
yules_k: [2.70, 1112.71]
avg_dependency_distance: [2.14, 4.82]
max_tree_depth: [5.00, 20.00]
compression_ratio: [0.11, 0.70]
bigram_entropy: [4.26, 8.84]
trigram_entropy: [4.52, 8.95]
bigram_diversity: [0.13, 1.00]
trigram_diversity: [0.14, 1.00]
Capped avg_sentence_length: 1193 values
Capped sentence_length_std: 1185 values
Capped flesch_reading_ease: 1104 values
Capped gunning_fog: 1104 values
Capped automated_readability_index: 1104 values
Capped mtld: 605 values
Capped yules_k: 1016 values
Capped avg_dependency_distance: 1200 values
Capped max_tree_depth: 1110 values
Capped compression_ratio: 1200 values
Capped bigram_entropy: 1116 values
Capped trigram_entropy: 910 values
Capped bigram_diversity: 558 values
Capped trigram_diversity: 557 values


In [17]:
from scipy import stats
import numpy as np
import pandas as pd

READABILITY_FEATS = ['flesch_reading_ease','gunning_fog','automated_readability_index']
LEXICAL_FEATS     = ['mtld','yules_k','type_token_ratio']
PARSE_FEATS       = ['avg_dependency_distance']  # plus tree-depth presence checks

# Features where z-score should be computed on log1p to tame skew (values stay untouched)
LOG_FOR_Z = {'mtld','yules_k','avg_dependency_distance','sentence_length_std','max_tree_depth'}

def _z_on(series: pd.Series, log_if_needed: bool) -> pd.Series:
    s = series.astype(float).replace([np.inf,-np.inf], np.nan)
    if log_if_needed:
        nonan = s.dropna()
        if (nonan >= 0).all():
            s = np.log1p(s)
    # classical population z-score (ddof=0); fill NaNs with mean to avoid bias in stats.zscore
    m = s.mean()
    sd = s.std(ddof=0)
    if sd == 0 or np.isnan(sd):
        return pd.Series(0.0, index=s.index)
    return (s - m) / sd

def add_quality_flags_statistical(df: pd.DataFrame, n_std: float = 3.0) -> pd.DataFrame:
    df = df.copy()

    # Compute z-scores (abs) transiently; do not keep columns
    z_map = {}

    for feat in set(READABILITY_FEATS + LEXICAL_FEATS + PARSE_FEATS):
        if feat in df.columns:
            z = _z_on(df[feat], log_if_needed=(feat in LOG_FOR_Z)).abs()
            z_map[feat] = z

    df['parse_quality_issue'] = (
        df['max_tree_depth'].isna() |
        df['avg_tree_depth'].isna() |
        (z_map.get('avg_dependency_distance', pd.Series(0, index=df.index)) > n_std)
    )

    df['readability_anomaly'] = (
        (z_map.get('flesch_reading_ease', pd.Series(0, index=df.index)) > n_std) |
        (z_map.get('gunning_fog', pd.Series(0, index=df.index)) > n_std) |
        (z_map.get('automated_readability_index', pd.Series(0, index=df.index)) > n_std)
    )

    df['lexical_anomaly'] = (
        (z_map.get('mtld', pd.Series(0, index=df.index)) > n_std) |
        (z_map.get('yules_k', pd.Series(0, index=df.index)) > n_std) |
        (z_map.get('type_token_ratio', pd.Series(0, index=df.index)) > n_std)
    )

    df['quality_score'] = (3 - (
        df['parse_quality_issue'].astype(int) +
        df['readability_anomaly'].astype(int) +
        df['lexical_anomaly'].astype(int)
    )).clip(lower=0, upper=3)

    print(f"Using {n_std} standard deviations as threshold")
    print(f"Parse issues: {int(df['parse_quality_issue'].sum())} texts")
    print(f"Readability anomalies: {int(df['readability_anomaly'].sum())} texts")
    print(f"Lexical anomalies: {int(df['lexical_anomaly'].sum())} texts")
    print(f"Quality score distribution:\n{df['quality_score'].value_counts().sort_index()}")

    return df

df_with_features = add_quality_flags_statistical(df_with_features, n_std=3.0)

Using 3.0 standard deviations as threshold
Parse issues: 922 texts
Readability anomalies: 1068 texts
Lexical anomalies: 2525 texts
Quality score distribution:
quality_score
0      175
1      776
2     2438
3    56611
Name: count, dtype: int64


In [18]:
import numpy as np
import pandas as pd

def impute_after_capping(df: pd.DataFrame, max_missing_pct: float = 0.4) -> pd.DataFrame:
    """
    Impute missing values with group medians, dropping features with excessive missingness.
    
    Parameters:
    -----------
    df : pd.DataFrame
        Input dataframe
    max_missing_pct : float
        Maximum proportion of missing values allowed (default 0.4 = 40%)
        Features exceeding this threshold are dropped before imputation
    """
    df = df.copy()
    df.replace([np.inf, -np.inf], np.nan, inplace=True)

    # Drop duplicate columns (keep first)
    if df.columns.duplicated().any():
        dup_mask = df.columns.duplicated()
        print(f"[Impute] Dropping duplicate columns: {df.columns[dup_mask].tolist()}")
        df = df.loc[:, ~dup_mask]

    # Numeric features except target
    num_feats = [c for c in df.select_dtypes(include=[np.number]).columns if c != "is_ai"]
    if not num_feats:
        return df

    # Check missingness and drop high-missing features FIRST
    missing_pct = df[num_feats].isna().mean()
    high_missing = missing_pct[missing_pct > max_missing_pct].sort_values(ascending=False)
    
    if not high_missing.empty:
        print(f"\n[Impute] Dropping {len(high_missing)} features with >{max_missing_pct*100:.0f}% missing:")
        for feat, pct in high_missing.items():
            print(f"  - {feat}: {pct*100:.1f}% missing")
        
        # Remove high-missing features
        num_feats = [f for f in num_feats if f not in high_missing.index]
        df.drop(columns=high_missing.index, inplace=True)
    
    if not num_feats:
        print("[Impute] No features remaining after missingness filter")
        return df

    # Group keys for stratified imputation
    group_keys = []
    if "source_type" in df.columns:
        group_keys.append("source_type")
    if "n_tokens_doc" in df.columns:
        bins = [0, 100, 250, 500, 10_000]
        labels = ["S", "M", "L", "XL"]
        df["__len_bin__"] = pd.cut(df["n_tokens_doc"], bins=bins, right=False, labels=labels)
        group_keys.append("__len_bin__")

    before_missing = df[num_feats].isna().sum()
    values = df[num_feats].to_numpy(dtype=float)
    mask = np.isnan(values)

    # Group-wise median imputation
    if group_keys:
        med_df = df.groupby(group_keys, dropna=False, observed=False)[num_feats].transform("median")
        med_vals = med_df.to_numpy(dtype=float)
        values = np.where(mask, med_vals, values)

    # Global per-column median fallback
    still_nan = np.isnan(values)
    if still_nan.any():
        # Check for columns that are entirely NaN after group fill
        all_nan_cols = np.where(np.isnan(values).all(axis=0))[0]
        if len(all_nan_cols):
            drop_cols = [num_feats[i] for i in all_nan_cols]
            print(f"[Impute] Dropping {len(drop_cols)} all-NaN features: {drop_cols}")
            
            keep_idx = [i for i in range(values.shape[1]) if i not in all_nan_cols]
            values = values[:, keep_idx]
            num_feats = [num_feats[i] for i in keep_idx]
            df.drop(columns=drop_cols, inplace=True)
            still_nan = np.isnan(values)

        if still_nan.any():
            col_medians = np.nanmedian(values, axis=0)
            row_idx, col_idx = np.where(still_nan)
            values[row_idx, col_idx] = col_medians[col_idx]

    # Write back
    df.loc[:, num_feats] = values
    df.drop(columns=["__len_bin__"], errors="ignore", inplace=True)

    # Report imputation summary
    after_missing = df[num_feats].isna().sum()
    imputed_counts = (before_missing.reindex(num_feats).fillna(0).astype(int)
                      - after_missing.reindex(num_feats).fillna(0).astype(int))
    total_imputed = int(imputed_counts.sum())
    
    if total_imputed:
        print(f"\n[Impute] Filled {total_imputed} missing values across {(imputed_counts > 0).sum()} features")
        top_imputed = imputed_counts[imputed_counts > 0].sort_values(ascending=False).head(10)
        print("Top 10 imputed features:")
        print(top_imputed.to_string())

    return df

# Usage
df_with_features = impute_after_capping(df_with_features, max_missing_pct=0.4)

[Impute] Dropping duplicate columns: ['avg_word_length', 'digit_ratio', 'had_urls', 'had_html', 'had_code', 'had_table']

[Impute] Dropping 2 features with >40% missing:
  - not_text_reason: 100.0% missing
  - mtld: 49.6% missing

[Impute] Filled 53783 missing values across 16 features
Top 10 imputed features:
yules_k                        9255
smog_index                     5114
gunning_fog                    4877
flesch_reading_ease            4877
automated_readability_index    4877
trigram_diversity              4362
trigram_entropy                4362
bigram_diversity               4259
bigram_entropy                 4259
unigram_diversity              4122


In [19]:
def drop_correlated_features(
    df: pd.DataFrame,
    feature_cols: List[str] = None,
    threshold: float = 0.85,
    method: str = 'pearson',
    keep_strategy: str = 'lower_missing'
) -> Tuple[pd.DataFrame, List[str], pd.DataFrame]:
    """
    Identify and drop highly correlated features.
    
    Parameters:
    -----------
    df : pd.DataFrame
        Input dataframe
    feature_cols : List[str], optional
        List of feature columns to check. If None, uses all numeric columns except 'is_ai'
    threshold : float
        Correlation threshold (default 0.85). Pairs above this are considered redundant
    method : str
        Correlation method: 'pearson', 'spearman', or 'kendall'
    keep_strategy : str
        Which feature to keep from correlated pairs:
        - 'lower_missing': Keep feature with less missing data
        - 'higher_variance': Keep feature with higher variance
        - 'first': Keep the first feature alphabetically
    
    Returns:
    --------
    df_filtered : pd.DataFrame
        Dataframe with redundant features removed
    dropped_features : List[str]
        List of dropped feature names
    corr_pairs : pd.DataFrame
        DataFrame showing correlated pairs and correlation values
    """
    df = df.copy()
    
    # Identify feature columns
    if feature_cols is None:
        feature_cols = [c for c in df.select_dtypes(include=[np.number]).columns 
                       if c != 'is_ai']
    
    # Compute correlation matrix
    corr_matrix = df[feature_cols].corr(method=method).abs()
    
    # Get upper triangle (avoid double-counting pairs)
    upper_tri = corr_matrix.where(
        np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
    )
    
    # Find correlated pairs
    correlated_pairs = []
    for col in upper_tri.columns:
        high_corr = upper_tri[col][upper_tri[col] > threshold]
        for idx, corr_val in high_corr.items():
            correlated_pairs.append({
                'feature_1': col,
                'feature_2': idx,
                'correlation': corr_val
            })
    
    if not correlated_pairs:
        print(f"[Correlation] No feature pairs with correlation > {threshold}")
        return df, [], pd.DataFrame()
    
    # Create dataframe of correlated pairs
    corr_df = pd.DataFrame(correlated_pairs).sort_values('correlation', ascending=False)
    
    print(f"\n[Correlation] Found {len(corr_df)} highly correlated pairs (>{threshold}):")
    print(corr_df.to_string(index=False))
    
    # Determine which features to drop
    to_drop: Set[str] = set()
    
    for _, row in corr_df.iterrows():
        feat1, feat2 = row['feature_1'], row['feature_2']
        
        # Skip if either already marked for dropping
        if feat1 in to_drop or feat2 in to_drop:
            continue
        
        # Decide which to keep based on strategy
        if keep_strategy == 'lower_missing':
            miss1 = df[feat1].isna().mean()
            miss2 = df[feat2].isna().mean()
            drop_feat = feat1 if miss1 > miss2 else feat2
            keep_feat = feat2 if miss1 > miss2 else feat1
            reason = f"missing: {miss1:.1%} vs {miss2:.1%}"
            
        elif keep_strategy == 'higher_variance':
            var1 = df[feat1].var()
            var2 = df[feat2].var()
            drop_feat = feat1 if var1 < var2 else feat2
            keep_feat = feat2 if var1 < var2 else feat1
            reason = f"variance: {var1:.2f} vs {var2:.2f}"
            
        else:  # 'first'
            drop_feat = max(feat1, feat2)  # Drop lexicographically later
            keep_feat = min(feat1, feat2)
            reason = "alphabetical"
        
        to_drop.add(drop_feat)
        print(f"  Drop '{drop_feat}', keep '{keep_feat}' ({reason})")
    
    # Drop features
    dropped_list = sorted(to_drop)
    df_filtered = df.drop(columns=dropped_list)
    
    print(f"\n[Correlation] Dropped {len(dropped_list)} redundant features")
    print(f"  Remaining features: {len([c for c in df_filtered.columns if c in feature_cols])}")
    
    return df_filtered, dropped_list, corr_df


# Usage example
df_with_features, dropped_features, correlation_pairs = drop_correlated_features(
    df_with_features,
    threshold=0.85,
    keep_strategy='lower_missing'
)

# Optional: Visualize correlation matrix before/after
def plot_correlation_heatmap(df, feature_cols=None, title="Feature Correlations"):
    """Plot correlation heatmap for visual inspection"""
    import matplotlib.pyplot as plt
    import seaborn as sns
    
    if feature_cols is None:
        feature_cols = [c for c in df.select_dtypes(include=[np.number]).columns 
                       if c != 'is_ai']
    
    corr = df[feature_cols].corr()
    
    plt.figure(figsize=(14, 12))
    mask = np.triu(np.ones_like(corr, dtype=bool), k=1)
    sns.heatmap(corr, mask=mask, annot=False, cmap='coolwarm', 
                center=0, vmin=-1, vmax=1, square=True,
                linewidths=0.5, cbar_kws={"shrink": 0.8})
    plt.title(title, fontsize=14, pad=20)
    plt.tight_layout()
    plt.show()

# Visualize before/after (optional)
# plot_correlation_heatmap(df_original, title="Before Dropping Correlated Features")
# plot_correlation_heatmap(df_with_features, title="After Dropping Correlated Features")


[Correlation] Found 23 highly correlated pairs (>0.85):
                  feature_1              feature_2  correlation
     right_dependency_ratio  left_dependency_ratio     1.000000
               n_tokens_doc                  n_tok     0.999695
                n_tokens_ws                  n_tok     0.998853
               n_tokens_doc            n_tokens_ws     0.998847
                      n_tok                n_chars     0.993493
                n_tokens_ws                n_chars     0.993412
               n_tokens_doc                n_chars     0.993291
            trigram_entropy         bigram_entropy     0.990213
              bits_per_char      compression_ratio     0.985311
          trigram_diversity       bigram_diversity     0.967851
                 smog_index    flesch_reading_ease     0.939617
                gunning_fog    flesch_reading_ease     0.933114
                 smog_index            gunning_fog     0.931207
           whitespace_ratio        avg_word_len

In [20]:
# >>>> Tailored to my code

def drop_correlated_features_strategic(df: pd.DataFrame, threshold: float = 0.85) -> Tuple[pd.DataFrame, dict]:
    """
    Drop correlated features with domain-informed decisions about which to keep.
    """
    df = df.copy()
    
    # Manual decisions based on your correlation output
    drops = {
        # Perfect/near-perfect duplicates - keep the most interpretable
        'right_dependency_ratio': 'Perfect inverse of left_dependency_ratio',
        'n_tok': 'Duplicate of n_tokens_ws (0.998 correlation)',
        'n_chars': 'Redundant with n_tokens_ws (0.987 correlation)',
        
        # N-gram entropy/diversity - keep diversity (more interpretable)
        'bigram_entropy': 'Redundant with bigram_diversity (0.991 correlation)',
        'trigram_entropy': 'Redundant with trigram_diversity (0.969 correlation)',
        
        # Compression metrics - keep ratio (more standard)
        'bits_per_char': 'Redundant with compression_ratio (0.990 correlation)',
        
        # Readability indices - keep Flesch (most widely used)
        'smog_index': 'Redundant with flesch_reading_ease (0.937 correlation)',
        'gunning_fog': 'Redundant with flesch_reading_ease (0.934 correlation)',
        'automated_readability_index': 'Redundant with gunning_fog (0.903 correlation)',
        
        # Lexical diversity - keep type_token_ratio (simpler)
        'unigram_diversity': 'Redundant with type_token_ratio (0.908 correlation)',
        'hapax_type_ratio': 'Redundant with hapax_legomena_ratio (0.881 correlation)',
        
        # Character/compression overlap - keep char_trigram_diversity
        # (compression_ratio already kept, so this reduces triple redundancy)
    }
    
    # Check which features actually exist
    existing_drops = {k: v for k, v in drops.items() if k in df.columns}
    
    print(f"[Strategic Drop] Removing {len(existing_drops)} redundant features:\n")
    for feat, reason in existing_drops.items():
        print(f"  ✗ {feat}")
        print(f"    → {reason}\n")
    
    df_filtered = df.drop(columns=list(existing_drops.keys()))
    
    # Verify no high correlations remain
    feature_cols = [c for c in df_filtered.select_dtypes(include=[np.number]).columns 
                   if c != 'is_ai']
    corr_matrix = df_filtered[feature_cols].corr().abs()
    upper_tri = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))
    
    remaining_high = []
    for col in upper_tri.columns:
        high_corr = upper_tri[col][upper_tri[col] > threshold]
        if not high_corr.empty:
            for idx, val in high_corr.items():
                remaining_high.append((col, idx, val))
    
    if remaining_high:
        print(f"[Warning] {len(remaining_high)} pairs still exceed threshold:")
        for f1, f2, corr in sorted(remaining_high, key=lambda x: x[2], reverse=True)[:5]:
            print(f"  {f1} <-> {f2}: {corr:.3f}")
    else:
        print(f"[Success] No correlations > {threshold} remain")
    
    print(f"\n[Final] {len(feature_cols)} features retained")
    
    return df_filtered, existing_drops

# Apply strategic dropping
df_with_features, drop_log = drop_correlated_features_strategic(df_with_features, threshold=0.85)

[Strategic Drop] Removing 7 redundant features:

  ✗ right_dependency_ratio
    → Perfect inverse of left_dependency_ratio

  ✗ trigram_entropy
    → Redundant with trigram_diversity (0.969 correlation)

  ✗ bits_per_char
    → Redundant with compression_ratio (0.990 correlation)

  ✗ smog_index
    → Redundant with flesch_reading_ease (0.937 correlation)

  ✗ automated_readability_index
    → Redundant with gunning_fog (0.903 correlation)

  ✗ unigram_diversity
    → Redundant with type_token_ratio (0.908 correlation)

  ✗ hapax_type_ratio
    → Redundant with hapax_legomena_ratio (0.881 correlation)

[Success] No correlations > 0.85 remain

[Final] 36 features retained


In [21]:
OUT_WITH_FEATS = f"raid_sample_{SELECTED_DATASET}_with_features_CLEANED.csv"

# Ensure balanced quality across train/test when you split later
print(f"\nQuality distribution by source_type:")
print(df_with_features.groupby(['model', 'quality_score']).size().unstack(fill_value=0))

df_with_features.to_csv(OUT_WITH_FEATS, index=False)
print(f"Saved enriched dataset: {OUT_WITH_FEATS} (rows: {len(df_with_features)})")


Quality distribution by source_type:
quality_score   0    1    2      3
model                             
chatgpt         0    2    7   2719
cohere          0   10   36   2682
cohere-chat     0    2   26   2700
gpt2           10   80  452   2186
gpt3            0    7   44   2677
gpt4            0    0    3   2725
human           3   63  396  29538
llama-chat      0    1   71   2656
mistral         1   54  294   2378
mistral-chat    0    6   21   2700
mpt            92  339  877   1417
mpt-chat       69  212  211   2233
Saved enriched dataset: raid_sample_large_with_features_CLEANED.csv (rows: 60000)
