In [15]:
import polars as pl
import webcolors
from webcolors import hex_to_name
from PIL import ImageColor
import numpy as np
import pyarrow.csv as pv
import pyarrow.parquet as pq

In [16]:
# Lookup tables for nearest-color matching, built once when this cell runs:
#   CSS3_COLOR_RGB    - CSS3 color name -> value returned by webcolors.name_to_rgb
#   CSS3_COLOR_NAMES  - the names as a NumPy array
#   CSS3_COLOR_VALUES - the RGB values as a NumPy array; row i belongs to CSS3_COLOR_NAMES[i]
CSS3_COLOR_RGB = dict(
    (name, webcolors.name_to_rgb(name)) for name in webcolors.names("css3")
)
CSS3_COLOR_NAMES = np.array([*CSS3_COLOR_RGB])
CSS3_COLOR_VALUES = np.array([*CSS3_COLOR_RGB.values()])  # Shape: (n_colors, 3)

# Vectorized function to find the closest CSS3 colors for a list of RGB colors
def closest_color_vectorized(requested_colors):
    """
    Find the closest CSS3 color name for each requested RGB color.

    "Closest" means the smallest squared Euclidean distance between the
    requested (r, g, b) triple and the rows of CSS3_COLOR_VALUES. When several
    CSS3 colors are equally close, the one that appears first in
    CSS3_COLOR_NAMES is returned (np.argmin picks the first minimum).

    Parameters
    ----------
    requested_colors : sequence of (r, g, b) triples, or a single triple
        Colors to look up. An empty sequence is accepted.

    Returns
    -------
    numpy.ndarray
        One entry of CSS3_COLOR_NAMES per requested color; an empty array when
        no colors were requested.
    """
    requested_colors_np = np.asarray(requested_colors)

    # An empty list turns into a 1-D array, which the 3-D broadcast below
    # cannot index; answer it directly with an empty slice of the name table.
    if requested_colors_np.size == 0:
        return CSS3_COLOR_NAMES[:0]

    # Promote a lone (r, g, b) triple to a single-row batch: (3,) -> (1, 3).
    requested_colors_np = np.atleast_2d(requested_colors_np)

    # Broadcast to (n_samples, n_colors, 3), then reduce over the RGB axis.
    deltas = CSS3_COLOR_VALUES[None, :, :] - requested_colors_np[:, None, :]
    distances = np.sum(deltas ** 2, axis=2)

    closest_indices = np.argmin(distances, axis=1)  # Index of the nearest CSS3 color per input
    return CSS3_COLOR_NAMES[closest_indices]

# Optimized function to get the English name for a pixel color (vectorized)
def get_color_name_vectorized(hex_values):
    """
    Convert hex color strings to the names of their closest CSS3 colors.

    Each distinct hex string is parsed and matched only once; the result is
    then fanned back out to the original positions. When the input repeats
    values, this keeps the distance computation in closest_color_vectorized
    proportional to the number of distinct colors instead of the number of
    rows.

    Parameters
    ----------
    hex_values : iterable of str
        Color strings accepted by PIL's ImageColor.getcolor (e.g. "#FF4500").

    Returns
    -------
    numpy.ndarray
        One color name per input value, in input order; an empty array for
        empty input.
    """
    hex_list = list(hex_values)
    if not hex_list:
        return np.array([], dtype=str)

    # dict.fromkeys de-duplicates while keeping first-seen order.
    unique_hex = list(dict.fromkeys(hex_list))

    # Convert the distinct hex colors to RGB and name them in one call.
    unique_rgb = [ImageColor.getcolor(hex_value, "RGB") for hex_value in unique_hex]
    unique_names = closest_color_vectorized(unique_rgb)

    # Map every input value back to the position of its distinct color.
    position_of = {hex_value: idx for idx, hex_value in enumerate(unique_hex)}
    inverse = np.fromiter(
        (position_of[hex_value] for hex_value in hex_list),
        dtype=np.intp,
        count=len(hex_list),
    )
    return unique_names[inverse]

# User ID remapping function with global dictionary for consistent remapping.
# Both globals persist for the lifetime of the kernel, so an ID seen in an
# earlier batch keeps the same integer in later batches.
user_id_dict = {}      # original user ID -> assigned integer ID
user_id_counter = 0    # last integer ID handed out (IDs start at 1)

def map_user_id_vectorized(user_ids):
    """
    Map user IDs to incremental integer IDs.

    IDs are assigned in order of first appearance, starting at 1. The mapping
    is stored in the module-level ``user_id_dict`` and the running count in
    ``user_id_counter``; both are updated by this call.

    Parameters
    ----------
    user_ids : iterable of hashable
        Original user IDs.

    Returns
    -------
    numpy.ndarray
        int64 array with one mapped ID per input, in input order. The dtype is
        fixed explicitly so that an empty batch does not become float64 and the
        integer width does not depend on the platform default.
    """
    global user_id_counter
    mapped_ids = []
    for user_id in user_ids:
        mapped = user_id_dict.get(user_id)
        if mapped is None:
            # First time this user is seen: hand out the next integer.
            user_id_counter += 1
            mapped = user_id_dict[user_id] = user_id_counter
        mapped_ids.append(mapped)
    return np.array(mapped_ids, dtype=np.int64)


In [18]:
# Convert the canvas-history CSV to a Parquet file, streaming it block by block.
# For every block: parse the timestamp, replace pixel_color with a color name,
# replace user_id with an incremental integer, and split coordinate into x / y.

# Input and output file paths
csv_file = "./../2022_place_canvas_history.csv"
parquet_file = "./../2022_rplace.parquet"

# Configuration constants
DATESTRING_FORMAT = "%Y-%m-%d %H:%M:%S"  # NOTE: not referenced in this cell; the parse below uses its own format
BATCH_SIZE = 100_000_000  # Bytes of CSV text per read block (pyarrow's block_size is in bytes, not rows)

# Count data rows up front so progress can be shown as a percentage.
# The `with` block closes the handle again; binary mode avoids decoding the
# whole file just to count newlines.
with open(csv_file, "rb") as csv_handle:
    total_rows = sum(1 for _ in csv_handle) - 1  # Subtract 1 for the header row
processed_rows = 0  # Rows read so far (counted before filtering)

# Created on the first batch, because the schema is only known then
parquet_writer = None

# "x,y" split into a struct with field_0 / field_1; built once, used for both columns
coordinate_parts = pl.col("coordinate").str.split_exact(",", 1)

# Open CSV file in batches; the context manager closes the reader when done
with pv.open_csv(csv_file, read_options=pv.ReadOptions(block_size=BATCH_SIZE)) as csv_reader:
    try:
        for record_batch in csv_reader:
            batch_rows = record_batch.num_rows
            processed_rows += batch_rows

            # Convert Arrow record batch to Polars DataFrame
            df = pl.from_arrow(record_batch)

            # Process timestamp column: strip the trailing " UTC", then parse.
            # strict=False turns values that do not match the format into nulls
            # instead of raising.
            df = df.with_columns(
                pl.col("timestamp")
                .str.replace(r" UTC$", "")
                .str.strptime(
                    pl.Datetime, format="%Y-%m-%d %H:%M:%S%.f", strict=False
                )
                .alias("timestamp")
            )

            # Convert pixel_color column to English names (vectorized)
            pixel_colors = df["pixel_color"].to_list()
            color_names = get_color_name_vectorized(pixel_colors)
            df = df.with_columns(pl.Series(name="pixel_color_english", values=color_names))

            # Map user_id column to incremental IDs (vectorized)
            user_ids = df["user_id"].to_list()
            mapped_ids = map_user_id_vectorized(user_ids)
            df = df.with_columns(pl.Series(name="user_id_int", values=mapped_ids))

            # Drop the original pixel_color and user_id columns
            df = df.drop(["pixel_color", "user_id"])

            # Keep only rows whose coordinate has exactly one comma ("x,y"),
            # then split it into integer x and y columns.
            df = (
                df.filter(
                    pl.col("coordinate").str.count_matches(",") == 1
                )
                .with_columns(
                    coordinate_parts.struct.field("field_0").cast(pl.Int64).alias("x"),
                    coordinate_parts.struct.field("field_1").cast(pl.Int64).alias("y"),
                )
                .drop("coordinate")
            )

            # Convert Polars DataFrame to Arrow Table
            table = df.to_arrow()

            # Initialize ParquetWriter with schema if not already set
            if parquet_writer is None:
                parquet_writer = pq.ParquetWriter(
                    parquet_file, schema=table.schema, compression="zstd"
                )

            # Write the table to the Parquet file
            parquet_writer.write_table(table)

            # Print progress as percentage
            progress = (processed_rows / total_rows) * 100
            print(f"Progress: {progress:.2f}%")

    finally:
        # Close the Parquet writer even if a batch failed, so the file footer is written
        if parquet_writer is not None:
            parquet_writer.close()

print(f"Successfully converted {csv_file} to {parquet_file}")


Progress: 0.46%
Progress: 0.92%
Progress: 1.38%
Progress: 1.84%
Progress: 2.29%
Progress: 2.75%
Progress: 3.21%
Progress: 3.67%
Progress: 4.13%
Progress: 4.59%
Progress: 5.05%
Progress: 5.51%
Progress: 5.97%
Progress: 6.43%
Progress: 6.89%
Progress: 7.36%
Progress: 7.82%
Progress: 8.28%
Progress: 8.74%
Progress: 9.21%
Progress: 9.66%
Progress: 10.12%
Progress: 10.59%
Progress: 11.05%
Progress: 11.51%
Progress: 11.98%
Progress: 12.44%
Progress: 12.90%
Progress: 13.37%
Progress: 13.83%
Progress: 14.29%
Progress: 14.76%
Progress: 15.22%
Progress: 15.68%
Progress: 16.14%
Progress: 16.61%
Progress: 17.07%
Progress: 17.53%
Progress: 18.00%
Progress: 18.46%
Progress: 18.92%
Progress: 19.39%
Progress: 19.85%
Progress: 20.31%
Progress: 20.78%
Progress: 21.24%
Progress: 21.70%
Progress: 22.17%
Progress: 22.63%
Progress: 23.09%
Progress: 23.55%
Progress: 24.01%
Progress: 24.48%
Progress: 24.94%
Progress: 25.40%
Progress: 25.86%
Progress: 26.32%
Progress: 26.78%
Progress: 27.25%
Progress: 27.71%
P

In [21]:
# Path to the Parquet file
parquet_file = "./../2022_rplace.parquet"

# Load the entire Parquet file into memory (this is the full table, not a preview)
df = pl.read_parquet(parquet_file)

# Printing a Polars DataFrame shows its shape plus the first and last rows
print(df)

# Count the distinct remapped user IDs and report the total
unique_user_ids = df.get_column("user_id_int").n_unique()
print(f"Number of unique user IDs: {unique_user_ids}")

shape: (160_353_085, 5)
┌─────────────────────────┬─────────────────────┬─────────────┬──────┬──────┐
│ timestamp               ┆ pixel_color_english ┆ user_id_int ┆ x    ┆ y    │
│ ---                     ┆ ---                 ┆ ---         ┆ ---  ┆ ---  │
│ datetime[μs]            ┆ str                 ┆ i64         ┆ i64  ┆ i64  │
╞═════════════════════════╪═════════════════════╪═════════════╪══════╪══════╡
│ 2022-04-04 00:53:51.577 ┆ darkturquoise       ┆ 1           ┆ 826  ┆ 1048 │
│ 2022-04-04 00:53:53.758 ┆ lightskyblue        ┆ 2           ┆ 583  ┆ 1031 │
│ 2022-04-04 00:53:54.685 ┆ mediumslateblue     ┆ 3           ┆ 1873 ┆ 558  │
│ 2022-04-04 00:54:57.541 ┆ darkcyan            ┆ 4           ┆ 1627 ┆ 255  │
│ 2022-04-04 00:55:16.307 ┆ lightskyblue        ┆ 5           ┆ 49   ┆ 1478 │
│ …                       ┆ …                   ┆ …           ┆ …    ┆ …    │
│ 2022-04-05 00:14:00.066 ┆ white               ┆ 2119678     ┆ 408  ┆ 493  │
│ 2022-04-05 00:14:00.145 ┆ white       