# Pré-traitement train/test (VS Code)

Objectif : lire `src/data/train.csv` et `src/data/test.csv`, nettoyer la colonne `text`, puis écrire :
- `src/data/train_clean.csv`
- `src/data/test_clean.csv`

Notes :
- Ce notebook évite `google.colab.files.upload()` (non applicable sur VS Code).
- L’écriture en **un seul fichier CSV** est gérée via `coalesce(1)` + renommage du `part-*.csv`.


In [None]:
!pip install -q pyspark
# If pyspark is already installed in your venv/conda environment, you can comment out the line above.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    Tokenizer,
    StopWordsRemover,
    NGram,
    HashingTF,
    VectorAssembler,
)

In [None]:
# Local mode: Spark runs inside this process and uses every available core,
# so no external cluster is required.
_session_builder = SparkSession.builder.appName("TextProcessingPipeline").master("local[*]")
# Disable the console progress bars so they do not clutter the cell output.
_session_builder = _session_builder.config("spark.ui.showConsoleProgress", "false")
spark = _session_builder.getOrCreate()

# Restrict Spark's own logging to errors.
spark.sparkContext.setLogLevel("ERROR")
print("✓ Spark démarré")

In [None]:
from pathlib import Path

# Input files are resolved relative to the current working directory, which
# must therefore be the folder that contains `src/`.
ROOT = Path().resolve()
DATA_DIR = ROOT / "src" / "data"

train_path = DATA_DIR / "train.csv"
test_path  = DATA_DIR / "test.csv"

# Fail early with an explicit exception: unlike `assert`, this check is not
# stripped when Python runs with -O.
for csv_path in (train_path, test_path):
    if not csv_path.exists():
        raise FileNotFoundError(f"Fichier introuvable: {csv_path}")

# The first line of each file is the header; column types are inferred by Spark.
train_df = spark.read.csv(str(train_path), header=True, inferSchema=True)
test_df  = spark.read.csv(str(test_path), header=True, inferSchema=True)

# Quick sanity report: column names and row counts of both splits.
print("train cols:", train_df.columns)
print("test  cols:", test_df.columns)
print("train rows:", train_df.count())
print("test  rows:", test_df.count())

In [None]:
from pyspark.sql import functions as F

def clean_text(df, text_col="text"):
    """Return ``df`` with ``text_col`` replaced by a normalised version of itself.

    The text is lower-cased, then URLs, every character that is not an ASCII
    letter, a digit or whitespace, and runs of whitespace are each replaced by
    a single space; finally leading/trailing spaces are trimmed.

    Note that the second pattern also replaces accented and other non-ASCII
    letters with a space.

    Args:
        df: Spark DataFrame containing the column ``text_col``.
        text_col: Name of the column to clean (overwritten in place).

    Returns:
        A new DataFrame with the same columns, ``text_col`` cleaned.
    """
    # Applied in this order; each step works on the output of the previous one.
    substitutions = (
        (r"http\S+|www\.\S+", " "),  # URLs
        (r"[^a-z0-9\s]", " "),       # punctuation / symbols
        (r"\s+", " "),               # repeated whitespace
    )

    cleaned = F.lower(F.col(text_col))
    for pattern, replacement in substitutions:
        cleaned = F.regexp_replace(cleaned, pattern, replacement)

    return df.withColumn(text_col, F.trim(cleaned))

# Both splits must expose a "text" column; stop at the first one that does not.
for name, df_ in (("train", train_df), ("test", test_df)):
    if "text" in df_.columns:
        continue
    raise ValueError(f"Colonne 'text' absente dans {name}. Colonnes: {df_.columns}")

# Apply the same cleaning to train and test.
train_clean_df, test_clean_df = (
    clean_text(frame, "text") for frame in (train_df, test_df)
)

# Preview the first cleaned rows (each value cut at 80 characters).
train_clean_df.select("text").show(3, truncate=80)

In [None]:
import shutil
import glob

def write_single_csv(df, out_csv_path: Path):
    """Write ``df`` as one CSV file (with header) at ``out_csv_path``.

    Spark always writes a directory of ``part-*`` files. The DataFrame is
    therefore coalesced to a single partition, written to a staging directory
    next to the target, and the single ``part-*.csv`` file is then moved onto
    ``out_csv_path``.

    An existing file at ``out_csv_path`` is only replaced once the Spark write
    has succeeded, and the staging directory is removed whether or not the
    write succeeds.

    Args:
        df: Spark DataFrame to write.
        out_csv_path: Destination file path (``str`` or ``Path``).

    Returns:
        ``out_csv_path`` as a ``Path``.

    Raises:
        RuntimeError: If Spark produced no ``part-*.csv`` file.
    """
    out_csv_path = Path(out_csv_path)
    # Dedicated staging name (e.g. "train_clean.csv.spark_tmp") so that an
    # unrelated sibling directory such as "train_clean/" is never deleted.
    tmp_dir = out_csv_path.with_name(out_csv_path.name + ".spark_tmp")
    if tmp_dir.exists():
        # Leftover from a previous interrupted run.
        shutil.rmtree(tmp_dir)

    try:
        (
            df.coalesce(1)
              .write
              .mode("overwrite")
              .option("header", True)
              .csv(str(tmp_dir))
        )

        part_files = sorted(tmp_dir.glob("part-*.csv"))
        if not part_files:
            raise RuntimeError(f"Aucun part-*.csv trouvé dans {tmp_dir}")

        # Path.replace overwrites an existing destination file, so the old
        # output stays in place until the new one is ready.
        part_files[0].replace(out_csv_path)
    finally:
        # Always drop the staging directory (remaining Spark side files included).
        shutil.rmtree(tmp_dir, ignore_errors=True)

    return out_csv_path

# Write train first, then test, each as a single CSV file inside DATA_DIR.
out_train, out_test = [
    write_single_csv(frame, DATA_DIR / filename)
    for frame, filename in (
        (train_clean_df, "train_clean.csv"),
        (test_clean_df, "test_clean.csv"),
    )
]

# Report where each file ended up.
for written_path in (out_train, out_test):
    print("✓ écrit:", written_path)

## Optionnel : featurisation (même pipeline appliqué à train + test)
Si tu n’en as pas besoin pour ton rendu, tu peux ignorer cette section.

In [None]:
# Stage 1: split the cleaned "text" column into a list of tokens.
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")

# Stage 2: remove stop words from the tokens (no custom list is given here).
stopwords = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")

# Stage 3: build bigrams (n=2) from the remaining tokens.
ngram = NGram(n=2, inputCol="filtered_tokens", outputCol="ngrams")

# Stage 4: hash the bigrams into a vector of 2**18 (262,144) buckets.
hashing_tf = HashingTF(inputCol="ngrams", outputCol="features", numFeatures=1 << 18)

# Stages run in the order listed; each one reads the previous stage's output column.
pipeline = Pipeline(stages=[tokenizer, stopwords, ngram, hashing_tf])


In [None]:
# Fit on the training split only, then apply the fitted model to both splits
# so that train and test go through exactly the same transformations.
# NOTE(review): presumably rows whose `text` is null would make Tokenizer fail
# when the data is actually computed — confirm, and drop/fill nulls if needed.
print("Entraînement du pipeline sur train...")
model = pipeline.fit(train_clean_df)

print("Transformation train/test...")
train_feat, test_feat = [
    model.transform(frame) for frame in (train_clean_df, test_clean_df)
]

# Show the first two feature vectors in full (no truncation).
train_feat.select("features").show(2, truncate=False)

In [None]:
# Stop the Spark session started earlier in this notebook.
spark.stop()
print("✓ Spark arrêté")