# <b>Content ingestion pipeline</b>

### Document sources:
- CSV files of public articles and press releases on multilingual humanitarian sites.
- These data sources often contain multilingual content with mixed formats (HTML, PDF, Plain text, structured data)

******

### <b>Data preparation and text-processing pipeline</b>

In [1]:
import time
# Wall-clock start of the notebook run; the last cell subtracts this from
# time.time() to report the total execution time in minutes.
nb_start = time.time()

In [2]:
# If you see "tokenizers parallelism" warnings from other libs, silence them.
# This cell runs before the library imports in the next cell, so the variable
# is already present in the process environment when those libraries load.
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"

#### 1. Import required libraries.

In [3]:
import os
import re
import json
import unicodedata
from pathlib import Path
from typing import List, Dict

import pandas as pd
from tqdm.notebook import tqdm
from bs4 import BeautifulSoup
import ftfy
import langid
from blingfire import text_to_sentences, text_to_words

# Optional tokenizers (used if available).
# Each import is best-effort: on any failure the name is bound to None, and
# word_tokenize checks for None before using the language-specific tokenizer.
try:
    import jieba  # zh
except Exception:
    jieba = None

try:
    from fugashi import Tagger  # ja
    # Built once here so the same tagger instance is reused for every call.
    _ja_tagger = Tagger()
except Exception:
    # Catches any failure, including one raised by Tagger() itself,
    # not only a missing package.
    _ja_tagger = None

try:
    import pythainlp.tokenize as thai_tok  # th
except Exception:
    thai_tok = None

In [4]:
# ---- Configure your paths here ----

# folder containing many *.csv like site_a.csv, site_b.csv
# (the file stem, e.g. "site_a", becomes the row's _site value)
RAW_DOCS    = Path("../dataset/raw_docs")

# processed file (Parquet); its parent directory is created by the save cell
OUT_PARQ  = Path("../shared-data-library/out/df_processed.parquet")

# Optional JSONL export; leave as None to skip it.
# e.g., Path("../shared-data-library/out/df_processed.jsonl") if you also want JSONL
OUT_JSONL = None

# Rows handed to each processing batch.
# 2k–10k works well; adjust to memory/CPU
BATCH_SIZE = 5000

In [5]:
# Pre-compiled regexes

# URLs with an explicit http(s) scheme or a bare "www." prefix, up to the next
# whitespace character.
URL_RE = re.compile(r'https?://\S+|www\.\S+', flags=re.IGNORECASE)

# C0 control characters, DEL and the zero-width space (U+200B).
# U+200C (zero-width non-joiner) and U+200D (zero-width joiner) are
# deliberately NOT matched: they are part of correct spelling in Persian,
# Kurdish and several Indic scripts and glue emoji sequences together, so
# replacing them with a space would split words and corrupt the text.
CTRL_RE = re.compile(r'[\u0000-\u001f\u007f\u200b]')

# Any run of whitespace (used to collapse it to a single space).
MULTISPACE_RE = re.compile(r'\s+')

#### Text preprocessing functions

Text normalisation.

In [6]:
def normalize_text(text: str) -> str:
    """Return *text* lightly cleaned for multilingual processing.

    Non-string input yields "". Otherwise the steps run in this order:
    mojibake repair (ftfy), NFC composition, HTML stripping, replacement of
    URLs with a ``<URL>`` placeholder, replacement of characters matched by
    ``CTRL_RE`` with spaces, and whitespace collapsing plus trimming.

    Case is intentionally left untouched (Turkish I/ı, German ß, proper nouns).
    """
    if not isinstance(text, str):
        return ""

    repaired = ftfy.fix_text(text)
    composed = unicodedata.normalize("NFC", repaired)
    # Tags are replaced by a space so adjacent elements do not fuse together.
    plain = BeautifulSoup(composed, "html.parser").get_text(" ")
    # The placeholder is padded with spaces so it stays a separate token.
    with_placeholders = URL_RE.sub(" <URL> ", plain)
    without_controls = CTRL_RE.sub(" ", with_placeholders)
    return MULTISPACE_RE.sub(" ", without_controls).strip()

Language detection

In [7]:
def detect_lang(text: str) -> str:
    """Return the language code langid assigns to *text*, or "unk" if *text* is empty."""
    if text:
        # langid.classify returns a (code, score) pair; only the code is used.
        lang_code, _score = langid.classify(text)
        return lang_code
    return "unk"

Sentence tokenization.

In [8]:
def sent_tokenize(text: str) -> List[str]:
    """Split *text* into sentences with blingfire.

    Returns an empty list for empty input. The newline-separated output of
    ``text_to_sentences`` is split into lines; each line is trimmed and blank
    lines are dropped.
    """
    if not text:
        return []
    lines = text_to_sentences(text).split("\n")
    return [sentence for sentence in map(str.strip, lines) if sentence]

Word tokenization

In [9]:
def word_tokenize(text: str, lang: str) -> List[str]:
    """Tokenize *text* into words, choosing a tokenizer by language code.

    Only the part of *lang* before the first underscore is considered.
    Chinese ("zh"), Japanese ("ja") and Thai ("th") use their dedicated
    tokenizer when it was imported successfully; every other case, including
    a missing optional tokenizer, falls back to blingfire's ``text_to_words``.
    Empty input yields an empty list, and whitespace-only tokens are dropped.
    """
    if not text:
        return []

    base_lang = (lang or "").split("_")[0]

    if base_lang == "zh" and jieba is not None:
        pieces = (piece.strip() for piece in jieba.cut(text))
        return [piece for piece in pieces if piece]

    if base_lang == "ja" and _ja_tagger is not None:
        return [node.surface for node in _ja_tagger(text) if node.surface.strip()]

    if base_lang == "th" and thai_tok is not None:
        pieces = (piece.strip() for piece in thai_tok.word_tokenize(text))
        return [piece for piece in pieces if piece]

    # Default fast multilingual fallback; str.split() never yields empty strings.
    return text_to_words(text).split()

Preprocess a single row.

In [10]:
def preprocess_row(row: Dict) -> Dict:
    """Run the full text pipeline on one record.

    Reads the ``text``, ``title`` and ``_site`` keys of *row* (each defaulting
    to "" when absent) and returns a new dict holding the original text, the
    normalized text, the detected language ("unk" when the normalized text is
    empty), the sentence and token lists, and their lengths.
    """
    original = row.get("text", "")
    cleaned = normalize_text(original)

    if cleaned:
        language = detect_lang(cleaned)
    else:
        language = "unk"

    sentences = sent_tokenize(cleaned)
    words = word_tokenize(cleaned, language)

    record = {
        "site": row.get("_site", ""),
        "title": row.get("title", ""),
        "text": original,
        "clean_text": cleaned,
        "lang": language,
        "sentences": sentences,
        "tokens": words,
        "n_sentences": len(sentences),
        "n_tokens": len(words),
    }
    return record

#### Load CSV files.

In [11]:
def load_csv_files(in_dir: Path) -> pd.DataFrame:
    """Load every ``*.csv`` directly under *in_dir* into a single DataFrame.

    Each file contributes its ``title`` and ``text`` columns (as pandas
    "string" dtype) plus a ``_site`` column holding the file stem. A column
    missing from a file is filled with ``pd.NA``. Malformed lines are skipped.
    Rows whose title and text are both empty or missing are dropped.

    Args:
        in_dir: Directory to scan (non-recursive) for CSV files.

    Returns:
        A DataFrame with columns ``title``, ``text`` and ``_site`` and a fresh
        0..n-1 index.

    Raises:
        FileNotFoundError: If *in_dir* contains no ``*.csv`` file.
    """
    paths = sorted(in_dir.glob("*.csv"))
    if not paths:
        raise FileNotFoundError(f"No CSV files found in: {in_dir.resolve()}")

    wanted = ("title", "text")
    frames = []
    for p in tqdm(paths, desc="Loading CSVs"):
        try:
            df = pd.read_csv(
                p,
                encoding="utf-8-sig",
                on_bad_lines="skip",
                dtype={"title": "string", "text": "string"},
                usecols=lambda c: c in wanted,
            )
        except Exception:
            # Fallback if columns/encodings are messy: read all columns with
            # inferred dtypes and replace undecodable bytes, so a file that
            # failed on strict decoding does not simply fail the same way again.
            df = pd.read_csv(
                p,
                encoding="utf-8-sig",
                encoding_errors="replace",
                on_bad_lines="skip",
            )

        # A callable `usecols` silently selects nothing for an absent column,
        # so the primary read can succeed without "title" or "text". Patch the
        # missing column(s) on both paths before they are accessed.
        for col in wanted:
            if col not in df.columns:
                df[col] = pd.NA

        df = df[list(wanted)].astype("string")
        df["_site"] = p.stem  # from "site_name.csv"
        frames.append(df)

    all_df = pd.concat(frames, ignore_index=True)

    # Drop rows where both title and text are empty
    title_blank = all_df["title"].fillna("").str.strip().eq("")
    text_blank = all_df["text"].fillna("").str.strip().eq("")
    return all_df[~(title_blank & text_blank)].reset_index(drop=True)


Remove duplicates.

In [12]:
def make_dedupe_key(s: str) -> str:
    """Build the canonical string used to decide whether two texts are duplicates.

    The value is passed through ``normalize_text`` (non-strings are treated as
    the empty string), whitespace runs are collapsed to single spaces and
    trimmed, and the result is casefolded so the comparison ignores case.
    """
    source = s if isinstance(s, str) else ""
    normalized = normalize_text(source)
    collapsed = re.sub(r"\s+", " ", normalized).strip()
    return collapsed.casefold()

In [13]:
def dedupe_by_text(df: pd.DataFrame):
    """Drop rows whose text duplicates an earlier row's text.

    Texts are compared through ``make_dedupe_key``; the first occurrence of
    each key is kept. Rows whose key is empty (missing text, or text that
    normalizes to nothing) are never treated as duplicates of one another,
    because they share the key "" without sharing any content.

    Args:
        df: Frame with at least the ``_site``, ``title`` and ``text`` columns.
            It is not modified; a copy is used internally.

    Returns:
        A tuple ``(df_nodup, df_dups, n_dups)``: the frame without duplicates
        (original columns), the removed rows with their ``__dedupe_key``, and
        the number of removed rows. Both frames have a fresh index.
    """
    df = df.copy()
    # Build key on the raw text (so we can drop before heavy processing)
    df["__dedupe_key"] = df["text"].fillna("").map(make_dedupe_key)

    # Mark dupes; keep the first occurrence. Empty keys are excluded so that
    # title-only rows are not collapsed into a single survivor.
    has_key = df["__dedupe_key"].ne("")
    repeated = df["__dedupe_key"].duplicated(keep="first")
    dup_mask = (has_key & repeated).astype(bool)
    n_dups = int(dup_mask.sum())

    df_dups = df.loc[dup_mask, ["_site", "title", "text", "__dedupe_key"]].reset_index(drop=True)
    df_nodup = df.loc[~dup_mask].drop(columns=["__dedupe_key"]).reset_index(drop=True)

    return df_nodup, df_dups, n_dups

#### Batch processing

In [14]:
def process_dataframe_batched(df: pd.DataFrame, batch_size: int = 5000) -> pd.DataFrame:
    """Apply ``preprocess_row`` to every row of *df*, one batch at a time.

    Args:
        df: Frame containing the ``_site``, ``title`` and ``text`` columns;
            other columns are ignored and *df* itself is not modified.
        batch_size: Number of rows converted and processed per batch.

    Returns:
        A DataFrame with the columns ``site``, ``title``, ``text``,
        ``clean_text``, ``lang``, ``sentences``, ``tokens``, ``n_sentences``
        and ``n_tokens``; ``lang`` is categorical. An empty input yields an
        empty frame with the same columns.

    Raises:
        ValueError: If *batch_size* is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    df = df[["_site", "title", "text"]].copy()
    n = len(df)

    if n == 0:
        # pd.concat refuses an empty list, so build the empty result directly.
        empty = pd.DataFrame(
            columns=[
                "site", "title", "text", "clean_text", "lang",
                "sentences", "tokens", "n_sentences", "n_tokens",
            ]
        )
        empty["lang"] = empty["lang"].astype("category")
        return empty

    results = []
    for start in tqdm(range(0, n, batch_size), desc="Processing batches"):
        records = df.iloc[start:start + batch_size].to_dict(orient="records")
        results.append(pd.DataFrame([preprocess_row(r) for r in records]))

    processed = pd.concat(results, ignore_index=True)
    # Cast once after concatenation: concatenating categoricals whose category
    # sets differ between batches falls back to object dtype, which would
    # discard a per-batch cast.
    processed["lang"] = processed["lang"].astype("category")

    # If memory becomes an issue, the token lists can be stored as single
    # space-joined strings, or the "sentences" column can be dropped.
    return processed

In [15]:
# Load every CSV found under RAW_DOCS into one DataFrame.
df_raw = load_csv_files(RAW_DOCS)

Loading CSVs:   0%|          | 0/20 [00:00<?, ?it/s]

In [16]:
# Report how many rows were loaded, then preview the first five.
print(f"Loaded {len(df_raw):,} rows from {RAW_DOCS}")
df_raw.head(5)

Loaded 25,019 rows from ../dataset/raw_docs


Unnamed: 0,title,text,_site
0,Երբեք չէի պատկերացնի,<p>“I have never thought that I can do importa...,armenia__textcontent_article
1,Երբեք չէի պատկերացնի,<p>We spoke to Heghine for a long time and she...,armenia__textcontent_article
2,Never have I thought,<p>“I have never thought that I can do importa...,armenia__textcontent_article
3,Never have I thought,<p>We spoke to Heghine for a long time and she...,armenia__textcontent_article
4,Աղետներին պատրաստ դպրոց` սահմանամերձ գյուղում,<p>Տավուշի մարզի Ներքին Ծաղկավան գյուղի դպրոցի...,armenia__textcontent_article


Removed duplicates

In [17]:
# Remove rows with duplicate text, keeping the removed rows (df_dups) and
# their count (n_dups) for reporting.
df_nodup, df_dups, n_dups = dedupe_by_text(df_raw)
print(f"🔁 Removed {n_dups:,} duplicate texts. Remaining: {len(df_nodup):,} rows.")

🔁 Removed 3,174 duplicate texts. Remaining: 21,845 rows.


Store the removed records in a file for auditing.

In [18]:
# Save a log of removed duplicates for auditing.
# The target directory is created first so the write does not fail when the
# logs folder does not exist yet.
dups_log_path = Path("../logs/list-of-removed-duplicates-from-raw-inputs.parquet")
dups_log_path.parent.mkdir(parents=True, exist_ok=True)
df_dups.to_parquet(dups_log_path, index=False)

Pass raw data to text processing pipeline.

In [19]:
# Run the text pipeline over the de-duplicated rows, BATCH_SIZE rows per batch.
df_processed = process_dataframe_batched(df_nodup, batch_size=BATCH_SIZE)

Processing batches:   0%|          | 0/5 [00:00<?, ?it/s]

Building prefix dict from the default dictionary ...
Dumping model to file cache /var/folders/6r/x82plb356ylfzf1n3t44k5_40000gn/T/jieba.cache
Loading model cost 0.636 seconds.
Prefix dict has been built successfully.


In [20]:
# Report the number of processed rows, then preview the first three.
print(f"Processed rows: {len(df_processed):,}")
df_processed.head(3)

Processed rows: 21,845


Unnamed: 0,site,title,text,clean_text,lang,sentences,tokens,n_sentences,n_tokens
0,armenia__textcontent_article,Երբեք չէի պատկերացնի,<p>“I have never thought that I can do importa...,"""I have never thought that I can do important ...",en,"[""I have never thought that I can do important...","["", I, have, never, thought, that, I, can, do,...",9,260
1,armenia__textcontent_article,Երբեք չէի պատկերացնի,<p>We spoke to Heghine for a long time and she...,We spoke to Heghine for a long time and she of...,en,[We spoke to Heghine for a long time and she o...,"[We, spoke, to, Heghine, for, a, long, time, a...",13,357
2,armenia__textcontent_article,Աղետներին պատրաստ դպրոց` սահմանամերձ գյուղում,<p>Տավուշի մարզի Ներքին Ծաղկավան գյուղի դպրոցի...,Տավուշի մարզի Ներքին Ծաղկավան գյուղի դպրոցի VI...,hy,[Տավուշի մարզի Ներքին Ծաղկավան գյուղի դպրոցի V...,"[Տավուշի, մարզի, Ներքին, Ծաղկավան, գյուղի, դպր...",1,96


Save processed data file to shared data library.

In [21]:
# Write the processed frame as zstd-compressed Parquet, creating the output
# directory if needed.
OUT_PARQ.parent.mkdir(parents=True, exist_ok=True)
df_processed.to_parquet(OUT_PARQ, index=False, compression="zstd")

# Optional JSON Lines export: one JSON object per row, non-ASCII kept as-is.
if OUT_JSONL:
    OUT_JSONL.parent.mkdir(parents=True, exist_ok=True)
    with open(OUT_JSONL, "w", encoding="utf-8") as f:
        for rec in df_processed.to_dict(orient="records"):
            # pd.NA (e.g. a missing title) is not JSON-serializable; emit null.
            rec = {key: (None if value is pd.NA else value) for key, value in rec.items()}
            f.write(json.dumps(rec, ensure_ascii=False) + "\n")

# Display the output locations as the cell result.
OUT_PARQ, OUT_JSONL

(PosixPath('../shared-data-library/out/df_processed.parquet'), None)

In [22]:
# Total wall-clock time since the first cell, in minutes.
elapsed_minutes = (time.time() - nb_start) / 60
print(f"⏱ Notebook executed in {elapsed_minutes:.2f} minutes")

⏱ Notebook executed in 5.57 minutes


*****