# In [3]:
import polars as pl
import os
from datetime import datetime, timezone
import re
from helper_functions import clean_parquet_files

def preprocess_review(review):
    """Normalise raw review text.

    Removes every non-ASCII character (note: this also drops accented
    European letters, emoji, CJK text, etc.), collapses each run of line
    breaks into a single space so words on adjacent lines are not glued
    together, and trims surrounding whitespace.

    Args:
        review: Raw review text. ``None`` is treated as an empty review.

    Returns:
        The cleaned text; ``""`` when ``review`` is ``None``.
    """
    if review is None:
        return ""
    # [^\x00-\x7F] matches any code point outside 7-bit ASCII.
    review = re.sub(r'[^\x00-\x7F]', '', review)
    # Replace line breaks with a space instead of deleting them, otherwise
    # "line one\nline two" would become "line oneline two".
    review = re.sub(r'[\r\n]+', ' ', review)
    # Strip so a leading/trailing newline does not leave a stray space and
    # whitespace padding does not count towards the caller's length check.
    return review.strip()


def _utc_datetime(timestamp):
    """Convert a Unix timestamp (seconds) to an aware UTC datetime; pass ``None`` through."""
    if timestamp is None:
        return None
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)


def preprocess_gamereviews(gamereviews):
    """Clean and flatten a single review record.

    Args:
        gamereviews: One review as a mapping of column name to value, e.g. a
            row yielded by ``DataFrame.iter_rows(named=True)``. An optional
            nested ``author`` mapping supplies the ``user_*`` fields.

    Returns:
        A new flat dict suitable as a DataFrame row, or ``None`` when the
        cleaned review text is shorter than 20 characters. The input mapping
        (and its nested ``author`` dict) is not modified.
    """
    # Shallow copy: the rows handed in are plain dicts (they have no
    # ``to_dict`` method), and we must not mutate the caller's data.
    record = dict(gamereviews)

    # `or ""` keeps a missing/None review from crashing the text cleaner;
    # such rows are then dropped by the length check below.
    record["review"] = preprocess_review(record.get("review") or "")
    if len(record["review"]) < 20:
        return None

    record["timestamp_created"] = _utc_datetime(record["timestamp_created"])
    record["timestamp_updated"] = _utc_datetime(record["timestamp_updated"])

    # Flatten the nested author struct into top-level user_* columns. Missing
    # author data yields None values so every row keeps the same columns.
    author = record.pop("author", None) or {}
    record["user_steamid"] = author.get("steamid")
    record["user_num_games_owned"] = author.get("num_games_owned")
    record["user_num_reviews"] = author.get("num_reviews")
    record["user_playtime_forever"] = author.get("playtime_forever")
    record["user_playtime_at_review"] = author.get("playtime_at_review")
    record["user_last_played"] = _utc_datetime(author.get("last_played"))

    # Drop fields that are not needed downstream.
    for unused in ("language", "hidden_in_steam_china", "steam_china_location"):
        record.pop(unused, None)

    # Normalise flag and score types.
    for flag in (
        "voted_up",
        "steam_purchase",
        "received_for_free",
        "written_during_early_access",
    ):
        record[flag] = bool(record[flag])
    score = record["weighted_vote_score"]
    record["weighted_vote_score"] = None if score is None else float(score)

    return record

def preprocess_parquet_file(filepath, output_filepath, appid):
    """Preprocess one raw reviews parquet file and write the cleaned result.

    Every row is passed through ``preprocess_gamereviews``; rows it rejects
    (returns ``None``) are dropped. An ``appid`` column holding ``appid`` is
    appended, exact duplicate rows are removed, and the result is written to
    ``output_filepath`` (its directory is created if needed).

    Nothing is written when no rows survive, so a caller that skips files
    whose output already exists will retry such inputs on every run.

    Args:
        filepath: Path of the raw parquet file to read.
        output_filepath: Destination path for the preprocessed parquet file.
        appid: Value stored in the added ``appid`` column.
    """
    df = pl.read_parquet(filepath)

    # Keep only rows that survived preprocessing (short reviews yield None).
    preprocessed_data = [
        processed
        for processed in map(preprocess_gamereviews, df.iter_rows(named=True))
        if processed is not None
    ]
    if not preprocessed_data:
        return

    # Infer the schema from all rows rather than only the first ones, so a
    # column that happens to be None at the start still gets the right dtype.
    preprocessed_df = pl.from_dicts(preprocessed_data, infer_schema_length=None)

    # `with_column` was removed from polars; `with_columns` is the supported API.
    preprocessed_df = preprocessed_df.with_columns([pl.lit(appid).alias("appid")])

    # Drop exact duplicate rows; keep the original order so output is stable.
    preprocessed_df = preprocessed_df.unique(maintain_order=True)

    output_dir = os.path.dirname(output_filepath)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    preprocessed_df.write_parquet(output_filepath)

input_folder = 'data/parquets'
output_folder = 'data/parquets_preprocessed'

# Raw files are named "<appid>_reviews_<n>.parquet". fullmatch (not match,
# which only anchors at the start) rejects look-alikes such as
# "123_reviews_1.parquet.bak".
REVIEW_FILE_PATTERN = re.compile(r"(\d+)_reviews_\d+\.parquet")

clean_parquet_files(input_folder)

# write_parquet fails when the destination directory does not exist.
os.makedirs(output_folder, exist_ok=True)

# sorted(): os.listdir order is arbitrary; process files reproducibly.
for file_name in sorted(os.listdir(input_folder)):
    match = REVIEW_FILE_PATTERN.fullmatch(file_name)
    if match is None:
        continue

    appid = match.group(1)
    input_filepath = os.path.join(input_folder, file_name)
    stem, _ = os.path.splitext(file_name)
    output_filepath = os.path.join(output_folder, f"{stem}_preprocessed.parquet")

    # Skip inputs already processed on a previous run.
    if os.path.exists(output_filepath):
        continue
    preprocess_parquet_file(input_filepath, output_filepath, appid)
