### Imports

In [None]:
import re
import os, sys
import platform
from pathlib import Path

# Resolve the repository root (the parent of the current working directory)
# and the `analyze` directory inside it so that project imports resolve.
project_root = Path.cwd().resolve().parent
analyze_root = project_root / "analyze"

# sys.path holds plain strings: a Path object is never "in" it, so the
# membership test must use the string form. Otherwise every re-run of this
# cell appends another duplicate entry.
for _root in (project_root, analyze_root):
    if str(_root) not in sys.path:
        sys.path.append(str(_root))

import torch
# NOTE(review): the return value of this call is discarded. `disable` looks
# like a decorator/context factory, so a bare call may not change any global
# state — TODO confirm whether a global switch was intended instead.
torch._dynamo.disable()

import numpy as np	
from analyze.generated_text_detector.generated_text_detector.utils.text_detector import GeneratedTextDetector

from database.database import engine
import pandas as pd

from IPython.display import display, HTML
from tqdm.notebook import tqdm

from scipy.stats import ttest_ind
import datetime
import matplotlib.pyplot as plt
import seaborn as sns


### Preprocess Pipeline

In [None]:
def unmark_element(markdown_text: str) -> str:
    """
    Cleans and strips markdown content, leaving behind only the semantic text
    ready for an embedding model.

    Line-anchored markdown markers (setext heading underlines, blockquote
    markers, list markers) are stripped while the text still contains its line
    breaks; whitespace is collapsed only afterwards, otherwise the ``^``/``\\n``
    anchored patterns could never match past the start of the string.

    Args:
        markdown_text: The raw markdown string.

    Returns:
        A cleaned, lower-cased text string with all whitespace runs collapsed
        to single spaces.
    """

    # --- 1. Initial Cleaning ---

    # 1.1 REMOVE LINKS AND IMAGE TAGS: Remove the pattern [text](url) and ![text](url)
    text = re.sub(r'\!?\[.*?\]\s*\(.*?\)', '', markdown_text, flags=re.DOTALL)

    # Image-viewer boilerplate phrases. They must be removed BEFORE lowercasing:
    # the patterns are capitalised and would never match lower-cased text.
    text = re.sub(r'Zoom image will be displayed', '', text)
    text = re.sub(r'Press enter or click to view image in full size\.?', '', text)
    text = re.sub(r'http[s]?://miro.medium.com/v2/resize:.*?\.png', '', text)

    # 1.2 Remove Extraneous Backslashes (e.g., escaping in \- or \.)
    text = re.sub(r'\\-', '-', text)
    text = re.sub(r'\\([`*_{}\[\]()#+.!])', r'\1', text)

    # --- 2. Markdown Structure Stripping (needs the original line breaks) ---

    # 2.1 Remove Headings (Setext style: === or --- lines)
    text = re.sub(r'\n[=-]{2,}\s*$', '', text, flags=re.MULTILINE)

    # 2.2 Remove Blockquotes/Code Fences (Markers: > and ```)
    text = re.sub(r'^\s*>\s?', '', text, flags=re.MULTILINE)
    text = re.sub(r'```[a-zA-Z]*\s*', ' ', text)
    text = re.sub(r'`', ' ', text)

    # 2.3 Remove List Markers (e.g., 1. or - or *)
    text = re.sub(r'^\s*\d+\.\s', '', text, flags=re.MULTILINE)
    text = re.sub(r'^\s*[\-\*]\s', '', text, flags=re.MULTILINE)

    # 2.4 Remove Emphasis Markers (e.g., **, *, __, _)
    text = re.sub(r'(\*\*|__)', '', text)  # Bold/Strong
    text = re.sub(r'(\*|_)', '', text)     # Italic/Emphasis

    # 2.5 Remove remaining HTML tags (like '<hibernate-mapping>') which are often in code
    text = re.sub(r'<[^>]+>', '', text)

    # --- 3. Final Text Polishing ---

    # 3.1 Normalize Whitespace: collapse newlines and repeated spaces into one space
    text = re.sub(r'\s+', ' ', text).strip()

    # 3.2 Lowercasing (Optional but recommended for many embedding models)
    return text.lower()

### Sentence Analysis Helpers

In [None]:
def sliding_window_words(text, size=256, step=1):
    """Yield overlapping word windows of at most roughly `size` characters.

    A window is started at every `step`-th word and extended with the
    following words for as long as the joined string (single spaces included)
    stays within `size` characters. A starting word longer than `size` is
    yielded on its own.
    """
    tokens = text.split()
    total = len(tokens)

    for start in range(0, total, step):
        pieces = [tokens[start]]
        length = len(tokens[start])
        upcoming = start + 1

        while length < size and upcoming < total:
            grown = length + 1 + len(tokens[upcoming])  # +1 for the joining space
            if grown > size:
                break
            pieces.append(tokens[upcoming])
            length = grown
            upcoming += 1

        yield " ".join(pieces)


def single_sentence_chunks(text, size=256):
    """Split text into individual sentences, sub-splitting over-long ones.

    Sentences are separated at spaces that follow '.', '!' or '?'. A sentence
    of at most `size` characters is yielded unchanged. A longer sentence is
    cut at word boundaries into pieces of at most `size` characters, except
    that a word containing sentence punctuation is always attached to the
    current piece (so a closing word never ends up as a dangling one-word
    chunk, at the cost of slightly exceeding `size`).

    Empty sentences (e.g. from an empty input string) are skipped.
    """
    punctuation = ".!?"

    for sentence in re.split(r'(?<=[.!?]) +', text):
        if not sentence:
            continue
        if len(sentence) <= size:
            yield sentence
            continue

        # Sentence too long: rebuild it word by word into size-bounded pieces.
        sub_chunk = ""
        for word in sentence.split():
            fits = len(sub_chunk) + len(word) + 1 <= size
            # Check each punctuation character individually; a substring test
            # against ".!?" would only match that literal three-char sequence.
            has_punctuation = any(mark in word for mark in punctuation)
            if fits or has_punctuation:
                sub_chunk = f"{sub_chunk} {word}" if sub_chunk else word
            else:
                if sub_chunk:
                    yield sub_chunk
                sub_chunk = word
        if sub_chunk:
            yield sub_chunk

In [None]:
def aggregate_scores(text, chunks, results, sliding_windows_approach=True):
    """Average chunk-level scores into one score per word of `text`.

    In sliding mode chunk number k is taken to start at word k, so
    overlapping chunks contribute to the same words and are averaged. In
    non-sliding mode chunks are laid end to end: each chunk's words continue
    where the previous chunk stopped. Positions past the last word are
    ignored.

    Returns an array of shape (number_of_words, 1); words that received no
    score hold NaN.
    """
    n_words = len(text.split())
    totals = np.zeros(n_words, dtype=float)
    hits = np.zeros(n_words, dtype=float)

    cursor = 0  # next unscored word, used only in non-sliding mode
    for chunk_no, result in enumerate(results):
        value = result['score']
        span = len(chunks[chunk_no].split())

        first = chunk_no if sliding_windows_approach else cursor
        last = min(first + span, n_words)
        for pos in range(first, last):
            totals[pos] += value
            hits[pos] += 1

        if not sliding_windows_approach:
            cursor = max(cursor, last)

    # Turn sums into means; uncovered words stay NaN.
    averaged = np.full((n_words, 1), np.nan, dtype=float)
    covered = hits > 0
    averaged[covered, 0] = totals[covered] / hits[covered]
    return averaged


def visualize_scores(text, scores):
    """Display text with word-level color highlighting based on score.

    Each whitespace-separated word of `text` is rendered on a background
    colour derived from `scores[i, 0]`: 0 maps to green, 0.5 to white and 1 to
    red, with linear blending in between.

    Args:
        text: The text to render; it is split on whitespace, so it must be the
            same string the scores were computed for.
        scores: 2-D array indexed as ``scores[i, 0]`` with one row per word.
            Words beyond the end of the array are treated as score 0, NaN
            scores are rendered on a neutral white background, and values
            outside [0, 1] are clamped.
    """
    # Local stdlib imports keep this notebook cell self-contained.
    import math
    from html import escape

    MAX_VAL = 200
    WHITE_VAL = 255

    spans = []
    for i, word in enumerate(text.split()):
        score = float(scores[i, 0]) if i < len(scores) else 0.0

        if math.isnan(score):
            # No score available: int(nan) would raise, so render as neutral.
            r = g = b = WHITE_VAL
        else:
            # Keep the colour channels inside the valid 0..255 range.
            score = min(max(score, 0.0), 1.0)
            if score <= 0.5:
                ratio = score / 0.5
                r = int(WHITE_VAL * ratio)
                g = int(MAX_VAL + (WHITE_VAL - MAX_VAL) * ratio)
                b = int(WHITE_VAL * ratio)
            else:
                ratio = (score - 0.5) / 0.5
                r = int(WHITE_VAL - (WHITE_VAL - MAX_VAL) * ratio)
                g = int(WHITE_VAL * (1 - ratio))
                b = int(WHITE_VAL * (1 - ratio))

        color = f"rgb({r},{g},{b})"
        # Escape the word: article text may contain '<', '>' or '&', which
        # would otherwise be interpreted as markup.
        spans.append(f'<span style="background-color: {color}">{escape(word)} </span>')

    highlighted_text = "".join(spans)
    display(HTML(f"<div style='line-height:1.6; font-size:16px'>{highlighted_text}</div>"))

### AI Detection

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"


class DetectorWrapper:
    """Callable adapter around `GeneratedTextDetector`.

    Calling an instance with an iterable of text chunks returns one
    ``{"label", "score"}`` dict per chunk, where the label is "Fake" for
    scores of at least 0.5 and "Real" otherwise.
    """

    def __init__(self, model_name="SuperAnnotate/ai-detector-low-fpr", device=device, max_len=512):
        """Build the underlying detector; `device=None` auto-selects cuda/cpu."""
        target_device = device
        if target_device is None:
            target_device = "cuda" if torch.cuda.is_available() else "cpu"

        self.detector = GeneratedTextDetector(
            model_name,
            device=target_device,
            preprocessing=True,
            max_len=max_len,
        )
        self.max_len = max_len

    def __call__(self, chunks):
        """Score every chunk and attach a Fake/Real label to each score."""
        outcomes = []
        for chunk in chunks:
            detection = self.detector.detect(chunk)
            # The score is read from the last entry of the first returned item.
            score = detection[0][-1]
            verdict = "Fake" if score >= 0.5 else "Real"
            outcomes.append({"label": verdict, "score": score})
        return outcomes
    
# Notebook-wide detector instance built with the default model name, device
# and max_len; the cells below pass it around as `pipe`.
pipe = DetectorWrapper()

In [None]:
def ai_detection(text: str, pipe: callable, size: int = 256, approach: str = "word", vis: bool = False, verbose: bool = False):
    """Full pipeline: preprocess → split → infer → aggregate → visualize.

    Args:
        text: Raw (markdown) article text; it is cleaned with `unmark_element`.
        pipe: Callable that takes a list of text chunks and returns one dict
            with a 'score' entry per chunk.
        size: Maximum chunk length in characters.
        approach: "sentence" for one chunk per sentence, or "words" (alias
            "word", the default) for sliding word windows.
        vis: If True, render the per-word scores with `visualize_scores`.
        verbose: If True, print the cleaned text length and chunk count.

    Returns:
        A tuple ``(scores, avg_score)``: the per-word score array of shape
        (number_of_words, 1) and its NaN-ignoring mean (NaN if no word was
        scored).

    Raises:
        ValueError: If `approach` is not one of the supported values.
    """
    cleaned = unmark_element(text)

    if approach == "sentence":
        chunks = list(single_sentence_chunks(cleaned, size=size))
        sliding = False
    elif approach in ("word", "words"):
        # "word" is accepted because it is this function's own default value.
        chunks = list(sliding_window_words(cleaned, size=size))
        # Window k starts at word k, so scores must be aggregated in sliding
        # mode; sequential aggregation would misalign every window after the
        # first with the words it actually covers.
        sliding = True
    else:
        raise ValueError("Invalid approach. Choose from 'sentence' or 'words'.")

    if verbose:
        print(f"Text length: {len(cleaned)} characters.")
        print(f"Text split into {len(chunks)} chunks.")

    results = pipe(chunks)
    scores = aggregate_scores(cleaned, chunks, results, sliding)

    word_scores = scores[:, 0]
    # np.nanmean warns on empty/all-NaN input; return NaN directly instead.
    if np.any(~np.isnan(word_scores)):
        avg_score = np.nanmean(word_scores)
    else:
        avg_score = np.nan

    if vis:
        visualize_scores(cleaned, scores)
    return scores, avg_score


# --- 7. Get scores and average ---
def get_ai_scores(text, pipe, vis=False):
    """Score `text` with both chunking approaches and blend the results.

    Returns a 4-tuple: the per-word scores of the word-window approach, the
    per-word scores of the sentence approach, their element-wise mean, and
    the mean of the two overall averages.
    """
    cleaned = unmark_element(text)

    # Window size 94: Average word length ~4.7, average sentence length
    # ~20 words --> 4.7 * 20 = 94
    per_word, per_word_mean = ai_detection(
        cleaned, pipe, size=94, approach="words", vis=vis
    )
    per_sentence, per_sentence_mean = ai_detection(
        cleaned, pipe, size=256, approach="sentence", vis=vis
    )

    blended_mean = (per_word_mean + per_sentence_mean) / 2
    blended_scores = (per_word + per_sentence) / 2
    return per_word, per_sentence, blended_scores, blended_mean

### Test one case (for report)

In [None]:
# Load every article and pick one short (< 2000 characters) article at random.
df = pd.read_sql("SELECT * FROM medium_articles", engine)
random_text = df[df['full_article_text'].str.len() < 2000]['full_article_text'].sample().iloc[0]

vis = False
wa, sa, avg_approach, average_ai_score = get_ai_scores(random_text, pipe, vis=vis)

print("Average AI Score:", average_ai_score)
# The scores are indexed by the words of the CLEANED text, so the same cleaned
# text must be rendered; passing the raw article would pair scores with the
# wrong words wherever markdown was stripped.
visualize_scores(unmark_element(random_text), avg_approach)

### Run AI Detection

In [None]:
# Balance the data set: draw the same number of free and paid articles.
free_pool = df[df['is_free'] == True]
paid_pool = df[df['is_free'] == False]
sample_size = min(len(free_pool), len(paid_pool))

free_articles = free_pool.sample(sample_size, random_state=42)
paid_articles = paid_pool.sample(sample_size, random_state=42)
balanced_df = pd.concat([free_articles, paid_articles]).reset_index(drop=True)

# Per-word score lists live in object columns; the overall score is a float.
score_columns = ('word_approach_scores', 'sentence_approach_scores', 'average_approach_scores')
for column in score_columns:
    balanced_df[column] = None
balanced_df['average_ai_score'] = np.nan

for i, row in tqdm(balanced_df.iterrows(), total=balanced_df.shape[0]):
    article_text = row['full_article_text']
    if pd.isna(article_text) or not article_text.strip():
        # Nothing to score: store empty lists (the same type the success path
        # stores) and leave the overall score as NaN.
        for column in score_columns:
            balanced_df.at[i, column] = []
        balanced_df.at[i, 'average_ai_score'] = np.nan
        print("Empty text, skipping.")
        continue
    try:
        wa, sa, avg_approach, average_ai_score = get_ai_scores(article_text, pipe, vis=False)
        balanced_df.at[i, 'word_approach_scores'] = wa.tolist()
        balanced_df.at[i, 'sentence_approach_scores'] = sa.tolist()
        balanced_df.at[i, 'average_approach_scores'] = avg_approach.tolist()
        balanced_df.at[i, 'average_ai_score'] = average_ai_score
    except Exception as e:
        # Best-effort batch run: report the failure and keep going; the row
        # keeps its None/NaN placeholders.
        print(f"Error processing article {i+1}: {e}")

# save df as pickle
# Raw string: "\g" and "\m" are invalid escape sequences in a normal literal.
# NOTE: the backslash-separated path is Windows-specific.
balanced_df.to_pickle(r".\grammar_analysis\medium_articles_with_ai_scores.pkl")

### Analysis

In [None]:
# Reload the scored articles saved by the detection run.
# Raw string: "\g" and "\m" are invalid escape sequences in a normal literal.
# NOTE: unpickling can execute arbitrary code — only load files this notebook
# produced itself.
balanced_df = pd.read_pickle(r".\grammar_analysis\medium_articles_with_ai_scores.pkl")

In [None]:
# Split the scored articles into the free and the paid group.
free_df, paid_df = (
    balanced_df[balanced_df['is_free'] == flag] for flag in (True, False)
)

# Keep only articles that actually received an overall AI score.
free_scores, paid_scores = (
    group['average_ai_score'].dropna() for group in (free_df, paid_df)
)

# Down-sample both groups to the size of the smaller one.
min_size = min(map(len, (free_scores, paid_scores)))
free_scores = free_scores.sample(min_size, random_state=42)
paid_scores = paid_scores.sample(min_size, random_state=42)

# Two-sample t-test without assuming equal variances (equal_var=False).
t_stat, p_value = ttest_ind(free_scores, paid_scores, equal_var=False)
print("\nT-test between Free and Paid articles:")
print(f"T-statistic: {t_stat:.4f}, P-value: {p_value:.4e}")

In [None]:
# Plot the monthly mean AI score of free vs. paid articles, together with a
# rolling-mean trend line per article type, and save the figure as a PNG.

# make sure we have independent DataFrames
free_articles_df = free_df.copy()
paid_articles_df = paid_df.copy()

# filter out old articles (keep only those published after 2012-01-01)
# NOTE(review): this comparison assumes `date_published` already holds
# datetime values, although it is passed through pd.to_datetime below — confirm.
cutoff_date = datetime.datetime(2012, 1, 1)
free_articles_df = free_articles_df[free_articles_df["date_published"] > cutoff_date]
paid_articles_df = paid_articles_df[paid_articles_df["date_published"] > cutoff_date]

# extract year-month as a monthly period
free_articles_df.loc[:, 'year_month'] = pd.to_datetime(free_articles_df['date_published']).dt.to_period('M')
paid_articles_df.loc[:, 'year_month'] = pd.to_datetime(paid_articles_df['date_published']).dt.to_period('M')

# group by month and compute average AI score
free_monthly_ai = free_articles_df.groupby('year_month')['average_ai_score'].mean()
paid_monthly_ai = paid_articles_df.groupby('year_month')['average_ai_score'].mean()

# convert to DataFrames (one row per month, tagged with the article type)
free_data = pd.DataFrame({
    'month': free_monthly_ai.index.to_timestamp(),
    'avg_ai_score': free_monthly_ai.values,
    'type': 'Free Articles'
})
paid_data = pd.DataFrame({
    'month': paid_monthly_ai.index.to_timestamp(),
    'avg_ai_score': paid_monthly_ai.values,
    'type': 'Paid Articles'
})
combined_data = pd.concat([free_data, paid_data])

# number of monthly rows averaged for each trend point
window = 12

# compute rolling averages for trend lines (rolling mean over `window` rows,
# separately per article type; min_periods=1 yields values from the first row)
# NOTE(review): the window counts rows, so it spans `window` calendar months
# only if no month is missing from the data — confirm.
combined_data['trend'] = combined_data.groupby('type')['avg_ai_score'].transform(
    lambda x: x.rolling(window=window, min_periods=1).mean()
)

# plot main lines (semi-transparent monthly averages with markers)
plt.figure(figsize=(12, 6))
datapoints = sns.lineplot(
    data=combined_data,
    x='month',
    y='avg_ai_score',
    hue='type',
    style='type',
    marker='o',
    linewidth=1.5,
    alpha=0.5
)

# Suffix the labels of the lines drawn so far so the legend can tell them
# apart from the trend lines added next. This must run before the trend plot.
for line in datapoints.lines:
    lbl = line.get_label()
    if "trend" in lbl:
        continue
    line.set_label(f"{lbl} (data points)")

# plot trend lines (thicker and darker)
trend = sns.lineplot(
    data=combined_data,
    x='month',
    y='trend',
    hue='type',
    linewidth=3
)

# Label the remaining lines as trend lines; lines already tagged
# "data points" by the loop above are skipped.
# NOTE(review): presumably both lineplot calls draw on the same axes, which is
# why already-labelled lines can show up here — confirm.
for line in trend.lines:
    lbl = line.get_label()
    if "data points" in lbl:
        continue
    line.set_label(f"{lbl} (trend over {window} months)")

plt.legend(title='Article Type')

plt.title('Average AI Score Over Time (Free vs Paid Articles)')
plt.xlabel('Month')
plt.ylabel('Average AI Score')
sns.despine()
plt.grid(True)
plt.tight_layout()

# save before show()
plt.savefig('average_ai_score_over_time_with_trend.png', dpi=300)
plt.show()


In [None]:
# assign half-year as string label
def to_half_year(date):
    """Return the half-year label of `date`, e.g. "2020-H1".

    January through June map to H1, every other month to H2.
    """
    year_part = date.year
    half_part = "H1" if date.month <= 6 else "H2"
    return f"{year_part}-{half_part}"

# Tag every article with the half-year period it was published in.
for _articles in (free_articles_df, paid_articles_df):
    _articles['half_year'] = pd.to_datetime(_articles['date_published']).apply(to_half_year)

# Stack both groups into one frame, labelling each row with its article type.
combined_data_half = pd.concat([
    group.assign(type=label)
    for group, label in (
        (free_articles_df, 'Free Articles'),
        (paid_articles_df, 'Paid Articles'),
    )
])

# Keep only articles published on or after 2017-01-01.
_published = pd.to_datetime(combined_data_half['date_published'])
combined_data_half = combined_data_half[_published >= pd.Timestamp('2017-01-01')]

# Chronological order of the half-year labels for the x axis.
half_order = sorted(combined_data_half['half_year'].unique())

# Box plot of the score distribution per half-year, split by article type.
plt.figure(figsize=(12, 7))
sns.boxplot(
    x='half_year',
    y='average_ai_score',
    hue='type',
    order=half_order,
    data=combined_data_half,
)
plt.title('AI Score Distribution Over Half-Year Periods (Free vs Paid Articles)')
plt.xlabel('Half-Year Period')
plt.ylabel('AI Score')
plt.xticks(rotation=45)
plt.legend(title='Article Type', loc='upper left')
sns.despine()
plt.grid(True)
plt.tight_layout()

# Write the figure to disk before showing it.
plt.savefig('ai_score_boxplot_half_year.png', dpi=300)
plt.show()