## 🧩 Data Preprocessing for Ingestion Simulation

In this stage, we preprocess the *All the News* dataset into structured, day-based partitions.  
The goal is to simulate a **real-world data ingestion pipeline** for downstream document storage in **Qdrant** (for vector retrieval) and **Neo4j** (for graph relationships).

### 🎯 Objectives
- **Normalize** textual data by removing excessive whitespace and line breaks to reduce storage bloat.  
- **Parse and validate** publication dates into a consistent datetime format.  
- **Partition** data efficiently by day to mimic incremental ingestion batches.  
- **Prepare** clean CSV outputs for each day to serve as ingestion-ready artifacts.
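
The first two objectives can be illustrated on plain strings before any dataframe is involved. The sketch below uses only the standard library. The helper names `normalize_text` and `parse_publication_date` are hypothetical, and the `YYYY-MM-DD HH:MM:SS` format is an assumption about how the raw dates look. Unlike the regex replacement in the notebook, this version also trims leading and trailing whitespace.

```python
import re
from datetime import date, datetime
from typing import Optional

_WHITESPACE = re.compile(r"\s+")


def normalize_text(value: Optional[str]) -> Optional[str]:
    """Collapse every run of whitespace (including newlines) into one space."""
    if value is None:
        return None
    # .strip() is an addition of this sketch, not part of the notebook code.
    return _WHITESPACE.sub(" ", value).strip()


def parse_publication_date(raw: Optional[str]) -> Optional[date]:
    """Return the calendar date, or None when the value is missing or malformed."""
    if not raw:
        return None
    try:
        # Assumed input format: "YYYY-MM-DD HH:MM:SS"
        return datetime.strptime(raw.strip(), "%Y-%m-%d %H:%M:%S").date()
    except ValueError:
        return None


print(normalize_text("Line one\n\n  line   two\t"))   # Line one line two
print(parse_publication_date("2016-12-09 18:31:00"))  # 2016-12-09
print(parse_publication_date("not a date"))           # None
```

Returning `None` for unparseable dates keeps invalid rows visible, so they can be counted or logged instead of silently landing in a wrong partition.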

### ⚙️ Workflow
1. **Load & filter** only the relevant columns:  
   `date, year, month, day, author, title, article, url, section, publication`  
2. **Sanitize** text columns (`article`, `title`, `author`) by collapsing every run of whitespace, including line breaks, into a single space.  
3. **Convert** date strings into proper datetime objects, extracting the date component.  
4. **Group and write** records into per-day CSV partitions, one subfolder per date under the `data/` directory.
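
To make the grouping step concrete, here is a minimal standard-library sketch of the same idea on toy records. It is not the Polars implementation used in the notebook. The function `partition_by_day`, the toy titles, and the assumed date format are illustrative only. The folder and file naming (`<date>/news_articles_<count>.csv`) follows the convention this notebook uses.

```python
import csv
import tempfile
from collections import defaultdict
from datetime import datetime
from pathlib import Path


def partition_by_day(records, out_root):
    """Group records by calendar day and write one CSV per day folder.

    Returns a mapping of ISO date string -> path of the file written.
    """
    buckets = defaultdict(list)
    for row in records:
        # Assumed input format: "YYYY-MM-DD HH:MM:SS"
        day = datetime.strptime(row["date"], "%Y-%m-%d %H:%M:%S").date()
        buckets[day].append(row)

    written = {}
    for day, rows in sorted(buckets.items()):
        day_dir = Path(out_root) / day.isoformat()
        day_dir.mkdir(parents=True, exist_ok=True)
        target = day_dir / f"news_articles_{len(rows)}.csv"
        with target.open("w", newline="", encoding="utf-8") as fh:
            writer = csv.DictWriter(fh, fieldnames=list(rows[0].keys()))
            writer.writeheader()
            writer.writerows(rows)
        written[day.isoformat()] = target
    return written


# Toy records for illustration only; not taken from the dataset.
toy_records = [
    {"date": "2016-12-09 18:31:00", "title": "A"},
    {"date": "2016-12-09 07:05:10", "title": "B"},
    {"date": "2016-10-07 21:26:46", "title": "C"},
]

with tempfile.TemporaryDirectory() as tmp:
    result = partition_by_day(toy_records, tmp)
    summary = {day: path.name for day, path in result.items()}

print(summary)
# {'2016-10-07': 'news_articles_1.csv', '2016-12-09': 'news_articles_2.csv'}
```

Encoding the row count in the file name gives a cheap sanity check when a batch is later ingested.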

Each output file represents a realistic **daily ingestion batch**, allowing us to later simulate:
- Vector embedding creation and storage in **Qdrant**
- Knowledge graph linking in **Neo4j**
- Continuous **RAG pipeline evaluation**

Splitting the corpus into dated batches mirrors how a production ML or data engineering pipeline receives text incrementally rather than as a single bulk load.
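
A downstream simulation could replay the partitions in chronological order, one batch per day. The following is a minimal sketch assuming the `data/<YYYY-MM-DD>/news_articles_<count>.csv` layout described above. The generator `iter_daily_batches` is a hypothetical helper, and the Qdrant or Neo4j ingestion call that would consume each batch is deliberately left out.

```python
import csv
import tempfile
from datetime import date
from pathlib import Path


def iter_daily_batches(data_root):
    """Yield (day, rows) pairs in chronological order.

    Folders whose names are not ISO dates are skipped.
    """
    dated_dirs = []
    for child in Path(data_root).iterdir():
        if not child.is_dir():
            continue
        try:
            dated_dirs.append((date.fromisoformat(child.name), child))
        except ValueError:
            continue  # not a day partition

    for day, folder in sorted(dated_dirs):
        rows = []
        for csv_path in sorted(folder.glob("news_articles_*.csv")):
            with csv_path.open(newline="", encoding="utf-8") as fh:
                rows.extend(csv.DictReader(fh))
        yield day, rows


# Build a tiny throwaway layout to demonstrate the ordering.
with tempfile.TemporaryDirectory() as tmp:
    for name, titles in [("2016-12-09", ["A", "B"]), ("2016-10-07", ["C"])]:
        folder = Path(tmp) / name
        folder.mkdir()
        target = folder / f"news_articles_{len(titles)}.csv"
        with target.open("w", newline="", encoding="utf-8") as fh:
            writer = csv.writer(fh)
            writer.writerow(["title"])
            writer.writerows([t] for t in titles)
    (Path(tmp) / "notes").mkdir()  # ignored: name is not an ISO date

    batches = [(day.isoformat(), len(rows)) for day, rows in iter_daily_batches(tmp)]

print(batches)
# [('2016-10-07', 1), ('2016-12-09', 2)]
```

Parsing the folder names into `date` objects gives true chronological order and filters out unrelated folders in one step.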


In [24]:
import polars as pl
import kagglehub
from pathlib import Path

# --- Setup paths ---
# Download dataset
path = kagglehub.dataset_download("davidmckinley/all-the-news-dataset")
csv_file = Path(path) / "all-the-news-2-1.csv"

# Resolve base paths
base_dir = Path(__file__).resolve().parent if "__file__" in locals() else Path.cwd()
project_root = base_dir.parent
data_path = project_root / "data"

data_path.mkdir(parents=True, exist_ok=True)  # ensure data directory exists

use_columns = [
    "date",
    "year",
    "month",
    "day",
    "author",
    "title",
    "article",
    "url",
    "section",
    "publication"
]
# --- Load data ---
df = pl.read_csv(
    csv_file, 
    columns=use_columns,
    infer_schema_length=100000,
    ignore_errors=True,
    try_parse_dates=True,
    null_values=["", "NA", "NULL"]
    )
print(df.shape)
print(df.head())


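# Overwrite the integer day-of-month column `day` with the full calendar date,
# which serves as the partition key below.
df = df.with_columns(pl.col("date").dt.date().alias("day"))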

# --- Partition by day into data folder ---
sum_articles = 0
for day, daily_chunk in df.group_by("day"):
    try:
        # each day gets its own subfolder inside data/
        output_dir = data_path / day[0].isoformat()
        output_dir.mkdir(parents=True, exist_ok=True)
        daily_chunk = daily_chunk.with_columns([
            pl.col("article").str.replace_all(r"\s+", " ").alias("article"),
            pl.col("title").str.replace_all(r"\s+", " ").alias("title"),
            pl.col("author").str.replace_all(r"\s+", " ").alias("author"),
        ])

        # write the partition
        daily_chunk.write_csv(output_dir / f"news_articles_{len(daily_chunk)}.csv")
        sum_articles += len(daily_chunk)
    except Exception as e:
        print(f"Error processing day {day[0]}: {e}, chunk size: {len(daily_chunk)}")

print(f"✅ Total articles processed: {sum_articles}")
print(f"✅ Total days partitioned: {len(df.select('day').unique())}")
print(f"✅ Partitioning {sum_articles/df.shape[0]:.2%} complete.")
print(f"🗂️ Partitioned files saved under: {data_path.resolve()}")


(2688879, 10)
shape: (5, 10)
┌───────────────┬──────┬───────┬─────┬───┬──────────────┬──────────────┬────────────┬──────────────┐
│ date          ┆ year ┆ month ┆ day ┆ … ┆ article      ┆ url          ┆ section    ┆ publication  │
│ ---           ┆ ---  ┆ ---   ┆ --- ┆   ┆ ---          ┆ ---          ┆ ---        ┆ ---          │
│ datetime[μs]  ┆ i64  ┆ f64   ┆ i64 ┆   ┆ str          ┆ str          ┆ str        ┆ str          │
╞═══════════════╪══════╪═══════╪═════╪═══╪══════════════╪══════════════╪════════════╪══════════════╡
│ 2016-12-09    ┆ 2016 ┆ 12.0  ┆ 9   ┆ … ┆ This post is ┆ https://www. ┆ null       ┆ Vox          │
│ 18:31:00      ┆      ┆       ┆     ┆   ┆ part of      ┆ vox.com/poly ┆            ┆              │
│               ┆      ┆       ┆     ┆   ┆ Polyarchy…   ┆ archy/…      ┆            ┆              │
│ 2016-10-07    ┆ 2016 ┆ 10.0  ┆ 7   ┆ … ┆ The          ┆ https://www. ┆ null       ┆ Business     │
│ 21:26:46      ┆      ┆       ┆     ┆   ┆ Indianapolis ┆ busi