# Datasets Pre-Processing
### Importing Libraries

In [34]:
""" Preprocessing of the Amazon Reviews dataset """
import os
import concurrent.futures
import pandas as pd 
import polars as pl # pylint: disable=E0401
from tqdm import tqdm
from datasets import load_dataset # pylint: disable=E0401

# Downloading Datasets and saving them locally
This section is from older versions of the code. Once I had the originals I started using the next version.
In those older versions there is code for 

In [35]:
# Control variables for code execution
DOWNLOAD_ORIGINAL_DATASETS = False
SAVE_LOCAL_DATASETS = False

if DOWNLOAD_ORIGINAL_DATASETS:
    # Get all available configurations (sub-categories)
    categories = [
        "raw_review_All_Beauty",
        "raw_review_Electronics",
        "raw_review_Office_Products",
        "raw_meta_All_Beauty",
        "raw_meta_Electronics",
        "raw_meta_Office_Products",
    ]

    # Initialize an empty list to hold datasets
    datasets_list = []

    # Loop over each configuration and download the dataset
    for category in categories:
        dataset = load_dataset("McAuley-Lab/Amazon-Reviews-2023", category)
        datasets_list.append(dataset)

if SAVE_LOCAL_DATASETS:
    # Save each dataset locally
    for ds, category in zip(datasets_list, categories):
        ds.save_to_disk(f"./{category}_dataset")

### Loading all individual Dataframes from local CSVs
Executed in parallel to reduce load time

In [36]:
# Parallel loading of pre-saved dataframes
file_paths = [
    "./dataframes/small_df.csv",
    "./dataframes/beauty_df.csv",
    "./dataframes/electronics_df.csv",
    "./dataframes/office_df.csv",
]


def load_csv(path: str) -> pl.DataFrame:
    """
    Loads a csv file into a polars dataframe

    Args:
        path (str): Path to the csv file

    Returns:
        pl.DataFrame
    """
    return pl.read_csv(path)


with concurrent.futures.ThreadPoolExecutor() as executor:
    dataframes = list(executor.map(load_csv, file_paths))

# Pre-Processing
### Defining Functions for Removing short Reviews, Cleaning nulls and Duplicates ans Slicing dataframes

In [37]:
def rmv_short_long(
    dataframe: pl.DataFrame, min_word_count: int = 25, max_word_count: int = 512
) -> pl.DataFrame:
    """
    Remove rows from a polars dataframe where the text of the 'text' column
    is shorter than 'min_word_count' or longer than 'max_word_count'

    Args:
        dataframe (pl.DataFrame)
        min_word_count (int)
        max_word_count (int)

    Returns:
        pl.DataFrame
    """
    # Filter the dataframe
    word_count_filtered_df = dataframe.filter(
        (pl.col("text").str.split(" ").list.len() >= min_word_count)
        & (pl.col("text").str.split(" ").list.len() <= max_word_count)
    )
    
    # Calculate number and percentage of rows removed
    rows_removed = dataframe.height - word_count_filtered_df.height
    percentage_removed = (rows_removed / dataframe.height) * 100

    # Print the results
    print(f"Rows removed: {rows_removed} ({percentage_removed:.2f}% of the total)")

    return word_count_filtered_df

# Remove Duplicates and Nulls.
# Lazy is used for more efficient execution of the multiple operations in the dataframes
def clean_data(dataframe: pl.DataFrame) -> pl.DataFrame:
    """
    Removes duplicates, None values, and rows where 'rating' is 0 from a Polars dataframe.

    Args:
        dataframe (pl.DataFrame)

    Returns:
        pl.DataFrame
    """
    df_cleaned = (
        dataframe.lazy()
        .unique(subset=["text"])  # Remove duplicates based on 'text' column
        .drop_nulls()  # Drop rows with None values
        .filter(pl.col("rating") != 0)  # Filter out rows where 'rating' is 0
    )
    return df_cleaned.collect()

# Split into n parts using slicing
# Used in previous version sfor processing large datasets
def slice_pl_df(dataframe: pl.DataFrame, slices: int) -> list[pl.DataFrame]:
    """
    Slices a polars dataframes into 'slices' parts

    Args:
        dataframe (pl.DataFrame)
        slices (int): Number of slices or parts
    Returns:
        list[pl.DataFrame]:
    """
    slice_size = dataframe.shape[0] // slices
    dfs = []

    for df_slice in tqdm(range(slices)):
        start_idx = df_slice * slice_size
        if df_slice == slices - 1:  # Ensure the last slice takes all remaining rows
            dfs.append(dataframe[start_idx:])
        else:
            end_idx = (df_slice + 1) * slice_size
            dfs.append(dataframe[start_idx:end_idx])
    return dfs

# Create the final dataset
The individual datasets are processed and then combined into one by sampling with an equal distribution of review ratings and individual dataframes

In [38]:
# Constants
FINAL_DF_ROWS = 250_000
LARGE_SAMPLE_SIZE = 1_000_000

selected_columns = [
    "title",
    "text",
    "rating",
    "id",
    "parent_asin",
    "name",
    "categories",
]
groups = ["Negative", "Neutral", "Positive"]
rating_to_group = {
    1.0: "Negative",
    2.0: "Negative",
    3.0: "Neutral",
    4.0: "Positive",
    5.0: "Positive",
}

# Drop unwanted columns and null values
dataframes = [df[selected_columns] for df in dataframes]

small_dataframes = []
for df in tqdm(dataframes, desc="Sampling and grouping dataframes"):
    # Sample the dataframe if its size exceeds LARGE_SAMPLE_SIZE
    if df.height > LARGE_SAMPLE_SIZE:
        df = df.sample(LARGE_SAMPLE_SIZE)
    
    # Add a new 'group' column by mapping the 'rating' column
    df = df.with_columns(
        group=pl.col("rating").replace_strict(rating_to_group)
    )
    
    small_dataframes.append(df)

# Slice large dataframes into smaller chunks to improve memory efficiency
# Not used in this version because we sample instead.
""" for df in tqdm(dataframes, desc="Slicing dataframes"):
    if df.height > LARGE_SAMPLE_SIZE:
        df = slice_pl_df(df, 10) """

# Clean and preprocess the dataframes.
for df in tqdm(small_dataframes, desc="Cleaning dataframes"):
    if isinstance(df, list):
        cleaned_dfs = []
        for sub_df in df:
            sub_df = clean_data(sub_df)
            sub_df = rmv_short_long(sub_df)
            cleaned_dfs.append(sub_df)
        df = pl.concat(cleaned_dfs)
    else:
        df = clean_data(df)
        df = rmv_short_long(df)

# Determine total required rows per group
all_group_counts = (
    pl.concat([df.select("group") for df in small_dataframes])
    .group_by("group")
    .len()
)

rows_per_group = FINAL_DF_ROWS // all_group_counts.height

# Initialize a list to collect samples
final_samples = []

# Allocate and sample data for each group
for group in tqdm(groups, desc="Sampling groups"):
    # Calculate the total available rows for the group across all datasets
    total_available = sum(
        df.filter(pl.col("group") == group).height for df in small_dataframes
    )
    required_rows_adjusted = min(rows_per_group, total_available)

    # Ideally, each dataset contributes equally
    per_dataset_required = required_rows_adjusted // len(small_dataframes)

    # Sample data from each dataset for the group
    for df in small_dataframes:
        df_group = df.filter(pl.col("group") == group)
        available_rows = df_group.height
        sample_size = min(per_dataset_required, available_rows)
        if sample_size > 0:
            sample_df = df_group.sample(n=sample_size)
            final_samples.append(sample_df)

# Concatenate all sampled data into the final DataFrame
final_df = pl.concat(final_samples)

# Verify the final DataFrame
print(f"\nFinal DataFrame shape: {final_df.shape}")
print("\nGroup distribution:")
print(final_df.group_by("group").len())

final_df = final_df.drop("group")

Sampling and grouping dataframes: 100%|██████████| 4/4 [00:00<00:00,  9.23it/s]
Cleaning dataframes:   0%|          | 0/4 [00:00<?, ?it/s]

Rows removed: 5 (0.07% of the total)


Cleaning dataframes:  50%|█████     | 2/4 [00:00<00:00,  2.59it/s]

Rows removed: 505 (0.24% of the total)


Cleaning dataframes:  75%|███████▌  | 3/4 [00:19<00:08,  8.07s/it]

Rows removed: 55932 (1.86% of the total)


Cleaning dataframes: 100%|██████████| 4/4 [00:29<00:00,  7.33s/it]


Rows removed: 11314 (0.38% of the total)


Sampling groups: 100%|██████████| 3/3 [00:00<00:00,  9.37it/s]


Final DataFrame shape: (187494, 8)

Group distribution:
shape: (3, 2)
┌──────────┬───────┐
│ group    ┆ len   │
│ ---      ┆ ---   │
│ str      ┆ u32   │
╞══════════╪═══════╡
│ Neutral  ┆ 62498 │
│ Negative ┆ 62498 │
│ Positive ┆ 62498 │
└──────────┴───────┘





# Save as a CSV file

In [8]:
# Save to CSV atomically
final_df.write_csv("final_dataset_200k.csv.tmp")
os.replace("final_dataset_200k.csv.tmp", "final_dataset_200k.csv")