<a href="https://colab.research.google.com/github/haduong777/Netflix_Rec/blob/main/Preprocessing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [18]:
import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import os
import boto3
from google.colab import drive
from tqdm.notebook import tqdm

In [60]:
# Helper functions: parse_probe, process_batch, process_parquet and create_test_df
def parse_probe(path):
  """
  Read the Netflix Prize probe file into a placeholder dict.

  The file is a sequence of "<movie_id>:" header lines, each followed by one
  user_id per line. Blank lines are ignored, as are user lines that appear
  before the first header.

  Args
    path: path to probe.txt

  Returns
    dict mapping (movie_id, user_id) -> None; the None values are placeholders
    for ground-truth ratings filled in later
  """
  pairs = {}
  movie = None

  with open(path, 'r') as fh:
    for raw in fh:
      token = raw.strip()
      if not token:
        continue
      if token.endswith(':'):
        # header line: every following user id belongs to this movie
        movie = int(token[:-1])
        continue
      if movie is None:
        continue
      pairs[(movie, int(token))] = None

  return pairs

def process_batch(batch, probe_pairs, output, append=False):
  """
  Process batch of data. Updating probe_pairs

  Args
    batch: df batch to process
    probe_pairs: (movie_id, user_id) pairs
    output: path to save processed data
    append: whether to append to existing file

  Returns
    tuple: (train_count, probe_count)
  """

  # probe key now matches up to a column in the df batch: 'temp_key'
  batch['temp_key'] = batch['movie_id'].astype(str) + '_' + batch['user_id'].astype(str)

  probe_keys = {f"{movie_id}_{user_id}" for movie_id, user_id in probe_pairs.keys()}

  # mask to separate probe and train data
  mask = batch['temp_key'].isin(probe_keys)

  probe_rows = batch[mask]
  probe_count = len(probe_rows)

  # assign rating to movie_id, user_id pairs
  if probe_count > 0:
    for _, row in probe_rows.iterrows():
      movie_id = row['movie_id']
      user_id = row['user_id']
      probe_pairs[(movie_id, user_id)] = row['rating']

  # filter out training rows using mask
  train_batch = batch[~mask].drop(columns=['temp_key'])
  train_count = len(train_batch)

  # saving training rows to parquet
  if train_count > 0:
    table = pa.Table.from_pandas(train_batch)

    if os.path.exists(output):

      # Append if pq file already exists
      with pq.ParquetWriter(output,
        table.schema,
        compression='zstd'
      ) as writer:
        writer.write_table(table)

    else:
      # otherwise make new with compression
      train_batch.to_parquet(output, compression='zstd', engine='pyarrow')

  return (train_count, probe_count)

def process_parquet(df, probe_pairs, output):
  """
  Process parquet file. Get grouth truth ratings for each probe pair
  Remove probe datapoints from training data

  Args
    df: path to Parquet file
    probe_pairs: (movie_id, user_id) pairs
    output: path to save processed data

  Returns
    tuple: (training_count, probe_count)
  """
  probe_count = 0
  train_count = 0

  #batch_size = 500000
  first_batch = True

  if isinstance(df, str):
    pq_file = pq.ParquetFile(df)
    total_row = pq_file.num_row_groups

    for group in tqdm(range(total_row)):
      #print(f'Processing row group {group+1}/{total_row}')

      batch = pq_file.read_row_group(group).to_pandas()

      batch_train_count, batch_probe_count = process_batch(batch, probe_pairs, output,
                                                           append=not first_batch)

      train_count += batch_train_count
      probe_count += batch_probe_count
      first_batch = False

  else:
    raise TypeError("df must be path to a Parquet file")

  return train_count, probe_count

def create_test_df(probe_pairs, output):
  """
  Create test dataframe from probe_pairs

  Args
    probe_pairs: dict with (movie_id, user_id) as key and rating as value
    output: path to save processed data

  Returns
    test_df: probe entries + rating
  """

  test_data = []
  none_count = 0

  for (movie_id, user_id), rating in probe_pairs.items():
    if rating is not None:
      test_data.append({'movie_id': movie_id,
                        'user_id': user_id,
                        'rating': rating})
    else:
      none_count += 1

  test_df = pd.DataFrame(test_data)
  print(f"Created test set with {len(test_df):,} entries")

  test_df.to_parquet(output, compression='zstd', engine='pyarrow')

  return none_count



In [61]:
def process_netflix(dfs, probe_path, save_dir):
  """
  Build the Netflix Prize train/test split on disk.

  Ground-truth ratings for the probe pairs listed in probe.txt are pulled out
  of each source file. All other ratings become training data.

  Args
    dfs: iterable of paths to Netflix rating Parquet files
    probe_path: path to probe.txt
    save_dir: directory to save processed data (created if missing)

  Returns
    train_files: list of paths, one training Parquet file per source
      (train_part_<i>.parquet) with probe entries removed
    test_path: path to test.parquet holding the probe entries + rating
  """

  os.makedirs(save_dir, exist_ok=True)

  # Parse probe into dictionary > faster lookup
  print("Parsing probe file")
  probe_dict = parse_probe(probe_path)
  print(f"Found {len(probe_dict)} probe pairs")

  # materialise so tqdm can show a total (and generators are accepted)
  sources = list(dfs)

  # Extract ratings from probe entries and remove from training set
  train_files = []

  total_train = 0
  total_matches = 0

  for i, df_source in tqdm(enumerate(sources), total=len(sources)):

    # construct save path -> one training file per source
    train_path = os.path.join(save_dir, f'train_part_{i}.parquet')
    train_files.append(train_path)

    part_train_count, part_probe_count = process_parquet(df_source, probe_dict, train_path)

    total_train += part_train_count
    total_matches += part_probe_count

    print(f"Source {i+1} stats:")
    print(f"  - Added {part_train_count:,} ratings to training set")
    print(f"  - Found {part_probe_count:,} probe ratings")

  # Create test set from probe dict; pairs never matched are left out of it
  test_path = os.path.join(save_dir, 'test.parquet')
  none_count = create_test_df(probe_dict, test_path)
  print(f"Probe pairs with no rating in the processed sources (excluded from test set): {none_count:,}")

  print("\nProcessing complete!")
  print(f"Total training ratings: {total_train:,}")
  print(f"Total probe ratings found: {total_matches:,} out of {len(probe_dict):,}")

  return train_files, test_path

In [62]:
# PROCESSING

drive.mount('/gdrive')

working_dir = '/gdrive/MyDrive/Netflix_Prize'
os.chdir(working_dir)

# The ratings are split across combined_data_1..4.parquet. Only the first
# shard is processed for now; set N_SOURCES = 4 to process all of them.
N_SOURCES = 1
dfs = [f'combined_data_{i}.parquet' for i in range(1, N_SOURCES + 1)]

probe_path = 'probe.txt'
save_dir = 'processed'

Drive already mounted at /gdrive; to attempt to forcibly remount, call drive.mount("/gdrive", force_remount=True).


In [64]:
train_files, test_file = process_netflix(dfs, probe_path, save_dir)

# "\n" (not the invalid escape "\C") puts a blank line before the summary
print("\nComplete")
print(f"Train files: {train_files}")
print(f"Test file: {test_file}")

Parsing probe file
Found 1408395 probe pairs


0it [00:00, ?it/s]

  0%|          | 0/23 [00:00<?, ?it/s]

Source 1 stats:
  - Added 23,476,551 ratings to training set
  - Found 577,213 probe ratings
Created test set with 577,213 entries
Created test set with 831,182 missing ratings

Processing complete!
Total training ratings: 23,476,551
Total probe ratings found: 577,213 out of 1,408,395
\Complete
Train files: ['processed/train_part_0.parquet']
Test file: processed/test.parquet
