# 00 — Read CSV files (of CIC-IDS2017)

This notebook reads the original CIC-IDS2017 CSV files in a **memory‑friendly, chunked** manner, applies **lightweight cleaning**, **normalizes odd dash characters in labels**, and writes **one single combined Parquet** file.

### What this notebook includes
- **Column name standardization** (spaces/specials → `_`, trimmed, de‑duplicated).
- Replacement of problematic values (`±Infinity` → `NaN`; also recognizes literal `"Infinity"` tokens during CSV parsing).
- String columns stripped of leading/trailing whitespace.
- **Robust normalization of “dash-like” characters in `Label`**, e.g.  
  `Web Attack � Brute Force`, `Web Attack – XSS`, `Web Attack — Sql Injection`  
  ⟶ **`Web Attack - Brute Force`**, **`Web Attack - XSS`**, **`Web Attack - Sql Injection`**.
- **Streamed** combination into `data/processed/cicids2017_full_clean.parquet`.

## Configuration

In [None]:
# ============================================================================
# Configuration
# ============================================================================
from pathlib import Path

# Project root (assumes this notebook is in a 'notebooks/' dir)
PROJECT_ROOT = Path("..").resolve()

# Input: original CIC-IDS2017 CSVs (non-recursive)
DATA_DIR = PROJECT_ROOT / "data" / "raw" / "MachineLearningCSV"

# Output: combined cleaned dataset
OUTPUT_DIR = PROJECT_ROOT / "data" / "processed"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Where to write the final combined, cleaned Parquet file
COMBINED_PARQUET_PATH = OUTPUT_DIR / "cicids2017_full_clean.parquet"

# Parquet compression
PARQUET_COMPRESSION = "snappy"

# Chunk size for streaming CSV reads
CHUNKSIZE = 100_000

print("PROJECT_ROOT:", PROJECT_ROOT)
print("DATA_DIR:", DATA_DIR)
print("OUTPUT_DIR:", OUTPUT_DIR)
print("COMBINED_PARQUET_PATH:", COMBINED_PARQUET_PATH)

PROJECT_ROOT: C:\Users\cedric\code\VS\Studium\Hausarbeit\Code
DATA_DIR: C:\Users\cedric\code\VS\Studium\Hausarbeit\Code\data\raw\MachineLearningCSV
OUTPUT_DIR: C:\Users\cedric\code\VS\Studium\Hausarbeit\Code\data\processed
COMBINED_PARQUET_PATH: C:\Users\cedric\code\VS\Studium\Hausarbeit\Code\data\processed\cicids2017_full_clean.parquet


## Utilities

The utilities below do four key things:

1. **List** raw CSV files to process.  
2. **Standardize** column names to avoid downstream issues (e.g., spaces, `/`, `-`, `.`) and ensure uniqueness.  
3. **Clean** each chunk:
   - strip leading/trailing whitespace from strings,
   - replace `±Infinity` with `NaN`,
   - **normalize dash-like characters in `Label`** into a plain ASCII hyphen with spaces (`" - "`).  
4. **Stream** the cleaned chunks into **one** Parquet file to keep memory usage bounded.

In [2]:
from typing import Iterable, List, Dict
import pandas as pd
import numpy as np
import re

def list_raw_csvs(data_dir: Path, pattern: str = "*.csv") -> List[Path]:
    """Return a sorted list of CSV files in the given directory (non-recursive)."""
    files = sorted(data_dir.glob(pattern))
    if not files:
        print(f"[WARN] No CSV files found in {data_dir}")
    return files

def standardize_column_names(cols: Iterable[str]) -> List[str]:
    """
    Normalize column names:
      - strip leading/trailing whitespace,
      - collapse inner spaces,
      - replace common specials with '_',
      - normalize 'Label' capitalization,
      - ensure uniqueness after normalization by adding suffixes '__1', '__2', ...
    """
    normalized: List[str] = []
    for c in cols:
        c0 = str(c).strip()
        c0 = " ".join(c0.split())  # collapse multiple spaces
        for ch in [' ', '/', '(', ')', '-', ':', ',', '\\', '.']:
            c0 = c0.replace(ch, '_')
        while "__" in c0:
            c0 = c0.replace("__", "_")
        if c0.lower() == "label":
            c0 = "Label"
        normalized.append(c0)

    # Deduplicate after normalization
    seen: Dict[str, int] = {}
    result: List[str] = []
    for name in normalized:
        if name not in seen:
            seen[name] = 0
            result.append(name)
        else:
            seen[name] += 1
            result.append(f"{name}__{seen[name]}")
    return result

# --- Dash / replacement-character normalization for labels --------------------
# Treat hyphen-minus and many dash-like Unicode chars + the replacement char U+FFFD as one class
_DASH_CLASS = r"[-\u2010-\u2015\u2212\u2043\uFE58\uFE63\uFF0D\uFFFD]"

def normalize_label_dashes(series: pd.Series) -> pd.Series:
    """Unify various 'dash-like' characters to a plain ' - ' pattern and tidy spaces."""
    return (
        series.astype("string")
        .str.normalize("NFKC")                                  # Unicode normalize
        .str.replace(fr"\s*{_DASH_CLASS}\s*", " - ", regex=True)  # unify to ' - '
        .str.replace(r"\s+", " ", regex=True)                   # collapse spaces
        .str.strip()
    )

def clean_chunk(df: pd.DataFrame) -> pd.DataFrame:
    """
    Apply minimal cleaning on a chunk:
      - Rename columns consistently,
      - Strip strings in object columns,
      - Replace ±Infinity with NaN,
      - Normalize dash characters inside 'Label' so names are consistent.
    Note: we keep 'Label' as string to avoid category schema friction across chunks.
    """
    df.columns = standardize_column_names(df.columns)

    # Strip textual columns
    for c in df.select_dtypes(include=['object']).columns:
        df[c] = df[c].astype('string').str.strip()

    # Replace infinities
    df = df.replace([np.inf, -np.inf], np.nan)

    # Normalize 'Label' dashes if present
    if 'Label' in df.columns:
        df['Label'] = normalize_label_dashes(df['Label'])

    return df

def iter_csv_in_chunks(path: Path, chunksize: int = 100_000):
    """
    Yield CLEANED chunks from a CSV (streaming for low memory usage).
    Also treats literal 'Infinity' tokens as NaN.
    """
    for chunk in pd.read_csv(
        path,
        low_memory=False,
        chunksize=chunksize,
        na_values=['Infinity', 'inf', 'NaN', 'nan']
    ):
        yield clean_chunk(chunk)

## Writer: combine into one cleaned Parquet

This writes the merged, cleaned dataset in a **streaming** fashion so you can process large files on modest hardware. We rely on `pyarrow`'s `ParquetWriter` to append **row groups** as we go.


In [3]:
def combine_to_parquet_cleaned(
    files: List[Path],
    out_path: Path,
    compression: str = "snappy",
    chunksize: int = 100_000,
) -> Path:
    """Combine all CSVs into ONE cleaned Parquet (streamed)."""
    import pyarrow as pa
    import pyarrow.parquet as pq

    writer = None
    total_rows = 0
    for f in files:
        print(f"[INFO] Processing: {f.name}")
        for chunk in iter_csv_in_chunks(f, chunksize=chunksize):
            total_rows += len(chunk)
            table = pa.Table.from_pandas(chunk, preserve_index=False)
            if writer is None:
                writer = pq.ParquetWriter(out_path, table.schema, compression=compression)
            writer.write_table(table)
    if writer is not None:
        writer.close()
    print(f"[DONE] Wrote {total_rows} cleaned rows to -> {out_path}")
    return out_path

## Run

- **Lists** found CSVs.
- **Streams** them into the combined cleaned Parquet.
- Prints a quick metadata check (row count / row groups) without loading everything into RAM.


In [4]:
files = list_raw_csvs(DATA_DIR, pattern="*.csv")
print("Found CSV files:", len(files))
for p in files:
    print(" -", p.name)

if not files:
    raise SystemExit("No CSVs found. Please check DATA_DIR.")

# Build the combined Parquet (cleaned)
combine_to_parquet_cleaned(
    files=files,
    out_path=COMBINED_PARQUET_PATH,
    compression=PARQUET_COMPRESSION,
    chunksize=CHUNKSIZE,
)

# Lightweight validation: read row count from Parquet metadata (no full load)
try:
    import pyarrow.parquet as pq
    pf = pq.ParquetFile(COMBINED_PARQUET_PATH)
    print(f"Parquet metadata: {pf.metadata.num_rows} rows, {pf.metadata.num_row_groups} row groups.")
except Exception as e:
    print("[WARN] Could not read Parquet metadata:", e)

Found CSV files: 8
 - Friday-WorkingHours-Afternoon-DDos.pcap_ISCX.csv
 - Friday-WorkingHours-Afternoon-PortScan.pcap_ISCX.csv
 - Friday-WorkingHours-Morning.pcap_ISCX.csv
 - Monday-WorkingHours.pcap_ISCX.csv
 - Thursday-WorkingHours-Afternoon-Infilteration.pcap_ISCX.csv
 - Thursday-WorkingHours-Morning-WebAttacks.pcap_ISCX.csv
 - Tuesday-WorkingHours.pcap_ISCX.csv
 - Wednesday-workingHours.pcap_ISCX.csv
[INFO] Processing: Friday-WorkingHours-Afternoon-DDos.pcap_ISCX.csv
[INFO] Processing: Friday-WorkingHours-Afternoon-PortScan.pcap_ISCX.csv
[INFO] Processing: Friday-WorkingHours-Morning.pcap_ISCX.csv
[INFO] Processing: Monday-WorkingHours.pcap_ISCX.csv
[INFO] Processing: Thursday-WorkingHours-Afternoon-Infilteration.pcap_ISCX.csv
[INFO] Processing: Thursday-WorkingHours-Morning-WebAttacks.pcap_ISCX.csv
[INFO] Processing: Tuesday-WorkingHours.pcap_ISCX.csv
[INFO] Processing: Wednesday-workingHours.pcap_ISCX.csv
[DONE] Wrote 2830743 cleaned rows to -> C:\Users\cedric\code\VS\Studium\Hau