In [None]:
import pandas as pd
import json
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import re
import torch
from transformers import GPT2LMHeadModel, GPT2TokenizerFast
from collections import Counter
from tqdm import tqdm
import warnings
import os
import fasttext
import time
from langdetect import detect
try:
    from detoxify import Detoxify
except ImportError:
    !pip install detoxify
    from detoxify import Detoxify



# Configuration & Data load

In [None]:
# --- Paths -------------------------------------------------------------------
INPUT_PATH = '../data/raw/mainpipe_data_v1.jsonl'        # dataset loaded as df_in
OUTPUT_PATH = '../data/processed/cleaned_dataset.jsonl'  # pipeline result loaded as df_out
PLOTS_DIR = '../reports/plots/'                          # every figure below is saved here
LANG_MODEL_PATH = '../data/lid.176.bin'                  # FastText language-ID model file

# --- Tunables ----------------------------------------------------------------
SAMPLE_SIZE = 1000 # Number of samples for expensive checks

# plt.savefig() does not create missing directories, so make sure the target
# folder exists before any plotting cell runs.
os.makedirs(PLOTS_DIR, exist_ok=True)

# Suppress all Python warnings for the rest of the notebook.
warnings.filterwarnings('ignore')

In [None]:
# Load Data
def _load_jsonl(path, missing_hint):
    """Read a JSON-lines file into a DataFrame.

    Parameters
    ----------
    path : str
        Location of the ``.jsonl`` file.
    missing_hint : str
        Sentence appended to the error message when ``path`` does not exist.

    Returns
    -------
    pandas.DataFrame
        The loaded documents, or an empty placeholder frame when the file is
        missing, so that the name is always defined for later cells.
    """
    if os.path.exists(path):
        print(f"Loading dataset from {path}...")
        frame = pd.read_json(path, lines=True)
        print(f"Loaded {len(frame):,} documents.")
        return frame

    print(f"ERROR: File not found at {path}. {missing_hint}")
    # Empty placeholder so later cells fail softly instead of with a NameError.
    return pd.DataFrame({'doc_id': [], 'text': [], 'char_count': [], 'word_count': []})


df_in = _load_jsonl(INPUT_PATH, "Please ensure you have downloaded the dataset.")
df_out = _load_jsonl(OUTPUT_PATH, "Please ensure the pipeline has been run successfully.")

## 1. Pipeline steps

In [None]:
# Document counts per pipeline stage. The two middle values are copied by hand
# from the pipeline's count report and must be refreshed after each run.
total_input = len(df_in)
after_ingest = 201457
after_filter = 160062
after_dedup = len(df_out)

categories = ['Input\nRecords', 'After\nIngestion', 'After\nFiltering', 'After\nDeduplication']
values = [total_input, after_ingest, after_filter, after_dedup]
colors_bar = ['Orange', 'Red', 'Blue', 'Green']

plt.figure(figsize=(10, 6))
bars = plt.bar(categories, values, color=colors_bar, edgecolor='black', width=0.6)
plt.ylabel('Count')

for bar in bars:
    height = bar.get_height()
    percentage = (height / total_input * 100) if total_input > 0 else 0
    # Every bar shows its count; the share of the input is appended only
    # when it is below 100%.
    caption = f'{int(height):,}'
    if percentage < 100:
        caption = f'{int(height):,}\n({percentage:.1f}%)'
    plt.text(bar.get_x() + bar.get_width()/2., height, caption,
             ha='center', va='bottom', fontweight='bold', fontsize=14)

plt.tight_layout()
plt.savefig(os.path.join(PLOTS_DIR, 'pipeline_steps.png'), bbox_inches='tight')
print("Saved: pipeline_steps.png")
plt.show()

## 2. Histogram of lengths

In [None]:
def wordschar_count(df):
    """Return per-document character and word counts for ``df``.

    Existing ``char_count`` / ``word_count`` columns are reused when present;
    otherwise the values are derived from the ``text`` column. The input frame
    is left untouched (no helper columns are added to it).

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with a ``text`` column and optionally ``char_count`` and
        ``word_count`` columns.

    Returns
    -------
    tuple of (pandas.Series, pandas.Series)
        ``(char_length, word_count)``. For an empty frame both Series are
        empty, so callers can always unpack the result.
    """
    if df.empty:
        return (pd.Series(dtype='int64', name='char_length'),
                pd.Series(dtype='int64', name='word_count'))

    if 'char_count' in df.columns:
        char_length = df['char_count']
    else:
        char_length = df['text'].str.len()

    if 'word_count' in df.columns:
        word_count = df['word_count']
    else:
        # Whitespace-separated tokens per document.
        word_count = df['text'].str.split().str.len()

    return (char_length.rename('char_length'), word_count.rename('word_count'))

In [None]:
# Compare document-length distributions (characters and words) before and
# after the pipeline: input histograms on the top row, output on the bottom.

char_in, words_in = wordschar_count(df_in)
char_out, words_out = wordschar_count(df_out)

for side, chars in (("input", char_in), ("output", char_out)):
    print(f"Mean Length ({side}): {chars.mean():.0f} chars")
    print(f"Min Length ({side}): {chars.min()} chars")
    print(f"Max Length ({side}): {chars.max()} chars")

fig, ax = plt.subplots(2, 2, figsize=(10, 10))

# One entry per column: (input series, output series, log-spaced bins, x-limits)
columns = (
    (char_in, char_out, np.logspace(1, 5, 25), [1, 1e5]),
    (words_in, words_out, np.logspace(1, 4, 25), [1, 1e4]),
)

for col, (series_in, series_out, bins, xlim) in enumerate(columns):
    for row, (data, colour) in enumerate(((series_in, 'Orange'), (series_out, 'Green'))):
        panel = ax[row, col]
        panel.hist(data, bins=bins, edgecolor='k', color=colour)
        # Both averages are drawn on every panel so the shift is visible.
        panel.axvline(np.mean(series_in), color='Red', ls='--', lw=2,
                      label=f'Avg. input = {series_in.mean():.0f}')
        panel.axvline(np.mean(series_out), color='Blue', ls='--', lw=2,
                      label=f'Avg. output = {series_out.mean():.0f}')
        panel.set_xlim(xlim)
        panel.set_xscale('log')
        panel.legend(frameon=False)

ax[0,0].set_title("Character Length Distribution")
ax[0,1].set_title("Words Length Distribution")
for row in (0, 1):
    ax[row,0].set_ylabel('Count')
ax[1,0].set_xlabel('Character length')
ax[1,1].set_xlabel('Word length')

plt.tight_layout()
plt.savefig(os.path.join(PLOTS_DIR, 'Char_Words_Dist.png'), bbox_inches='tight')
print("Saved: Char_Words_Dist.png")
plt.show()

## 3. Integrity checks (Boilerplate & artifacts)

In [None]:
# Regex for common HTML tags left over 
html_re = re.compile(r'<[^>]+>')
# Regex for excessive whitespace 
space_re = re.compile(r'\s{4,}')
# Keyword heuristic for source code.
# NOTE(review): several keywords (return, import, class, let, ...) are also
# ordinary English words, and '=>' only matches when it touches word characters
# on both sides because of the surrounding \b -- treat the counts as an upper
# bound and confirm the pattern mirrors the pipeline's own filter.
code_re = re.compile(
            r'\b(function|var|const|let|=>|console\.log|document\.getElementById|'
            r'window\.|return|import|export|class|def|printf|iostream)\b'
        )


def _count_docs_matching(df, pattern):
    """Return how many documents in ``df['text']`` contain a match of ``pattern``."""
    # Missing texts are scanned as empty strings so pattern.search never sees NaN.
    texts = df['text'].fillna('')
    return texts.apply(lambda doc: bool(pattern.search(doc))).sum()


def _share(hits, total):
    """Return ``hits / total``, or 0.0 when there are no documents at all."""
    return hits / total if total else 0.0


html_hits_in = _count_docs_matching(df_in, html_re)
code_hits_in = _count_docs_matching(df_in, code_re)
space_hits_in = _count_docs_matching(df_in, space_re)
code_hits_out = _count_docs_matching(df_out, code_re)
html_hits_out = _count_docs_matching(df_out, html_re)
space_hits_out = _count_docs_matching(df_out, space_re)

print("\nIntegrity Scan")
report_rows = (
    ("input", len(df_in), (html_hits_in, code_hits_in, space_hits_in)),
    ("output", len(df_out), (html_hits_out, code_hits_out, space_hits_out)),
)
for side, n_docs, hit_counts in report_rows:
    for label, hits in zip(("HTML artifacts", "code", "excessive whitespace"), hit_counts):
        print(f"- Documents with {label} ({side}): {hits} ({_share(hits, n_docs):.1%})")

In [None]:
# Grouped bar chart of the integrity-scan hit counts; each bar is annotated
# with the hit count expressed as a percentage of its dataset's documents.
categories = ["HTML", "Code", "Whitespace"]

in_values = [html_hits_in, code_hits_in, space_hits_in]
out_values = [html_hits_out, code_hits_out, space_hits_out]
percentage_in = [hits / len(df_in) * 100 for hits in in_values]
percentage_out = [hits / len(df_out) * 100 for hits in out_values]

x = np.arange(len(categories))      # one slot per category
width = 0.35                        # width of each bar in a pair

fig, ax = plt.subplots(figsize=(10, 6))

bars_in = ax.bar(x - width/2, in_values, width, color = 'orange', label='Input')
bars_out = ax.bar(x + width/2, out_values, width, color = 'green', label='Output')

for bar_group, percentages in ((bars_in, percentage_in), (bars_out, percentage_out)):
    for bar, pct in zip(bar_group, percentages):
        ax.text(bar.get_x() + bar.get_width()/2, bar.get_height(), f"{pct:.1f}%",
                ha='center', va='bottom', fontsize=10)

ax.set_xticks(x)
ax.set_xticklabels(categories)
ax.set_ylabel("Count")
ax.set_title("Integrity Check Artifacts")
ax.legend()

plt.tight_layout()
plt.savefig(os.path.join(PLOTS_DIR, 'Integrity_Check.png'), bbox_inches='tight')
print("Saved: Integrity_Check.png")

## 4a. Safety: PII check

In [None]:
# PII audit: count redaction placeholders (<EMAIL>, <IP>, <PHONE>) and scan for
# e-mail addresses, IP addresses and phone numbers that are still in the text.

def check_PII(df):
    """Count redaction tags and potential PII leaks in ``df['text']``.

    Prints a short report plus, for each placeholder type, the first 300
    characters of one document that contains it.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with a ``text`` column; missing texts are scanned as empty strings.

    Returns
    -------
    tuple of int
        ``(redaction_tags, email_leaks, ip_leaks, phone_leaks)`` -- total match
        counts over all documents. An empty frame yields ``(0, 0, 0, 0)`` so
        callers can always unpack the result.
    """
    if df.empty:
        print("\nPII Safety Analysis: no documents to scan.")
        return 0, 0, 0, 0

    redaction_tag_re = re.compile(r'<EMAIL>|<IP>|<PHONE>')
    # TLD class is [A-Za-z]; the previous [A-Z|a-z] also accepted a literal '|'.
    email_leak_re = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')
    ip_leak_re = re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b')
    phone_leak_re = re.compile(r'\b(?:\+\d{1,2}\s)?\(?\d{3}\)?[\s.-]?\d{3}[\s.-]?\d{4}\b')

    # NaN would make the regex calls raise, so scan missing texts as "".
    texts = df['text'].fillna('')

    def _matches_per_doc(pattern):
        """Number of non-overlapping matches of ``pattern`` in each document."""
        return texts.apply(lambda doc: len(pattern.findall(doc)))

    redaction_counts = int(_matches_per_doc(redaction_tag_re).sum())

    e_leaks = _matches_per_doc(email_leak_re)
    ip_leaks = _matches_per_doc(ip_leak_re)
    phone_leaks = _matches_per_doc(phone_leak_re)
    total_eleaks = int(e_leaks.sum())
    total_ipleaks = int(ip_leaks.sum())
    total_phoneleaks = int(phone_leaks.sum())

    print("\nPII Safety Analysis:")
    print(f"- Total Redaction Tags Found (<EMAIL>, etc): {redaction_counts:,}")
    print(f"- Potential Email Leaks found: {total_eleaks:,}")
    print(f"- Potential IP Leaks found: {total_ipleaks:,}")
    print(f"- Potential Phone Leaks found: {total_phoneleaks:,}")
    print(f"- Documents with email leaks: {int((e_leaks > 0).sum()):,}")
    print(f"- Documents with ip leaks: {int((ip_leaks > 0).sum()):,}")
    print(f"- Documents with phone leaks: {int((phone_leaks > 0).sum()):,}")

    # Inspect one redaction example per placeholder type
    for tag in ("<EMAIL>", "<PHONE>", "<IP>"):
        tagged = texts[texts.str.contains(tag, regex=False, na=False)]
        if not tagged.empty:
            print("\n--- Redaction Example ---")
            print(tagged.iloc[0][:300] + "...")

    return redaction_counts, total_eleaks, total_ipleaks, total_phoneleaks

In [None]:
# check_PII() returns (redaction_tags, email, ip, phone). Fall back to zeros
# if it returns nothing, so the unpacking below cannot fail.
red_in, eleaks_in, ipleaks_in, phoneleaks_in = check_PII(df_in) or (0, 0, 0, 0)
red_out, eleaks_out, ipleaks_out, phoneleaks_out = check_PII(df_out) or (0, 0, 0, 0)

categories = ["Email", "Phone", "IP"]

# Values must follow the same order as `categories` (Email, Phone, IP);
# previously the IP and phone counts were plotted under each other's label.
PII_in = [eleaks_in, phoneleaks_in, ipleaks_in]
PII_out = [eleaks_out, phoneleaks_out, ipleaks_out]

# Leak matches per 100 documents; 0.0 when a dataset has no documents.
n_docs_in, n_docs_out = len(df_in), len(df_out)
PII_perc_in = [v / n_docs_in * 100 if n_docs_in else 0.0 for v in PII_in]
PII_perc_out = [v / n_docs_out * 100 if n_docs_out else 0.0 for v in PII_out]

x = np.arange(len(categories))      # positions for categories
width = 0.35                        # bar width

fig, ax = plt.subplots(figsize=(10, 6))

bars_in = ax.bar(x - width/2, PII_in, width, color = 'orange', label='Input')
bars_out = ax.bar(x + width/2, PII_out, width, color = 'green', label='Output')
for bar_group, percentages in ((bars_in, PII_perc_in), (bars_out, PII_perc_out)):
    for bar, pct in zip(bar_group, percentages):
        ax.text(bar.get_x() + bar.get_width()/2, bar.get_height(), f"{pct:.1f}%",
                ha='center', va='bottom', fontsize=10)
ax.set_xticks(x)
ax.set_xticklabels(categories)
ax.set_ylabel("Count")
ax.set_title(f"PII Check (Output file has {red_out:,} redaction tags)")
ax.legend()

plt.tight_layout()
plt.savefig(os.path.join(PLOTS_DIR, 'PII_Check.png'), bbox_inches='tight')
print("Saved: PII_Check.png")
plt.show()

## 4b. Safety: Toxicity check

In [None]:
def count_toxic_docs(df, key="text", threshold=0.5, batch_size=16, sample=None):
    """
    Returns the number of toxic documents per category.
    Light enough to run on a personal laptop.

    Parameters
    ----------
    df : pandas.DataFrame
        Documents to score.
    key : str
        Name of the text column; missing values are scored as empty strings.
    threshold : float
        A document counts as toxic in a category when its score is strictly
        greater than this value.
    batch_size : int
        Number of texts passed to the model per ``predict`` call.
    sample : int or None
        When truthy, score a random subset of at most this many documents
        (fixed seed 42). Capped at ``len(df)`` so small frames do not raise.

    Returns
    -------
    dict
        Category name -> number of documents above ``threshold``.
    """
    categories = ["toxicity", "severe_toxicity", "obscene",
                  "identity_attack", "insult", "threat"]

    # Counter for toxic docs per category
    toxic_counts = {cat: 0 for cat in categories}

    # Nothing to score: skip sampling and the (expensive) model load entirely.
    if len(df) == 0:
        return toxic_counts

    # Optional sampling for speed
    if sample:
        df = df.sample(min(sample, len(df)), random_state=42).reset_index(drop=True)

    texts = df[key].fillna("").tolist()

    detox = Detoxify("original")

    # Process in small batches
    for start in tqdm(range(0, len(texts), batch_size), desc="Scanning toxicity"):
        batch_scores = detox.predict(texts[start:start+batch_size])

        # Count doc if > threshold
        for cat in categories:
            toxic_counts[cat] += sum(1 for score in batch_scores[cat] if score > threshold)

    return toxic_counts

In [None]:
# Score a fixed-size sample of each dataset, then plot per-category counts
# side by side. Bar annotations express each count relative to SAMPLE_SIZE.
toxic_docs_in = count_toxic_docs(df_in, sample=SAMPLE_SIZE)
toxic_docs_out = count_toxic_docs(df_out, sample=SAMPLE_SIZE)

categories = list(toxic_docs_in.keys())

x = np.arange(len(categories))
width = 0.35

counts_in = [toxic_docs_in[c] for c in categories]
counts_out = [toxic_docs_out[c] for c in categories]

fig, ax = plt.subplots(figsize=(10, 6))

bars_in = ax.bar(x - width/2, counts_in, width, label='Input', color = 'Orange')
bars_out = ax.bar(x + width/2, counts_out, width, label='Output', color = 'Green')

ax.set_ylabel("Count")
ax.set_title(f'Toxic Documents (Using Sample size = {SAMPLE_SIZE:.0f})')
ax.set_xticks(x)
ax.set_xticklabels(categories, rotation=45, ha="right")
ax.legend()

for bar_group, counts in ((bars_in, counts_in), (bars_out, counts_out)):
    for bar, count in zip(bar_group, counts):
        pct = 100 * count / SAMPLE_SIZE
        ax.text(bar.get_x() + bar.get_width()/2, bar.get_height(),
                f"{pct:.1f}%", ha="center", va="bottom", fontsize=10)

plt.tight_layout()
plt.savefig(os.path.join(PLOTS_DIR, 'Toxicity_Check.png'), bbox_inches='tight')
print("Saved: Toxicity_Check.png")
plt.show()

## 5. Language distribution

In [None]:
# Fetch the FastText language-ID model on first use, then load it.
if not os.path.exists(LANG_MODEL_PATH):
    print("\nDownloading FastText model for verification...")
    # os.system() signals failure through its return value, not an exception,
    # so the exit status has to be checked explicitly.
    exit_status = os.system(f"wget https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin -O {LANG_MODEL_PATH}")
    if exit_status != 0:
        print(f"Could not download FastText model automatically: wget exited with status {exit_status}")
        print("Please download it manually from the URL above.")
        # A failed `wget -O` can leave an empty or partial output file behind;
        # remove it so it is not mistaken for a valid model on the next run.
        if os.path.exists(LANG_MODEL_PATH):
            os.remove(LANG_MODEL_PATH)

if os.path.exists(LANG_MODEL_PATH):
    ft_model = fasttext.load_model(LANG_MODEL_PATH)
else:
    # Fail here with a clear message rather than later with a NameError on ft_model.
    raise FileNotFoundError(
        f"FastText model not found at {LANG_MODEL_PATH}; the language checks below need it."
    )

In [None]:
def predict_lang(text):
    """Return the first label that ``ft_model`` predicts for ``text``.

    Newlines are replaced by spaces and only the first 1000 characters are
    passed to the model. The label is returned as produced by the model; the
    calling cell strips the ``__label__`` prefix.
    """
    flattened = text.replace("\n", " ")
    snippet = flattened[:1000]
    # predict() is given a one-element list; element 0 of its result holds the
    # labels, indexed first by document and then by rank.
    labels_per_doc = ft_model.predict([snippet])[0]
    return labels_per_doc[0][0]

def english_other_counts(lang_series):
    """Split a Series of language codes into English vs. everything else.

    Returns a dict with keys ``"english"`` (entries equal to ``"en"``),
    ``"other"`` (all remaining entries) and ``"total"`` (Series length).
    """
    n_total = len(lang_series)
    n_english = (lang_series == "en").sum()
    return {"english": n_english, "other": n_total - n_english, "total": n_total}

In [None]:
def _label_languages(frame, side):
    """Predict a language for every document in ``frame`` and print the distribution.

    Returns ``(labelled_copy, distribution)``: a copy of ``frame`` with a
    ``lang`` column (``__label__`` prefix removed) and its value counts.
    """
    print(f"\nVerifying language coverage ({side})...")
    labelled = frame.copy()
    labelled['lang'] = frame['text'].apply(predict_lang)
    labelled['lang'] = labelled['lang'].str.replace('__label__', '')
    distribution = labelled['lang'].value_counts()
    print(distribution)
    return labelled, distribution


# Every document is labelled (no sub-sampling), for input and output alike.
sample_lang_in, lang_dist_in = _label_languages(df_in, "input")
sample_lang_out, lang_dist_out = _label_languages(df_out, "output")

In [None]:
# English vs. non-English document counts for input and output, annotated with
# each bar's share of its own dataset.
dist_in = english_other_counts(sample_lang_in["lang"])
dist_out = english_other_counts(sample_lang_out["lang"])

categories = ["english", "other"]
x = np.arange(len(categories))
width = 0.35

fig, ax = plt.subplots(figsize=(10, 6))

bars_in = ax.bar(x - width/2, [dist_in[c] for c in categories],
                 width, label="Input", color = 'Orange')
bars_out = ax.bar(x + width/2, [dist_out[c] for c in categories],
                  width, label="Output", color = 'Green')

ax.set_ylabel("Count")
ax.set_title('Language Distribution')
ax.set_xticks(x)
ax.set_xticklabels(["English", "Other"], fontsize=12)
ax.legend()

for bar_group, dist in ((bars_in, dist_in), (bars_out, dist_out)):
    for bar, cat in zip(bar_group, categories):
        pct = 100 * dist[cat] / dist["total"]
        ax.text(bar.get_x() + bar.get_width()/2, bar.get_height(),
                f"{pct:.1f}%", ha="center", va="bottom")

plt.tight_layout()
plt.savefig(os.path.join(PLOTS_DIR, 'Language_Check.png'), bbox_inches='tight')
print("Saved: Language_Check.png")
plt.show()

## 6. Linguistic quality (Perplexity)

In [None]:
def calculate_ppl_batch(texts, model, tokenizer, device, stride=512):
    """Compute one perplexity value per text using a strided sliding window.

    Each text is tokenized on its own and scored in windows of at most
    ``model.config.n_positions`` tokens that advance by ``stride``. Tokens that
    an earlier window already scored are masked with -100, so they serve as
    context only. The per-window losses are combined as a token-weighted mean
    before exponentiating, so a short final window does not count as much as a
    full one.

    Parameters
    ----------
    texts : iterable of str
        Documents to score (processed one at a time).
    model
        Causal LM called as ``model(input_ids, labels=...)`` and returning an
        object with a ``loss`` attribute (mean loss over the scored tokens).
    tokenizer
        Callable returning an object with ``input_ids`` for
        ``tokenizer(text, return_tensors="pt")``.
    device : str or torch.device
        Device the token ids are moved to.
    stride : int
        Step between consecutive window starts.

    Returns
    -------
    list of float
        One perplexity per text; NaN when a text yields no scorable token
        (empty text or a single token).
    """
    max_length = model.config.n_positions  # same for every text, so read it once
    ppls = []

    for text in texts:
        input_ids = tokenizer(text, return_tensors="pt").input_ids.to(device)
        seq_len = input_ids.size(1)

        nll_sum = 0.0      # sum over windows of (mean loss * scored tokens)
        n_scored = 0       # total number of tokens that contributed to nll_sum
        prev_end_loc = 0

        # Sliding window over the sequence
        for begin_loc in range(0, seq_len, stride):
            end_loc = min(begin_loc + max_length, seq_len)
            trg_len = end_loc - prev_end_loc

            inp = input_ids[:, begin_loc:end_loc]
            target_ids = inp.clone()
            target_ids[:, :-trg_len] = -100   # mask everything except last block

            # Causal-LM losses shift the labels by one position, so the label at
            # index 0 never contributes; count only the labels that can.
            window_tokens = int((target_ids[:, 1:] != -100).sum())

            if window_tokens > 0:
                with torch.no_grad():
                    outputs = model(inp, labels=target_ids)
                nll_sum = nll_sum + outputs.loss * window_tokens
                n_scored += window_tokens

            prev_end_loc = end_loc
            if end_loc >= seq_len:
                break

        if n_scored == 0:
            ppls.append(float('nan'))
        else:
            ppls.append(torch.exp(nll_sum / n_scored).item())

    return ppls


def compute_dataset_perplexity(df, text_column="text", 
                               model_id="distilgpt2", 
                               sample_size=SAMPLE_SIZE,
                               batch_size=8,
                               random_state=42):
    """Estimate per-document perplexity on a random sample of ``df``.

    Parameters
    ----------
    df : pandas.DataFrame
        Documents to evaluate.
    text_column : str
        Column holding the text; rows with a missing text are skipped.
    model_id : str
        Identifier passed to ``from_pretrained`` for tokenizer and model.
    sample_size : int
        Maximum number of documents to score.
    batch_size : int
        Number of documents handed to ``calculate_ppl_batch`` per call.
    random_state : int or None
        Seed for the document sample so repeated runs score the same
        documents; pass ``None`` for a fresh random sample each time.

    Returns
    -------
    list of float
        One perplexity per sampled document (may contain NaN). Empty when
        there is no text to score.
    """
    texts = df[text_column].dropna()
    if texts.empty:
        # Nothing to score: avoid loading the model at all.
        print("No documents available for perplexity computation.")
        return []

    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Loading {model_id} on {device}...")

    tokenizer = GPT2TokenizerFast.from_pretrained(model_id)
    model = GPT2LMHeadModel.from_pretrained(model_id).to(device)
    model.eval()

    # Sample subset (seeded, for reproducible figures)
    sample = texts.sample(min(len(texts), sample_size), random_state=random_state).tolist()

    print(f"Computing perplexity on {len(sample)} docs (batch size={batch_size})...")

    all_ppls = []
    for start in tqdm(range(0, len(sample), batch_size)):
        batch = sample[start:start+batch_size]
        all_ppls.extend(calculate_ppl_batch(batch, model, tokenizer, device))

    return all_ppls

In [None]:
ppls_in = compute_dataset_perplexity(df_in)
ppls_out = compute_dataset_perplexity(df_out)

# calculate_ppl_batch() reports NaN for documents it cannot score; use the
# NaN-aware reducers so a single such document does not turn the summary into NaN.
ppl_mean_in = np.nanmean(ppls_in)
ppl_mean_out = np.nanmean(ppls_out)
ppl_median_in = np.nanmedian(ppls_in)
ppl_median_out = np.nanmedian(ppls_out)

In [None]:
# Perplexity histograms: input on top, output below, each with its mean and
# median marked. Both panels share the same bins (0-500).
fig, ax = plt.subplots(2, 1, figsize=(10, 6))

ppl_bins = np.linspace(0, 500, 50)
panels = (
    (ax[0], ppls_in, 'orange', ppl_mean_in, ppl_median_in),
    (ax[1], ppls_out, 'green', ppl_mean_out, ppl_median_out),
)
for panel, ppl_values, colour, mean_ppl, median_ppl in panels:
    panel.hist(ppl_values, color=colour, bins = ppl_bins)
    panel.axvline(mean_ppl, color = 'k', ls = '--', lw = 2, label = f'Avg = {mean_ppl:.1f}')
    panel.axvline(median_ppl, color = 'r', ls = '--', lw = 2, label = f'Median = {median_ppl:.1f}')
    panel.set_ylabel("Count")
    panel.legend(frameon=False)

ax[0].set_title(f"Perplexity Distribution (Using sample size = {SAMPLE_SIZE:.0f})")
ax[1].set_xlabel("Perplexity")

plt.tight_layout()
plt.savefig(os.path.join(PLOTS_DIR, 'Perplexity_Dist.png'), bbox_inches='tight')
print("Saved: Perplexity_Dist.png")
plt.show()

## 7. Deduplication check

In [None]:
# Exact-duplicate check on the cleaned output: how many documents carry a text
# that already appears elsewhere in the dataset.
total_docs = len(df_out)
unique_ids = df_out['doc_id'].nunique()
unique_texts = df_out['text'].nunique()
duplicate_texts = total_docs - unique_texts

# doc_id uniqueness is a separate sanity check; it is not part of the chart.
if unique_ids != total_docs:
    print(f"WARNING: only {unique_ids:,} distinct doc_id values for {total_docs:,} documents.")

plt.figure(figsize=(10, 6))

categories = ['Unique', 'Duplicate']
# Both bars are measured on the text column so that they add up to total_docs
# (the 'Unique' bar previously used the doc_id count).
values = [unique_texts, duplicate_texts]
colors_bar = ['Blue', 'Red']
bars = plt.bar(categories, values, color=colors_bar, edgecolor='black', width=0.6)
plt.ylabel('Count')
plt.yscale('log')

# A zero-height bar has no valid position on a log axis, so its label is
# anchored at the bottom of the axis instead of at y=0.
axis_floor = plt.ylim()[0]
for bar in bars:
    height = bar.get_height()
    percentage = (height / total_docs * 100) if total_docs > 0 else 0
    label_y = height if height > 0 else axis_floor
    plt.text(bar.get_x() + bar.get_width()/2., label_y, f'{int(height):,}\n({percentage:.2f}%)', ha='center', va='bottom', fontweight='bold', fontsize=14)
plt.tight_layout()
plt.savefig(os.path.join(PLOTS_DIR, 'Dedup_Check.png'), bbox_inches='tight')
print("Saved: Dedup_Check.png")
plt.show()