# Preprocessing pipeline for handling data anomalies

Before using any dataset, it is necessary to clean the data to obtain a clear, formatted, and complete dataset. This tedious step is the first step in data analysis.

In this notebook, there are building blocks (functions) that can be used later to perform the identified preprocessing steps.

- Remove missing values
- Remove duplicates
- Remove special characters
- Convert numbers to words
- Identify the language of the review and translate it if necessary
- Correct spelling errors

In [None]:
import polars as pl
import re
from num2words import num2words
import langid
import concurrent
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
from deep_translator import GoogleTranslator
from spellchecker import SpellChecker
from symspellpy.symspellpy import SymSpell, Verbosity
import os

In [None]:
# Global variables

# Number of worker threads handed by preprocess_pipeline to the parallel steps
# (language detection, translation and spell-checking).
NUM_THREAD = 12

## Remove missing values

In [None]:
def clean_missing_values(df: pl.DataFrame, column_name: str) -> pl.DataFrame:
    """
    Discard every row whose value in ``column_name`` is null.

    The input DataFrame is left untouched; a new DataFrame is returned.

    Args:
        df (pl.DataFrame): DataFrame to filter.
        column_name (str): Column inspected for null values.

    Returns:
        pl.DataFrame: DataFrame containing only the rows where
        ``column_name`` is not null.
    """
    # Only the requested column is inspected; nulls elsewhere are kept.
    return df.drop_nulls(subset=[column_name])

## Remove duplicates

In [None]:
def remove_duplicates(df: pl.DataFrame, subset_columns: list) -> pl.DataFrame:
    """
    Remove duplicate rows from a DataFrame based on specified columns,
    and return the cleaned DataFrame.

    The first occurrence of each duplicated key is kept and the original
    row order is preserved, so the result is reproducible from run to run.

    Args:
        df (pl.DataFrame): The DataFrame to clean.
        subset_columns (list): Columns used to decide whether two rows are
            duplicates. A single column name given as a plain string is
            also accepted and treated as a one-element list.

    Returns:
        pl.DataFrame: DataFrame with duplicates removed.
    """
    # Accept a bare column name as well as a list of names.
    if isinstance(subset_columns, str):
        columns = [subset_columns]
    else:
        columns = list(subset_columns)

    # Polars defaults (keep="any", maintain_order=False) give no guarantee on
    # which duplicate survives nor on the output order; pin both explicitly.
    return df.unique(subset=columns, keep="first", maintain_order=True)

## Remove special characters

In [None]:
def remove_special_characters(df: pl.DataFrame, column_name: str, keep: str = "") -> pl.DataFrame:
    """
    Strip special characters from a text column with a regular expression.

    A character is removed unless it is a word character (letter, digit or
    underscore), a whitespace character, or one of the characters in ``keep``.

    Args:
        df (pl.DataFrame): Input Polars DataFrame.
        column_name (str): Name of the text column to clean.
        keep (str): Extra characters to preserve (e.g. ".," keeps dots and
            commas). Defaults to an empty string.

    Returns:
        pl.DataFrame: New DataFrame in which ``column_name`` holds the cleaned text.
    """
    # Negated character class; `keep` is escaped so that characters such as
    # "]" or "-" are taken literally inside the class.
    special_chars = re.compile(rf"[^\w\s{re.escape(keep)}]")

    def _strip(value: str) -> str:
        # Only non-blank strings are cleaned; anything else passes through as is.
        if isinstance(value, str) and value.strip():
            return special_chars.sub("", value)
        return value

    cleaned_column = pl.col(column_name).map_elements(_strip).alias(column_name)
    return df.with_columns(cleaned_column)

## Convert numbers to words

In [None]:
def numbers_to_words(df: pl.DataFrame, column_name: str, lang: str = "en") -> pl.DataFrame:
    """
    Convert all numbers in a text column into words using num2words.

    Only standalone runs of digits (delimited by word boundaries) are
    replaced; each run is converted as an integer.

    Args:
        df (pl.DataFrame): Input DataFrame.
        column_name (str): Name of the text column to process.
        lang (str): Language code forwarded to num2words (e.g., 'en' or 'fr').
            Defaults to 'en', which is also num2words' own default.

    Returns:
        pl.DataFrame: New DataFrame with numbers replaced by words.
    """
    # Compiled once instead of being looked up for every row.
    number_pattern = re.compile(r"\b\d+\b")

    def spell_out(match) -> str:
        # Each digit run is parsed as an int and spelled out in `lang`.
        return num2words(int(match.group()), lang=lang)

    def convert_numbers(text: str) -> str:
        # Blank or non-string values are returned unchanged.
        if not isinstance(text, str) or not text.strip():
            return text
        return number_pattern.sub(spell_out, text)

    return df.with_columns(
        pl.col(column_name).map_elements(convert_numbers).alias(column_name)
    )

## Languages and translation

### Language detection

In [None]:
def detect_language_parallel(df: pl.DataFrame, column_name: str, num_threads: int = 4) -> pl.DataFrame:
    """
    Add a 'detected_lang' column holding the langid language code of each text.

    The texts of ``column_name`` are classified through a thread pool; results
    are collected in the same order as the input rows.

    Args:
        df (pl.DataFrame): Input DataFrame.
        column_name (str): Name of the text column to classify.
        num_threads (int): Size of the thread pool (default=4).

    Returns:
        pl.DataFrame: New DataFrame with the extra column 'detected_lang'.
        Rows whose text is blank or not a string get a null language.
    """
    def detect_lang(value: str) -> str:
        """Classify one text with langid; blank/non-string input yields None."""
        if isinstance(value, str) and value.strip():
            code, _ = langid.classify(value)
            return code
        return None

    # langid works on plain Python strings, so pull the column out of Polars.
    texts = df[column_name].to_list()

    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        # executor.map yields results in submission order, matching row order.
        progress = tqdm(
            executor.map(detect_lang, texts),
            total=len(texts),
            desc="Language detection",
        )
        all_langs = list(progress)

    return df.with_columns(pl.Series("detected_lang", all_langs))

### Translation into English

In [None]:
def translate_non_english_threadsafe(df: pl.DataFrame,
                                     column_name: str,
                                     detected_lang_col: str = "detected_lang",
                                     num_threads: int = 4) -> pl.DataFrame:
    """
    Translate to English every text whose detected language is not 'en'.

    Translations run in a thread pool. A new GoogleTranslator is created for
    each text, so no translator object is shared between threads.

    Args:
        df (pl.DataFrame): Input DataFrame.
        column_name (str): Name of the text column to translate.
        detected_lang_col (str): Column holding the detected language codes
            (default="detected_lang").
        num_threads (int): Size of the thread pool (default=4).

    Returns:
        pl.DataFrame: New DataFrame in which ``column_name`` holds the
        translated text for non-English rows. Rows marked 'en' are unchanged.
        When a translation raises, the text is replaced by "[ERROR: <message>]".
    """
    def translate_one(value: str) -> str:
        # Blank or non-string values are returned untouched.
        if isinstance(value, str) and value.strip():
            try:
                # Local translator instance: nothing is shared across threads.
                return GoogleTranslator(source='auto', target='en').translate(value)
            except Exception as e:
                return f"[ERROR: {e}]"
        return value

    source_texts = df[column_name].to_list()
    languages = df[detected_lang_col].to_list()

    # Positions (and texts) of the rows that are not flagged as English.
    pending = [
        (position, text)
        for position, (text, lang) in enumerate(zip(source_texts, languages))
        if lang != 'en'
    ]
    indices_to_translate = [position for position, _ in pending]
    texts_to_translate = [text for _, text in pending]

    # Start from a copy of the original texts and overwrite translated rows.
    translated_texts = list(source_texts)

    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        progress = tqdm(
            executor.map(translate_one, texts_to_translate),
            total=len(texts_to_translate),
            desc="Translating non-English (thread-safe)",
        )
        for position, translation in zip(indices_to_translate, progress):
            translated_texts[position] = translation

    return df.with_columns(pl.Series(column_name, translated_texts))

## Spelling correction

In [None]:
def correct_spelling_symspell(
    df: pl.DataFrame,
    column_name: str,
    batch_size: int = 10000,
    n_threads: int = 4,
    max_edit_distance: int = 2,
    dictionary_path: str = "frequency_dictionary_en_82_765.txt"
) -> pl.DataFrame:
    """
    Spell-check a text column word by word with SymSpell.

    The DataFrame is cut into batches of ``batch_size`` rows that are handed
    to a thread pool. Each text is split on whitespace, every word is replaced
    by SymSpell's closest suggestion (or kept when there is none), and the
    words are joined back with single spaces.

    Args:
        df: Polars DataFrame
        column_name: Name of the text column
        batch_size: Number of rows per batch
        n_threads: Number of parallel threads
        max_edit_distance: Max edit distance for corrections
        dictionary_path: Path to SymSpell frequency dictionary

    Returns:
        Polars DataFrame with corrected text

    Raises:
        FileNotFoundError: If ``dictionary_path`` does not exist.
    """

    # --- Build the SymSpell index from the frequency dictionary ---
    sym_spell = SymSpell(max_dictionary_edit_distance=max_edit_distance, prefix_length=7)
    if not os.path.exists(dictionary_path):
        raise FileNotFoundError(f"Dictionary not found: {dictionary_path}")
    sym_spell.load_dictionary(dictionary_path, term_index=0, count_index=1)

    def closest_term(word: str) -> str:
        # First suggestion returned by SymSpell, or the word itself if none.
        candidates = sym_spell.lookup(word, Verbosity.CLOSEST, max_edit_distance)
        return candidates[0].term if candidates else word

    def fix_text(value: str) -> str:
        # Blank or non-string values are returned unchanged.
        if isinstance(value, str) and value.strip():
            return " ".join([closest_term(word) for word in value.split()])
        return value

    def fix_batch(batch: pl.DataFrame) -> list:
        return [fix_text(value) for value in batch[column_name]]

    # --- Consecutive slices of at most batch_size rows ---
    batches = [df.slice(offset, batch_size) for offset in range(0, len(df), batch_size)]

    # --- One batch per task; map() returns batches in their original order ---
    with concurrent.futures.ThreadPoolExecutor(max_workers=n_threads) as executor:
        progress = tqdm(
            executor.map(fix_batch, batches),
            total=len(batches),
            desc="Spell-checking with SymSpell"
        )
        corrected_reviews = [text for batch_result in progress for text in batch_result]

    # --- Swap the original column for the corrected one ---
    return df.with_columns(
        pl.Series(name=column_name, values=corrected_reviews)
    )

## Main

This script automatically runs the entire pipeline on a .csv file and saves the result.

In [None]:
def preprocess_pipeline(
    input_csv: str,
    column_name: str,
    output_csv: str,
    dictionary_path: str = "../data/original/frequency_dictionary_en_82_765.txt",
) -> None:
    """
    Apply the full preprocessing pipeline to the given CSV file.

    Steps, in order, all applied to ``column_name``:
    drop rows with missing values, drop duplicates, convert numbers to words,
    remove special characters, detect the language, translate non-English
    texts to English, and correct spelling. The result is written to
    ``output_csv``.

    Args:
        input_csv (str): Path of the CSV file to read.
        column_name (str): Name of the text column to preprocess.
        output_csv (str): Path of the CSV file to write.
        dictionary_path (str): Path to the SymSpell frequency dictionary used
            by the spelling-correction step. Defaults to the location
            previously hard-coded in this function.
    """
    df = pl.read_csv(input_csv)
    df = clean_missing_values(df, column_name)
    # remove_duplicates is declared to take a list of columns.
    df = remove_duplicates(df, [column_name])
    df = numbers_to_words(df, column_name)
    df = remove_special_characters(df, column_name)
    # The parallel steps all use the module-level NUM_THREAD setting.
    df = detect_language_parallel(df, column_name, NUM_THREAD)
    df = translate_non_english_threadsafe(df, column_name, "detected_lang", NUM_THREAD)
    df = correct_spelling_symspell(
        df,
        column_name=column_name,
        batch_size=10000,
        n_threads=NUM_THREAD,
        max_edit_distance=2,
        dictionary_path=dictionary_path,
    )
    df.write_csv(output_csv)

In [None]:
if __name__ == "__main__":
    # Run the whole pipeline on the positive reviews of the Booking
    # validation file and store the cleaned result.
    preprocess_pipeline(
        input_csv="../data/original/Booking/val.csv",
        column_name="review_positive",
        output_csv="../data/processed/val_cleaned.csv",
    )