In [1]:
import pandas as pd
# Load the working frame from ngrams.csv. The helpers defined further down
# read the text columns "y_train", "y_pred" and "y_test" by default
# (assumes the CSV provides them — TODO confirm).
df = pd.read_csv("ngrams.csv")

### Punctuation and Stopwords

In [5]:
import re
import pandas as pd
import nltk
# Fetch the NLTK stop-word corpus; quiet=True suppresses the download log.
nltk.download("stopwords", quiet=True)
from nltk.corpus import stopwords
# English stop-words held in a set for fast membership tests.
EN_STOP = set(stopwords.words("english"))

def extract_stopwords(df: pd.DataFrame,
                      text_cols=("y_train", "y_pred", "y_test"),
                      stopword_set=EN_STOP,
                      regex=r"\b\w+\b"):
    """
    Attach a ``<col>_stopwords`` column for every requested text column.

    Each new cell holds the list of stop-words found in the matching text
    cell, lower-cased and in order of appearance (repeats are kept).

    Parameters
    ----------
    df : pandas.DataFrame
        Frame holding the text columns; it is modified in place.
    text_cols : tuple[str]
        Names of the columns to scan.
    stopword_set : set[str]
        Words counted as stop-words (default = English NLTK list).
    regex : str
        Tokenisation pattern; the default picks out runs of word
        characters (``\\w``).

    Returns
    -------
    pandas.DataFrame
        The very same frame object, now carrying the extra columns
        (e.g. 'y_train_stopwords').
    """
    find_tokens = re.compile(regex).findall

    for name in text_cols:
        # Every cell is stringified and lower-cased before tokenising.
        df[f"{name}_stopwords"] = df[name].apply(
            lambda cell: [word
                          for word in find_tokens(str(cell).lower())
                          if word in stopword_set]
        )

    return df

In [7]:
# Add the *_stopwords list columns (the frame is modified in place and returned).
# NOTE(review): a later cell rebinds `extract_stopwords` to a function that takes
# a single text, so this call only works while the DataFrame-level definition
# above is the one in scope.
df = extract_stopwords(df)

In [11]:
import re
from collections import Counter
from typing import Dict, Tuple, Iterable, List

import numpy as np
import pandas as pd
import nltk
nltk.download("stopwords", quiet=True)
nltk.download("punkt", quiet=True)
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

# ----------------------------------------------------------------------
# 1.  shared resources
# ----------------------------------------------------------------------
# NLTK's English stop-word list, kept as a list so it can serve as a fixed
# feature ordering when distributions are turned into vectors.
ENGLISH_STOPWORDS: List[str] = stopwords.words("english")
EN_STOP = set(ENGLISH_STOPWORDS)          # fast membership test
# Bound `findall` of the compiled pattern: calling TOKEN_PATTERN(s) returns
# every \b\w+\b match (run of word characters) in s.
TOKEN_PATTERN = re.compile(r"\b\w+\b").findall


# ----------------------------------------------------------------------
# 2.  low-level helpers  (almost exactly your original code)
# ----------------------------------------------------------------------
def extract_stopwords(text: str | None) -> List[str]:
    """Collect the stop-words occurring in *text*, lower-cased, in reading order.

    Falsy input (``None``, ``""`` …) gives an empty list. Anything else is
    stringified, lower-cased, split with ``TOKEN_PATTERN`` and filtered
    against ``EN_STOP``; repeated stop-words are kept.
    """
    if text:
        lowered = str(text).lower()
        return [word for word in TOKEN_PATTERN(lowered) if word in EN_STOP]
    return []


def get_stopword_distribution(text: str | None) -> Dict[str, float]:
    """Build a stop-word profile of *text*.

    The result has one entry per word of ``ENGLISH_STOPWORDS`` holding that
    word's share of all stop-word hits (0 when it never occurs), plus a
    ``"density"`` entry: stop-word hits divided by the number of
    ``word_tokenize`` tokens of the lower-cased text (0 for empty text).
    """
    hits = extract_stopwords(text)
    n_hits = len(hits)
    tally = Counter(hits)
    # The counter's values add up to the number of hits; fall back to 1 so
    # a text without stop-words does not divide by zero.
    denominator = n_hits or 1

    profile = {}
    for word in ENGLISH_STOPWORDS:
        profile[word] = tally.get(word, 0) / denominator

    n_words = len(word_tokenize(str(text or "").lower()))
    if n_words:
        profile["density"] = n_hits / n_words
    else:
        profile["density"] = 0
    return profile


def stopwords_similarity(text1: str | None,
                         text2: str | None) -> Tuple[float, Dict]:
    """Compare two texts by the cosine of their stop-word profiles.

    Returns ``(similarity, details)``. The similarity is the cosine between
    the two profile vectors (all stop-word shares followed by the density
    feature), or ``0.0`` when either vector is all zeros. ``details`` holds
    each text's ten most common stop-words and density, how many of those
    top-ten words the texts share, and the similarity itself.
    """
    profile_a = get_stopword_distribution(text1)
    profile_b = get_stopword_distribution(text2)

    axes = ENGLISH_STOPWORDS + ["density"]
    vec_a = np.array([profile_a[name] for name in axes])
    vec_b = np.array([profile_b[name] for name in axes])

    if vec_a.any() and vec_b.any():
        sim = np.dot(vec_a, vec_b) / (np.linalg.norm(vec_a) * np.linalg.norm(vec_b))
    else:
        # An all-zero vector has no direction; report no similarity.
        sim = 0.0

    # Diagnostics: the ten most frequent stop-words of each text.
    top_a = Counter(extract_stopwords(text1)).most_common(10)
    top_b = Counter(extract_stopwords(text2)).most_common(10)
    shared = {word for word, _ in top_a}.intersection({word for word, _ in top_b})

    return sim, {
        "text1_stats": {"most_common": top_a, "density": profile_a["density"]},
        "text2_stats": {"most_common": top_b, "density": profile_b["density"]},
        "top10_overlap": len(shared),
        "similarity": sim,
    }


# ----------------------------------------------------------------------
# 3.  DataFrame-level helpers
# ----------------------------------------------------------------------
def add_stopword_columns(df: pd.DataFrame,
                         text_cols: Iterable[str] = ("y_train", "y_pred", "y_test")
                         ) -> pd.DataFrame:
    """Give *df* one ``<col>_stopwords`` list column per name in *text_cols*.

    Each cell of a new column is ``extract_stopwords`` applied to the
    matching text cell. The frame is modified in place and also returned.
    """
    for source in text_cols:
        target = f"{source}_stopwords"
        df[target] = df[source].apply(extract_stopwords)
    return df


def add_stopword_similarity(df: pd.DataFrame,
                            pairs: Iterable[Tuple[str, str]] = (
                                ("y_train", "y_pred"),
                                ("y_pred", "y_test"),
                                ("y_train", "y_test")
                            ),
                            keep_details: bool = False
                            ) -> pd.DataFrame:
    """
    Score every (colA, colB) pair row by row with ``stopwords_similarity``.

    The score lands in a column called '<colA>_vs_<colB>_stop_sim'. With
    *keep_details* switched on, the diagnostics dict is stored next to it in
    '<colA>_vs_<colB>_stop_sim_details'. The frame is modified in place and
    returned.
    """
    for left, right in pairs:
        sim_col = f"{left}_vs_{right}_stop_sim"

        if not keep_details:
            def _score(row):
                # Only the numeric part of the (similarity, details) tuple.
                return stopwords_similarity(row[left], row[right])[0]

            df[sim_col] = df.apply(_score, axis=1)
            continue

        det_col = f"{sim_col}_details"

        def _score_with_details(row):
            sim, det = stopwords_similarity(row[left], row[right])
            return pd.Series({sim_col: sim, det_col: det})

        # The row function yields a two-entry Series, so apply() hands back
        # a two-column frame that fills both new columns at once.
        df[[sim_col, det_col]] = df.apply(_score_with_details, axis=1)
    return df


# ----------------------------------------------------------------------
# 4.  Example usage

# Recompute the <col>_stopwords list columns with the text-level helper, then
# append the three pairwise '<a>_vs_<b>_stop_sim' score columns (no details
# columns). Both helpers modify df in place and return it.
df = add_stopword_columns(df)
df = add_stopword_similarity(df, keep_details=False)

In [15]:
import re
import string
from collections import Counter
from typing import Dict, Tuple, Iterable, List

import numpy as np
import pandas as pd

# ----------------------------------------------------------------------
# 1.  shared resources
# ----------------------------------------------------------------------
# Every character of string.punctuation, as a list so the feature order used
# when building vectors is fixed.
PUNCTUATION_CHARS: List[str] = list(string.punctuation)        # 32 ASCII marks
# Set form of the same characters.
# NOTE(review): not referenced in the visible part of this notebook.
PUNCT_SET = set(PUNCTUATION_CHARS)
# Character class built from the escaped punctuation string, so every regex
# hit is exactly one punctuation character.
PUNCT_PATTERN = re.compile(f"[{re.escape(string.punctuation)}]")  # matches ONE mark


# ----------------------------------------------------------------------
# 2.  low-level helpers
# ----------------------------------------------------------------------
def extract_punctuation(text: str | None) -> List[str]:
    """List the punctuation marks of *text* in order, one entry per occurrence.

    Falsy input (``None``, ``""`` …) gives an empty list; any other value is
    stringified before ``PUNCT_PATTERN`` is run over it.
    """
    return PUNCT_PATTERN.findall(str(text)) if text else []


def get_punctuation_distribution(text: str | None) -> Dict[str, float]:
    """
    Build a punctuation profile of *text*.

    One entry per character of ``PUNCTUATION_CHARS`` gives that mark's share
    of all punctuation hits (0 when absent). The extra ``"density"`` entry is
    the number of hits divided by the length of the stringified text in
    characters, or 0 for empty text.
    """
    marks = extract_punctuation(text)
    n_marks = len(marks)
    tally = Counter(marks)
    # The counter's values add up to the number of hits; use 1 when there
    # are none so the shares below do not divide by zero.
    denominator = n_marks or 1

    profile = {}
    for mark in PUNCTUATION_CHARS:
        profile[mark] = tally.get(mark, 0) / denominator

    n_chars = len(str(text or ""))
    if n_chars:
        profile["density"] = n_marks / n_chars
    else:
        profile["density"] = 0
    return profile


def punctuation_similarity(text1: str | None,
                           text2: str | None) -> Tuple[float, Dict]:
    """Compare two texts by the cosine of their punctuation profiles.

    Returns ``(similarity, details)``. The similarity is the cosine between
    the two profile vectors (one share per punctuation mark followed by the
    density feature), or ``0.0`` when either vector is all zeros.
    ``details`` lists each text's five most common marks and density, the
    number of marks the two top-five lists share, and the similarity.
    """
    profile_a = get_punctuation_distribution(text1)
    profile_b = get_punctuation_distribution(text2)

    axes = PUNCTUATION_CHARS + ["density"]
    vec_a = np.array([profile_a[name] for name in axes])
    vec_b = np.array([profile_b[name] for name in axes])

    if vec_a.any() and vec_b.any():
        sim = np.dot(vec_a, vec_b) / (np.linalg.norm(vec_a) * np.linalg.norm(vec_b))
    else:
        # An all-zero vector has no direction; report no similarity.
        sim = 0.0

    top_a = Counter(extract_punctuation(text1)).most_common(5)
    top_b = Counter(extract_punctuation(text2)).most_common(5)
    shared = {mark for mark, _ in top_a}.intersection({mark for mark, _ in top_b})

    return sim, {
        "text1_stats": {"most_common": top_a, "density": profile_a["density"]},
        "text2_stats": {"most_common": top_b, "density": profile_b["density"]},
        "top5_overlap": len(shared),
        "similarity": sim,
    }


# ----------------------------------------------------------------------
# 3.  DataFrame-level helpers
# ----------------------------------------------------------------------
def add_punctuation_columns(df: pd.DataFrame,
                            text_cols: Iterable[str] = ("y_train",
                                                        "y_pred",
                                                        "y_test")
                            ) -> pd.DataFrame:
    """Give *df* one ``<col>_punct`` list column per name in *text_cols*.

    Each cell of a new column is ``extract_punctuation`` applied to the
    matching text cell. The frame is modified in place and also returned.
    """
    for source in text_cols:
        target = f"{source}_punct"
        df[target] = df[source].apply(extract_punctuation)
    return df


def add_punctuation_similarity(df: pd.DataFrame,
                               pairs: Iterable[Tuple[str, str]] = (
                                   ("y_train", "y_pred"),
                                   ("y_pred", "y_test"),
                                   ("y_train", "y_test")
                               ),
                               keep_details: bool = False
                               ) -> pd.DataFrame:
    """
    Score every (colA, colB) pair row by row with ``punctuation_similarity``.

    The score is written to '<colA>_vs_<colB>_punct_sim'. When
    *keep_details* is True the diagnostics dict goes into
    '<colA>_vs_<colB>_punct_sim_details' as well. The frame is modified in
    place and returned.
    """
    for left, right in pairs:
        sim_col = f"{left}_vs_{right}_punct_sim"

        if not keep_details:
            def _score(row):
                # Only the numeric part of the (similarity, details) tuple.
                return punctuation_similarity(row[left], row[right])[0]

            df[sim_col] = df.apply(_score, axis=1)
            continue

        det_col = f"{sim_col}_details"

        def _score_with_details(row):
            sim, det = punctuation_similarity(row[left], row[right])
            return pd.Series({sim_col: sim, det_col: det})

        # Two-entry Series per row -> two-column frame -> both new columns.
        df[[sim_col, det_col]] = df.apply(_score_with_details, axis=1)
    return df


# ----------------------------------------------------------------------
# 4.  Example quick-test
# ----------------------------------------------------------------------


# Add the per-column punctuation lists, then the three pairwise
# '<a>_vs_<b>_punct_sim' score columns (details columns skipped).
# Both helpers modify df in place and return it.
df = add_punctuation_columns(df)
df = add_punctuation_similarity(df, keep_details=False)

### TTR Similarity

In [26]:
def preprocess_text(text: str, remove_stopwords: bool = False) -> List[str]:
    """
    Preprocess text by tokenizing, lowercasing, and optionally removing stopwords.
    
    Args:
        text (str): The input text to preprocess
        remove_stopwords (bool): Whether to remove stopwords
        
    Returns:
        List[str]: Preprocessed tokens
    """
    # Tokenize the text
    tokens = word_tokenize(text.lower())
    
    # Remove non-alphabetic tokens
    tokens = [token for token in tokens if token.isalpha()]
    
    # Optionally remove stopwords
    if remove_stopwords:
        stop_words = set(stopwords.words('english'))
        tokens = [token for token in tokens if token not in stop_words]
    
    return tokens

def calculate_ttr(tokens: List[str]) -> float:
    """
    Type-Token Ratio: distinct tokens divided by total tokens.

    Args:
        tokens (List[str]): Tokens of the text, repeats included

    Returns:
        float: ``len(set(tokens)) / len(tokens)``, or 0 for an empty list
    """
    if tokens:
        return len(set(tokens)) / len(tokens)
    # No tokens at all: define the ratio as 0 rather than dividing by zero.
    return 0

def moving_average_ttr(tokens: List[str], window_size: int = 100) -> float:
    """
    Calculate Moving-Average Type-Token Ratio (MATTR).

    The TTR is computed for every window of ``window_size`` consecutive
    tokens and the window values are averaged. Texts shorter than one window
    fall back to the plain TTR of the whole text.

    Args:
        tokens (List[str]): List of tokens from the text
        window_size (int): Size of the sliding window (must be >= 1)

    Returns:
        float: The MATTR value

    Raises:
        ValueError: If ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError("window_size must be a positive integer")

    n_tokens = len(tokens)
    if n_tokens < window_size:
        return calculate_ttr(tokens)

    # Occurrence counts of the tokens inside the current window. Updating
    # them as the window slides avoids rebuilding a set for every window.
    counts = {}
    for token in tokens[:window_size]:
        counts[token] = counts.get(token, 0) + 1

    ttrs = [len(counts) / window_size]
    for right in range(window_size, n_tokens):
        leaving = tokens[right - window_size]
        entering = tokens[right]

        # Drop the token that slides out; forget it once its count hits 0
        # so len(counts) stays equal to the number of distinct tokens.
        counts[leaving] -= 1
        if counts[leaving] == 0:
            del counts[leaving]
        counts[entering] = counts.get(entering, 0) + 1

        ttrs.append(len(counts) / window_size)

    return sum(ttrs) / len(ttrs)

def mtld(tokens: List[str], threshold: float = 0.72) -> float:
    """
    Measure of Textual Lexical Diversity (MTLD).

    The token sequence is scanned once forwards and once backwards. During a
    scan the running TTR is tracked; each time it drops to *threshold* or
    below, one "factor" is counted and the running state is reset. Tokens
    left over at the end contribute a partial factor. A scan's score is the
    number of tokens divided by its factor count, and the result is the mean
    of the two scan scores.

    Args:
        tokens (List[str]): List of tokens from the text
        threshold (float): Running-TTR level that closes a factor

    Returns:
        float: The MTLD value; 0 for fewer than 50 tokens, and 0 for a scan
        that accumulates no factor at all
    """
    if len(tokens) < 50:  # too little text for the measure
        return 0

    def _scan(sequence):
        """Score a single direction of the sequence."""
        factor_count = 0
        seen = set()
        run_length = 0

        for word in sequence:
            run_length += 1
            seen.add(word)

            if len(seen) / run_length <= threshold:
                # Running TTR reached the threshold: close this factor.
                factor_count += 1
                seen = set()
                run_length = 0

        if run_length > 0:
            # Unfinished tail: credit the fraction of a factor it covers.
            tail_ttr = len(seen) / run_length
            factor_count += (1 - tail_ttr) / (1 - threshold)

        if factor_count > 0:
            return len(sequence) / factor_count
        return 0

    forward_score = _scan(tokens)
    backward_score = _scan(tokens[::-1])
    return (forward_score + backward_score) / 2

def ttr_similarity(text1: str, text2: str, include_stopwords: bool = True) -> Tuple[float, Dict]:
    """
    Score how alike two texts are in lexical diversity.

    Three diversity measures (plain TTR, MATTR, MTLD) are computed for each
    text. For every measure the similarity is 1 minus the absolute gap
    divided by the larger of the two values (1 when neither value is
    positive). The overall score weights them 0.3 / 0.4 / 0.3.

    Args:
        text1 (str): First text to compare
        text2 (str): Second text to compare
        include_stopwords (bool): Keep stopwords in the token lists when True

    Returns:
        Tuple[float, Dict]:
            - the weighted overall similarity
            - a dict with, per measure, both values and their similarity,
              plus token statistics for each text
    """
    def _closeness(x, y):
        # Relative agreement of two non-negative measures.
        larger = max(x, y)
        if larger > 0:
            return 1 - abs(x - y) / larger
        return 1

    def _token_stats(tokens):
        # Size, vocabulary and vocabulary/size ratio of one token list.
        return {
            "tokens": len(tokens),
            "unique_tokens": len(set(tokens)),
            "lexical_density": len(set(tokens)) / len(tokens) if tokens else 0
        }

    drop_stopwords = not include_stopwords
    tokens_a = preprocess_text(text1, remove_stopwords=drop_stopwords)
    tokens_b = preprocess_text(text2, remove_stopwords=drop_stopwords)

    # One entry per diversity measure, in the order they are reported.
    details = {}
    for label, measure in (("ttr", calculate_ttr),
                           ("mattr", moving_average_ttr),
                           ("mtld", mtld)):
        value_a = measure(tokens_a)
        value_b = measure(tokens_b)
        details[label] = {
            "text1": value_a,
            "text2": value_b,
            "similarity": _closeness(value_a, value_b)
        }

    # Weighted blend of the three per-measure similarities.
    overall_sim = ((details["ttr"]["similarity"] * 0.3)
                   + (details["mattr"]["similarity"] * 0.4)
                   + (details["mtld"]["similarity"] * 0.3))

    details["text1_stats"] = _token_stats(tokens_a)
    details["text2_stats"] = _token_stats(tokens_b)

    return overall_sim, details

# ----------------------------------------------------------------------
# 0.  once-per-session NLTK housekeeping
# ----------------------------------------------------------------------
import nltk
nltk.download("punkt", quiet=True)
nltk.download("stopwords", quiet=True)
from nltk.tokenize import word_tokenize           # used inside preprocess_text
from nltk.corpus import stopwords                 #    "      "      "

# ----------------------------------------------------------------------
# 1.  DataFrame-level helper
# ----------------------------------------------------------------------
from typing import Iterable, Tuple
import pandas as pd

def add_ttr_similarity(df: pd.DataFrame,
                       pairs: Iterable[Tuple[str, str]] = (
                           ("y_train", "y_pred"),
                           ("y_pred",  "y_test"),
                           ("y_train", "y_test")
                       ),
                       include_stopwords: bool = True,
                       keep_details: bool = False
                       ) -> pd.DataFrame:
    """
    Score every (colA, colB) pair row by row with ``ttr_similarity``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame holding the text columns; it is modified in place.
    pairs : iterable[tuple[str, str]]
        Column-name pairs to compare (default: the three pairings of
        'y_train', 'y_pred' and 'y_test').
    include_stopwords : bool
        Forwarded unchanged to ``ttr_similarity``.
    keep_details : bool
        False adds only '<colA>_vs_<colB>_ttr_sim'; True additionally adds
        '<colA>_vs_<colB>_ttr_sim_details' holding the metrics dict.

    Returns
    -------
    pandas.DataFrame
        The same frame object with the new columns attached.
    """
    for left, right in pairs:
        sim_col = f"{left}_vs_{right}_ttr_sim"

        if not keep_details:
            def _score(row):
                # Only the numeric part of the (similarity, details) tuple.
                return ttr_similarity(row[left],
                                      row[right],
                                      include_stopwords=include_stopwords)[0]

            df[sim_col] = df.apply(_score, axis=1)
            continue

        det_col = f"{sim_col}_details"

        def _score_with_details(row):
            sim, det = ttr_similarity(row[left],
                                      row[right],
                                      include_stopwords=include_stopwords)
            return pd.Series({sim_col: sim, det_col: det})

        # Two-entry Series per row -> two-column frame -> both new columns.
        df[[sim_col, det_col]] = df.apply(_score_with_details, axis=1)
    return df


In [28]:
# Append the three pairwise '<a>_vs_<b>_ttr_sim' lexical-diversity scores
# (stopwords included by default, no details columns); df is modified in place.
df = add_ttr_similarity(df, keep_details=False)

### Sentence Transformers

In [41]:
# from sentence_transformers import SentenceTransformer
# import numpy as np
# from typing import Tuple

# def load_model():
#     """Load the Sentence-BERT model"""
#     return SentenceTransformer('all-MiniLM-L6-v2')

# def transformer_similarity(text1: str, text2: str, model=None) -> Tuple[float, dict]:
#     """
#     Compare two texts using Sentence Transformers.
#     Returns a similarity score and the sentence embeddings.
#     """
#     if model is None:
#         model = load_model()
    
#     # Get embeddings
#     embedding1 = model.encode([text1])[0]
#     embedding2 = model.encode([text2])[0]
    
#     # Calculate cosine similarity
#     similarity = np.dot(embedding1, embedding2) / (np.linalg.norm(embedding1) * np.linalg.norm(embedding2))
    
#     return similarity, {
#         "embedding1_sample": embedding1[:5].tolist(),  # Show first 5 dimensions
#         "embedding2_sample": embedding2[:5].tolist()
#     }



# # ------------------------------------------------------------------
# # 0.  one-time model load  (do this once per Python session)
# # ------------------------------------------------------------------
# from sentence_transformers import SentenceTransformer
# import numpy as np
# import pandas as pd
# from typing import Iterable, Tuple

# _SBERT_MODEL = SentenceTransformer("all-MiniLM-L6-v2")   # ~80 MB, tiny + fast

# # ------------------------------------------------------------------
# # 1.  low-level single-pair similarity function
# # ------------------------------------------------------------------
# def transformer_similarity(text1: str | None,
#                            text2: str | None,
#                            model: SentenceTransformer = _SBERT_MODEL
#                            ) -> Tuple[float, dict]:
#     """
#     Cosine similarity between the two sentence-level embeddings.
#     Returns (similarity, small-diagnostics-dict).
#     """
#     text1 = text1 or ""
#     text2 = text2 or ""
#     emb1, emb2 = model.encode([text1, text2], normalize_embeddings=True)

#     # cosine on **already L2-normalised** vectors = simple dot product
#     similarity = float(np.dot(emb1, emb2))

#     details = {
#         "embedding1_sample": emb1[:5].tolist(),   # first 5 dims for sanity-check
#         "embedding2_sample": emb2[:5].tolist()
#     }
#     return similarity, details


# # ------------------------------------------------------------------
# # 2.  DataFrame-level helper
# # ------------------------------------------------------------------
# def add_transformer_similarity(df: pd.DataFrame,
#                                pairs: Iterable[Tuple[str, str]] = (
#                                    ("y_train", "y_pred"),
#                                    ("y_pred",  "y_test"),
#                                    ("y_train", "y_test")
#                                ),
#                                keep_details: bool = False,
#                                model: SentenceTransformer = _SBERT_MODEL
#                                ) -> pd.DataFrame:
#     """
#     Append semantic similarity columns computed with a Sentence-BERT model.

#     • For each (colA, colB) you get '<colA>_vs_<colB>_sbert_sim'  (float, –1‥1).  
#       (Because embeddings are normalised, scores are usually 0‥1 for
#        “reasonably related” English texts; negatives mean strong dissimilarity.)

#     • With *keep_details=True* a companion
#       '<pair>_sbert_sim_details' column is added containing the two
#       embedding vectors’ first five dimensions (384-D for the default model).

#     • Pass your own *model* if you want a different checkpoint
#       (e.g. multilingual or domain-specific).
#     """
#     for a, b in pairs:
#         sim_col = f"{a}_vs_{b}_sbert_sim"

#         if keep_details:
#             det_col = f"{a}_vs_{b}_sbert_sim_details"

#             def _compare(row):
#                 sim, det = transformer_similarity(row[a], row[b], model)
#                 return pd.Series({sim_col: sim, det_col: det})

#             df[[sim_col, det_col]] = df.apply(_compare, axis=1)

#         else:
#             df[sim_col] = df.apply(
#                 lambda r: transformer_similarity(r[a], r[b], model)[0],
#                 axis=1
#             )
#     return df

In [43]:
# df = add_transformer_similarity(df, keep_details=False)

### POS Similarity

In [48]:
# Fetch the NLTK resources used in this notebook: the perceptron POS-tagger
# model, the punkt tokenizer data and the stop-word corpus.
# NOTE(review): unlike the repeated downloads further down in this cell these
# calls omit quiet=True, which is why the [nltk_data] log shows up in the
# cell output.
nltk.download('averaged_perceptron_tagger')
nltk.download('punkt')
nltk.download('stopwords')

def get_pos_distribution(text: str) -> dict:
    """
    Get the distribution of POS tags in the text.
    Returns a dictionary with POS tags as keys and their frequencies as values.
    """
    # Tokenize and get POS tags
    tokens = word_tokenize(text.lower())
    pos_tags = pos_tag(tokens)
    
    # Count POS tag frequencies
    pos_dist = {}
    for _, tag in pos_tags:
        pos_dist[tag] = pos_dist.get(tag, 0) + 1
    
    # Normalize frequencies
    total = sum(pos_dist.values())
    for tag in pos_dist:
        pos_dist[tag] = pos_dist[tag] / total
        
    return pos_dist

def pos_similarity(text1: str, text2: str) -> Tuple[float, dict]:
    """
    Compare two texts based on their POS tag distributions.

    Returns ``(similarity, details)`` where the similarity is the cosine
    between the two tag-frequency vectors and ``details`` carries both
    distributions. When either text yields no tags the similarity is 0.0;
    dividing by a zero norm would otherwise produce NaN.

    NOTE(review): this function is defined a second time further down in the
    same cell; the later definition is the one left in scope after the cell
    has run.
    """
    # Get POS distributions
    dist1 = get_pos_distribution(text1)
    dist2 = get_pos_distribution(text2)

    # Shared axis: every tag seen in either text
    all_tags = set(dist1.keys()) | set(dist2.keys())

    vec1 = np.array([dist1.get(tag, 0) for tag in all_tags])
    vec2 = np.array([dist2.get(tag, 0) for tag in all_tags])

    # Cosine similarity, guarded against empty / all-zero vectors
    if vec1.any() and vec2.any():
        similarity = np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
    else:
        similarity = 0.0

    return similarity, {"text1_pos": dist1, "text2_pos": dist2}


# ------------------------------------------------------------------
# 0.  once-per-session NLTK housekeeping
# ------------------------------------------------------------------
import nltk, numpy as np, pandas as pd
nltk.download("averaged_perceptron_tagger", quiet=True)
nltk.download("punkt", quiet=True)

from nltk import pos_tag, word_tokenize
from typing import Dict, Tuple, Iterable


# ------------------------------------------------------------------
# 1.  low-level POS helpers
# ------------------------------------------------------------------
def get_pos_distribution(text: str | None) -> Dict[str, float]:
    """
    Return a normalised frequency table of Penn-Treebank POS tags.
    """
    tokens  = word_tokenize(str(text or "").lower())
    tags    = pos_tag(tokens)

    counts  = {}
    for _, tag in tags:
        counts[tag] = counts.get(tag, 0) + 1

    total   = sum(counts.values()) or 1
    return {tag: c / total for tag, c in counts.items()}


def pos_similarity(text1: str | None,
                   text2: str | None
                   ) -> Tuple[float, Dict]:
    """
    Compare two texts by the cosine of their POS-tag distributions.

    Returns ``(similarity, details)``; the similarity is 0.0 when either
    distribution vector is empty or all zeros, and ``details`` holds both
    distributions under 'text1_pos' and 'text2_pos'.
    """
    first = get_pos_distribution(text1)
    second = get_pos_distribution(text2)

    # Shared axis: every tag that occurs in at least one of the texts.
    tag_axis = set(first) | set(second)
    vec_a = np.array([first.get(tag, 0.0) for tag in tag_axis])
    vec_b = np.array([second.get(tag, 0.0) for tag in tag_axis])

    if vec_a.any() and vec_b.any():
        sim = np.dot(vec_a, vec_b) / (np.linalg.norm(vec_a) * np.linalg.norm(vec_b))
    else:
        sim = 0.0

    return sim, {"text1_pos": first, "text2_pos": second}


# ------------------------------------------------------------------
# 2.  DataFrame-level helper  ➜  use exactly like the others
# ------------------------------------------------------------------
def add_pos_similarity(df: pd.DataFrame,
                       pairs: Iterable[Tuple[str, str]] = (
                           ("y_train", "y_pred"),
                           ("y_pred",  "y_test"),
                           ("y_train", "y_test")
                       ),
                       keep_details: bool = False
                       ) -> pd.DataFrame:
    """
    Score every (colA, colB) pair row by row with ``pos_similarity``.

    The score is written to `<colA>_vs_<colB>_pos_sim`. When *keep_details*
    is True, `<colA>_vs_<colB>_pos_sim_details` is added too and holds both
    normalised tag distributions. The frame is modified in place and
    returned.
    """
    for left, right in pairs:
        sim_col = f"{left}_vs_{right}_pos_sim"

        if not keep_details:
            def _score(row):
                # Only the numeric part of the (similarity, details) tuple.
                return pos_similarity(row[left], row[right])[0]

            df[sim_col] = df.apply(_score, axis=1)
            continue

        det_col = f"{sim_col}_details"

        def _score_with_details(row):
            sim, det = pos_similarity(row[left], row[right])
            return pd.Series({sim_col: sim, det_col: det})

        # Two-entry Series per row -> two-column frame -> both new columns.
        df[[sim_col, det_col]] = df.apply(_score_with_details, axis=1)
    return df


[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /Users/konstantinoskatharakes/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!
[nltk_data] Downloading package punkt to
[nltk_data]     /Users/konstantinoskatharakes/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/konstantinoskatharakes/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [50]:
# Append the three pairwise '<a>_vs_<b>_pos_sim' POS-distribution scores
# (no details columns); df is modified in place and returned.
df = add_pos_similarity(df, keep_details=False)

### Sentence Length

In [55]:
def calculate_sentence_stats(text: str) -> Dict[str, float]:
    """
    Calculate average sentence length and other sentence statistics.

    Sentences are obtained by splitting on runs of '.', '!' and '?'; a
    sentence's length is its number of ``\\b\\w+\\b`` matches.

    NOTE(review): this function is defined a second time further down in the
    same cell; the later definition is the one left in scope after the cell
    has run.

    Args:
        text (str): The input text to analyze. ``None`` and float NaN (what
            pandas stores for empty cells) are treated as empty text; other
            non-string values are stringified.

    Returns:
        dict: 'avg_sentence_length', 'median_sentence_length' and
        'std_deviation' (plain floats, 0 when there are no sentences),
        'total_sentences' and the raw 'sentence_lengths' list
    """
    # Missing value guard: NaN is the only float that differs from itself.
    if text is None or (isinstance(text, float) and text != text):
        text = ""

    # Clean and prepare text
    text = str(text).strip()

    # Split into sentences, dropping the empty pieces the split leaves behind
    sentences = [s.strip() for s in re.split(r'[.!?]+', text) if s.strip()]

    # Count words in each sentence
    sentence_word_counts = [len(re.findall(r'\b\w+\b', s)) for s in sentences]

    # Calculate statistics; numpy results are converted to built-in floats
    if sentences:
        avg_sentence_length = sum(sentence_word_counts) / len(sentences)
        median_length = float(np.median(sentence_word_counts))
        std_dev = float(np.std(sentence_word_counts))
    else:
        avg_sentence_length = 0
        median_length = 0
        std_dev = 0

    return {
        "avg_sentence_length": avg_sentence_length,
        "median_sentence_length": median_length,
        "std_deviation": std_dev,
        "total_sentences": len(sentences),
        "sentence_lengths": sentence_word_counts
    }

def sentence_length_similarity(text1: str, text2: str) -> Tuple[float, Dict]:
    """
    Score how close two texts are in average sentence length.

    Args:
        text1 (str): First text to compare
        text2 (str): Second text to compare

    Returns:
        Tuple containing:
        - similarity score (float): the smaller average divided by the
          larger one; 1.0 when both averages are 0, 0.0 when exactly one
          of them is 0
        - dictionary with both texts' sentence statistics and the signed
          difference of the averages (text2 minus text1)
    """
    first = calculate_sentence_stats(text1)
    second = calculate_sentence_stats(text2)

    mean_a = first["avg_sentence_length"]
    mean_b = second["avg_sentence_length"]

    if mean_a != 0 and mean_b != 0:
        # Usual case: ratio of shorter to longer average.
        similarity = min(mean_a, mean_b) / max(mean_a, mean_b)
    elif mean_a == 0 and mean_b == 0:
        # Neither text has any sentence: treat them as identical.
        similarity = 1.0
    else:
        # Only one text has sentences.
        similarity = 0.0

    details = {
        "text1_stats": first,
        "text2_stats": second,
        "difference": mean_b - mean_a  # positive if text2 has longer sentences
    }
    return similarity, details

# ----------------------------------------------------------------------
# 0.  imports once per file
# ----------------------------------------------------------------------
import re, numpy as np, pandas as pd
from typing import Dict, Tuple, Iterable

# ----------------------------------------------------------------------
# 1.  low-level sentence-length helpers  (your originals, unchanged)
# ----------------------------------------------------------------------
def calculate_sentence_stats(text: str) -> Dict[str, float]:
    """
    Return average-, median-, std-sentence length plus raw lengths list.

    Sentences come from a crude split on runs of '.', '!' and '?'; a
    sentence's length is its number of ``\\b\\w+\\b`` matches. Missing input
    (``None``, float NaN) and other falsy values are treated as empty text,
    for which every statistic is 0 and the lengths list is empty.
    """
    # pandas stores empty CSV cells as float NaN. NaN is truthy, so
    # `text or ""` alone would count it as the one-word sentence "nan".
    if isinstance(text, float) and text != text:
        text = None
    text = str(text or "").strip()

    # crude sentence split (period / exclam / question)
    sentences = re.split(r"[.!?]+", text)
    sentences = [s.strip() for s in sentences if s.strip()]

    lengths = [len(re.findall(r"\b\w+\b", s)) for s in sentences]

    avg = sum(lengths) / len(sentences) if sentences else 0.0

    return {
        "avg_sentence_length": avg,
        "median_sentence_length": float(np.median(lengths)) if lengths else 0.0,
        "std_deviation": float(np.std(lengths)) if lengths else 0.0,
        "total_sentences": len(sentences),
        "sentence_lengths": lengths,
    }


def sentence_length_similarity(text1: str,
                               text2: str
                               ) -> Tuple[float, Dict]:
    """
    Compare two texts by their average sentence length.

    The score is the shorter average divided by the longer one (range 0–1).
    Two texts without sentences score 1.0; if only one of them has no
    sentences the score is 0.0.

    Returns
    -------
    (float, dict)
        The score and a dict holding both stats dicts ('text1_stats',
        'text2_stats') and 'difference' (text2 average minus text1 average).
    """
    stats_a = calculate_sentence_stats(text1)
    stats_b = calculate_sentence_stats(text2)

    mean_a = stats_a["avg_sentence_length"]
    mean_b = stats_b["avg_sentence_length"]

    report = {
        "text1_stats": stats_a,
        "text2_stats": stats_b,
        "difference": mean_b - mean_a,
    }

    # Guard clauses for the degenerate cases before dividing.
    if mean_a == 0 and mean_b == 0:
        return 1.0, report
    if mean_a == 0 or mean_b == 0:
        return 0.0, report

    return min(mean_a, mean_b) / max(mean_a, mean_b), report


# ----------------------------------------------------------------------
# 2.  DataFrame-level helper  ➜  call just like add_stopword_similarity()
# ----------------------------------------------------------------------
def add_sentence_length_similarity(df: pd.DataFrame,
                                   pairs: Iterable[Tuple[str, str]] = (
                                       ("y_train", "y_pred"),
                                       ("y_pred",  "y_test"),
                                       ("y_train", "y_test"),
                                   ),
                                   keep_details: bool = False
                                   ) -> pd.DataFrame:
    """
    Append per-row sentence-length similarity columns.

    • Each (colA, colB) pair adds
        '<colA>_vs_<colB>_sentlen_sim'            (float 0–1)

    • If *keep_details* is True a companion
        '<pair>_sentlen_sim_details'
      column contains the full stats dict.

    The columns are added to *df* in place and the same frame is returned.
    An empty frame is handled (the new columns are simply empty).
    """
    for a, b in pairs:
        # One pass over the two columns.  Row-wise df.apply(axis=1) is slow
        # and returns a DataFrame for an empty frame, which cannot be
        # assigned to the new column(s).
        results = [
            sentence_length_similarity(left, right)
            for left, right in zip(df[a], df[b])
        ]

        df[f"{a}_vs_{b}_sentlen_sim"] = pd.Series(
            [sim for sim, _ in results], index=df.index, dtype=float
        )

        if keep_details:
            df[f"{a}_vs_{b}_sentlen_sim_details"] = pd.Series(
                [det for _, det in results], index=df.index, dtype=object
            )
    return df

In [57]:
# Add the '<a>_vs_<b>_sentlen_sim' columns for the helper's default column
# pairs; no details column is requested.
df = add_sentence_length_similarity(df, keep_details=False)

### Term Frequency – Inverse Document Frequency (TF-IDF)

In [80]:
import nltk
from nltk.tokenize import word_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np
from typing import Tuple, Dict, List, Any, Union

# Download required NLTK data
nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)

def preprocess_text(text: str) -> str:
    """
    Preprocess text by lowercasing and normalizing whitespace.
    
    Args:
        text (str): The input text to preprocess. ``None`` and float NaN
            (a missing DataFrame cell) are treated as empty text; any other
            non-string value is converted with ``str``.
        
    Returns:
        str: Lowercased text with every run of whitespace collapsed to a
            single space and leading/trailing whitespace removed
    """
    # Missing cells would otherwise raise AttributeError on .lower()
    if text is None or (isinstance(text, (float, np.floating)) and np.isnan(text)):
        return ""

    # Simple preprocessing - sklearn's TfidfVectorizer will handle tokenization.
    # split() with no argument splits on any whitespace run and drops the ends.
    return " ".join(str(text).lower().split())

def extract_top_features(tfidf_matrix: np.ndarray, feature_names: np.ndarray, top_n: int = 10) -> Dict[str, float]:
    """
    Pick the highest-weighted features of a single document.
    
    Args:
        tfidf_matrix (np.ndarray): One row of the TF-IDF matrix (the weights of one document)
        feature_names (np.ndarray): Feature name for each position of that row
        top_n (int): How many of the largest weights to consider
        
    Returns:
        Dict[str, float]: Feature name -> weight, ordered from the largest
            weight down; features whose weight is not positive are left out
    """
    # Positions ordered by descending weight, cut to the requested amount
    ranked_positions = np.argsort(tfidf_matrix)[::-1][:top_n]
    
    selected: Dict[str, float] = {}
    for position in ranked_positions:
        weight = tfidf_matrix[position]
        if weight > 0:
            selected[feature_names[position]] = float(weight)
    
    return selected

def analyze_unique_features(weights1: np.ndarray, weights2: np.ndarray, feature_names: np.ndarray, top_n: int = 5) -> Dict[str, List[str]]:
    """
    List the strongest features that occur in only one of two texts.
    
    Args:
        weights1 (np.ndarray): TF-IDF weights for first text
        weights2 (np.ndarray): TF-IDF weights for second text
        feature_names (np.ndarray): Array of feature names
        top_n (int): Maximum number of exclusive features reported per text
        
    Returns:
        Dict[str, List[str]]: 'unique_to_text1' and 'unique_to_text2', each a
            list of feature names ordered by descending weight
    """
    def _exclusive_names(own, other):
        # Positions weighted in `own` but exactly zero in `other`
        positions = [idx for idx, weight in enumerate(own) if weight > 0 and other[idx] == 0]
        # Stable sort: ties keep their original position order
        positions = sorted(positions, key=lambda idx: own[idx], reverse=True)
        return [feature_names[idx] for idx in positions[:top_n]]
    
    only_in_first = _exclusive_names(weights1, weights2)
    only_in_second = _exclusive_names(weights2, weights1)
    
    return {
        "unique_to_text1": only_in_first,
        "unique_to_text2": only_in_second
    }

def tfidf_similarity(text1: str, text2: str, ngram_range: Tuple[int, int] = (1, 3), remove_stopwords: bool = True) -> Tuple[float, Dict[str, Any]]:
    """
    Compare two texts using TF-IDF vectors across multiple n-gram ranges.

    A TfidfVectorizer is fitted on just these two texts and the cosine
    similarity of the two resulting rows is returned.
    
    Args:
        text1 (str): First text to compare
        text2 (str): Second text to compare
        ngram_range (tuple): Range of n-gram sizes to include (min, max)
        remove_stopwords (bool): Whether to remove stopwords
        
    Returns:
        Tuple[float, Dict]: A tuple containing:
            - A similarity score between 0-1 (a plain Python float)
            - A dictionary with detailed metrics; when the vectorizer raises
              ValueError (e.g. empty vocabulary) the score is 0.0 and the
              dictionary only holds an "error" message
    """
    # Preprocess texts
    processed_text1 = preprocess_text(text1)
    processed_text2 = preprocess_text(text2)
    
    # Create vectorizer for the specified n-gram range
    vectorizer = TfidfVectorizer(
        ngram_range=ngram_range,
        stop_words='english' if remove_stopwords else None,
        sublinear_tf=True  # Apply sublinear scaling to term frequencies
    )
    
    # Fit and transform both texts
    try:
        tfidf_matrix = vectorizer.fit_transform([processed_text1, processed_text2])
        feature_names = vectorizer.get_feature_names_out()
    except ValueError as e:
        # Handle empty corpus or other vectorization errors
        return 0.0, {"error": str(e)}
    
    # Defaults used when the vocabulary is empty
    similarity = 0.0
    top_features1 = {}
    top_features2 = {}
    unique_features = {"unique_to_text1": [], "unique_to_text2": []}
    ngram_stats = {}
    
    if tfidf_matrix.shape[1] > 0:  # Ensure we have features
        # Densify once; every computation below works on the two rows
        dense_matrix = tfidf_matrix.toarray()
        vector1, vector2 = dense_matrix[0], dense_matrix[1]
        
        # Cosine similarity
        norm_text1 = np.linalg.norm(vector1)
        norm_text2 = np.linalg.norm(vector2)
        if norm_text1 > 0 and norm_text2 > 0:  # Avoid division by zero
            cosine = float(np.dot(vector1, vector2) / (norm_text1 * norm_text2))
            # Rounding can push the ratio a hair outside [0, 1]; clamp it so
            # the documented range holds
            similarity = min(1.0, max(0.0, cosine))
        
        # Top features for each text and the features exclusive to each
        top_features1 = extract_top_features(vector1, feature_names)
        top_features2 = extract_top_features(vector2, feature_names)
        unique_features = analyze_unique_features(vector1, vector2, feature_names)
        
        # Number of vocabulary entries per n-gram size (sizes with no entries are omitted)
        feature_sizes = [len(feature.split()) for feature in feature_names]
        for n in range(ngram_range[0], ngram_range[1] + 1):
            count = feature_sizes.count(n)
            if count:
                ngram_stats[f"{n}-gram"] = count
    
    # Prepare detailed output
    details = {
        "similarity": similarity,
        "ngram_range": ngram_range,
        "stopwords_removed": remove_stopwords,
        "top_features_text1": top_features1,
        "top_features_text2": top_features2,
        "unique_features": unique_features,
        "ngram_stats": ngram_stats,
        "vectorizer_vocabulary_size": len(feature_names)
    }
    
    return similarity, details

# ----------------------------------------------------------------------
# 0.  once-per-file imports
# ----------------------------------------------------------------------
import pandas as pd
from typing import Iterable, Tuple

# Depends on the TF-IDF helpers defined earlier in this cell:
# • preprocess_text
# • tfidf_similarity
# (they require nltk + scikit-learn, which are imported above)


# ----------------------------------------------------------------------
# 1.  DataFrame-level helper
# ----------------------------------------------------------------------
def add_tfidf_similarity(
    df: pd.DataFrame,
    pairs: Iterable[Tuple[str, str]] = (
        ("y_train", "y_pred"),
        ("y_pred",  "y_test"),
        ("y_train", "y_test"),
    ),
    ngram_range: Tuple[int, int] = (1, 1),
    remove_stopwords: bool = False,
    keep_details: bool = False,
) -> pd.DataFrame:
    """
    Append per-row TF-IDF cosine-similarity columns.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame that already contains the text columns.
    pairs : iterable[(str, str)]
        Column pairs to compare.
    ngram_range : (min_n, max_n)
        Passed to `tfidf_similarity` (default = (1, 1), i.e. unigrams only).
    remove_stopwords : bool
        Whether the vectoriser should ignore English stop-words
        (default = False, stop-words are kept).
    keep_details : bool
        • False → only a '<a>_vs_<b>_tfidf_sim' column (float 0–1).  
        • True  → an additional '<a>_vs_<b>_tfidf_sim_details' column
          with the full diagnostics dict from `tfidf_similarity`.

    Returns
    -------
    pandas.DataFrame
        Same frame (modified in place), with added similarity (and optional
        details) columns.  An empty frame simply gets empty columns.
    """
    for a, b in pairs:
        # One pass over the two columns.  Row-wise df.apply(axis=1) is slow
        # and returns a DataFrame for an empty frame, which cannot be
        # assigned to the new column(s).
        results = [
            tfidf_similarity(
                left,
                right,
                ngram_range=ngram_range,
                remove_stopwords=remove_stopwords,
            )
            for left, right in zip(df[a], df[b])
        ]

        df[f"{a}_vs_{b}_tfidf_sim"] = pd.Series(
            [sim for sim, _ in results], index=df.index, dtype=float
        )

        if keep_details:
            df[f"{a}_vs_{b}_tfidf_sim_details"] = pd.Series(
                [det for _, det in results], index=df.index, dtype=object
            )
    return df


In [82]:
# Add the '<a>_vs_<b>_tfidf_sim' columns with the helper's defaults
# (ngram_range=(1, 1), stop-words kept); no details column is requested.
df = add_tfidf_similarity(df, keep_details=False)

In [86]:
# Persist the enriched frame; index=False keeps the row index out of the file.
df.to_csv("Final_dataset.csv", index=False)

In [88]:
# Display the final frame; the recorded output below is a truncated preview.
df

Unnamed: 0,Author,Title,y_train,y_pred,y_test,common_unigrams_y_pred_test,common_bigrams_y_pred_test,common_trigrams_y_pred_test,common_unigrams_y_pred_train,common_bigrams_y_pred_train,...,y_train_vs_y_test_ttr_sim,y_train_vs_y_pred_pos_sim,y_pred_vs_y_test_pos_sim,y_train_vs_y_test_pos_sim,y_train_vs_y_pred_sentlen_sim,y_pred_vs_y_test_sentlen_sim,y_train_vs_y_test_sentlen_sim,y_train_vs_y_pred_tfidf_sim,y_pred_vs_y_test_tfidf_sim,y_train_vs_y_test_tfidf_sim
0,Mark_Twain,THE MAN THAT CORRUPTED HADLEYBURG.txt,THE MAN THAT CORRUPTED HADLEYBURG\n\nAND OTHER...,He made a\ntrip to Hadleyburg and found it the...,"joy. He began to form a plan at once, saying t...",43,20,4,50,28,...,0.934055,0.962196,0.945538,0.948423,0.538851,0.826777,0.445510,0.370351,0.274771,0.287364
1,Mark_Twain,To the Person Sitting in Darkness.txt,TO THE PERSON SITTING IN DARKNESS\n\n\n ...,And\nthere is more of it. The People who Sit i...,Game. It shows that these new players of it ar...,28,16,3,31,33,...,0.927173,0.877965,0.864209,0.964112,0.879538,0.640734,0.728489,0.369874,0.243572,0.346954
2,Mark_Twain,Life on the Mississippi.txt,Produced by David Widger. Earliest PG text edi...,A New Plan.--A Little Tact.--The Mayor is\nHir...,rd. CHAPTER XXIII. Old French Settlements.--We...,33,49,43,17,13,...,0.887572,0.910531,0.894371,0.995097,0.977095,0.941602,0.963675,0.164692,0.180605,0.227837
3,Mark_Twain,A Horse's Tale.txt,A Horse’s Tale\n\n\n ...,* * * * *\n\n\nA Horse’s Tale\n\nCHAP. I.\nSOL...,rian will correct these defects.” The cats in ...,38,21,7,41,28,...,0.918855,0.903443,0.875433,0.928989,0.556345,0.917051,0.510197,0.265685,0.253564,0.330454
4,Mark_Twain,1601 Conversation as it was by the Social Fire...,1601\n\nConversation as it was by the Social F...,The first edition of it was published\nin 1880...,601. The piece is a supposititious conversatio...,53,34,6,58,42,...,0.995280,0.938622,0.943471,0.989911,0.895141,0.751918,0.840000,0.366840,0.322166,0.333211
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
393,Robin Hanson,Prestige in US Today.txt,Lauren A. Rivera’s Pedigree: How Elite Student...,"So, this is a system that is self-reinforcing,...",It seems that while these firms do sell concre...,49,19,2,51,35,...,0.939551,0.876522,0.864894,0.974111,0.717391,0.657471,0.916475,0.309852,0.279250,0.363796
394,Robin Hanson,"AI Risk, Again.txt",Large language models like ChatGPT have recent...,(The future world could be a world of many AIs...,"Of course the owners of such future ventures, ...",44,20,3,49,38,...,0.945580,0.919564,0.932777,0.982412,0.861481,0.688020,0.798648,0.332692,0.275696,0.366223
395,Robin Hanson,New Tax Career Agent Test.txt,"If that taxpayer approved, the taxes that he o...",If the worker who gets the TCA has a higher ex...,Bids should give direct estimates of worker va...,61,40,6,67,44,...,0.962536,0.962763,0.943678,0.968653,0.942137,0.972364,0.916100,0.379268,0.392012,0.411402
396,Robin Hanson,A Perfect Storm of Inflexibility.txt,Most biological species specialize for particu...,"But the problem is that, in peace time, this m...","In addition to these two considerations, longe...",47,18,2,83,61,...,0.943198,0.959473,0.952826,0.970158,0.878442,0.672515,0.765576,0.458430,0.255961,0.307586
