# Finalize the reduction of the reviewer data

In [1]:
import pandas as pd
from tqdm import tqdm
import pickle
import multiprocessing

import numpy as np

# Turn off pandas' SettingWithCopyWarning (its default mode is 'warn'). Per the
# original note, this is needed because temporary column values are set on
# chunk slices.
pd.options.mode.chained_assignment = None

# One worker process per logical CPU core (the original run reported 16).
cpu_count = multiprocessing.cpu_count()
cpu_count  # echo the core count as the notebook cell's output

16

In [2]:
# Total row count (~183M) left after the initial map-reduce; used as the
# progress-bar total when streaming the non-reduced reviewer CSV.
num_rows = 183_095_823
# Number of CSV rows read per iteration.
chunksize = 1_000_000

In [3]:
# Load the set of unique reviewer IDs that an earlier step pickled to disk.
with open('./reviewer_ids', mode='rb') as reviewer_ids_file:
    reviewer_id_full_set = pickle.load(reviewer_ids_file)

In [4]:
# Holding every reviewer's totals in memory at once is the main bottleneck, so
# the reviewer IDs are divided into four near-equal groups ("quarters") and
# each group is consolidated in its own pass over the data.
reviewer_id_chunks = np.array_split([*reviewer_id_full_set], 4)

In [5]:
# Output schema: the reviewer ID followed by one count column per star rating.
columns = ['reviewerID', '1*', '2*', '3*', '4*', '5*']
# Create (or truncate) the output CSV so it contains only the header row; each
# reviewer quarter is appended to it later. Pass the list directly: wrapping it
# in another list ([columns]) builds a one-level MultiIndex instead of a plain
# column Index, unlike the per-group frames built later with columns=columns.
empty_df = pd.DataFrame(columns=columns)
empty_df.to_csv("./Intermediate_Datasets/consolidated_reviewer_data.csv", index=False)

The reduce step was by far the most computationally expensive part of the data processing. Because there are ~43M reviewers, it is difficult to keep all of them in memory while streaming through the mapped reviews and updating each reviewer's totals accordingly.

My workaround was to divide the reviewer IDs into splits (`reviewer_id_chunks`) and process only one split at a time. This way, only ~10M reviewer rows need to be kept in memory at any given point.

Multiprocessing was used to further decrease the run time: each core handled a different portion of the current `reviewer_id_chunk`, so each process only has to update a smaller DataFrame. I did this because I noticed that updating large pandas DataFrames is very slow.

This entire process took ~3 hours.

In [6]:
# both df's essentially represent reviewer_df's in the same form
# just need to use the chunk_df to update the persisted_df, according to reviewer_id
def worker(chunk_df, persisted_df):
    """Fold one chunk of per-reviewer rows into a running per-reviewer total.

    Both frames are expected to share one layout: a ``reviewerID`` column plus
    numeric count columns (presumably ``'1*'`` .. ``'5*'``, matching the
    notebook's ``columns`` list -- confirm against the non-reduced CSV).

    Parameters
    ----------
    chunk_df : pandas.DataFrame
        Rows from the current CSV chunk; a reviewer may appear several times.
    persisted_df : pandas.DataFrame
        Totals accumulated so far (one row per reviewer), or an empty frame on
        the first call.

    Returns
    -------
    pandas.DataFrame
        One row per ``reviewerID`` (sorted by ``reviewerID``) with the counts
        summed across both inputs.
    """
    # First collapse duplicate reviewers inside this chunk.
    chunk_df = chunk_df.groupby('reviewerID').sum(numeric_only=True).reset_index()

    # Avoid concatenating with an empty frame. The initial accumulator is an
    # empty object-dtype frame; concatenating with it can leave the running
    # totals as object dtype (slow sums), and newer pandas warns about empty
    # entries in concat. Both inputs are already reduced (unique, sorted
    # reviewerIDs), so either one alone is already the answer.
    if persisted_df.empty:
        return chunk_df
    if chunk_df.empty:
        return persisted_df

    combined = pd.concat([chunk_df, persisted_df])
    return combined.groupby('reviewerID').sum().reset_index()

In [7]:
for reviewer_id_chunk_idx, reviewer_id_chunk in enumerate(reviewer_id_chunks):
    # This block goes through the entire dataset once per reviewer_id_chunk and
    # keeps only the rows belonging to reviewers in that chunk. Those rows are
    # consolidated in memory (reviewer_df_list); once the whole dataset has been
    # read, the result is appended to the .csv and the next reviewer_id_chunk
    # starts.

    # Split this quarter's IDs into one disjoint group per worker process.
    reviewer_id_cpu_splits = np.array_split(list(reviewer_id_chunk), cpu_count)
    group_sets = {
        index: set(reviewer_id_list)
        for index, reviewer_id_list in enumerate(reviewer_id_cpu_splits)
    }

    # One running-total frame per group, starting empty.
    reviewer_df_list = [pd.DataFrame(columns=columns) for _ in group_sets]

    # Clear tqdm's registry of live bars (private API); presumably to drop stale
    # bars left behind by earlier notebook runs.
    tqdm._instances.clear()

    # Create the worker pool once per pass rather than once per CSV chunk; the
    # context manager also guarantees the worker processes are shut down.
    with multiprocessing.Pool(processes=cpu_count) as pool, tqdm(total=num_rows) as pbar:
        pbar.set_description(f"Q{reviewer_id_chunk_idx}")
        for chunk in pd.read_csv(
            "./Intermediate_Datasets/reviewer_data_non_reduced.csv",
            chunksize=chunksize
        ):
            # One task per group: this chunk's rows for that group's reviewers
            # plus the group's running totals.
            task_list = [
                (chunk.loc[chunk['reviewerID'].isin(group_set)], reviewer_df_list[group_id])
                for group_id, group_set in group_sets.items()
            ]
            reviewer_df_list = pool.starmap(worker, task_list)
            # Advance by the rows actually read: the last chunk is shorter than
            # chunksize, so updating by chunksize overshoots num_rows.
            pbar.update(len(chunk))

    # Combine the per-group totals and append them to the csv, then move on to
    # the next reviewer_id_chunk. index=False keeps each row aligned with the
    # header row, which was written with index=False; the default index=True
    # would prepend an extra, unlabeled column to every data row.
    # NOTE(review): assumes the non-reduced CSV's count columns are in the same
    # order as `columns`, since the header is not rewritten here -- confirm.
    combined_reviewer_df = pd.concat(reviewer_df_list, axis=0)
    combined_reviewer_df.to_csv(
        "./Intermediate_Datasets/consolidated_reviewer_data.csv",
        mode="a",
        header=False,
        index=False,
    )

Q0: : 184000000it [50:00, 61331.24it/s]                                                                        
Q1: : 184000000it [51:59, 58974.97it/s]                                                                        
Q2: : 184000000it [43:37, 70295.53it/s]                                                                        
Q3: : 184000000it [43:30, 70487.54it/s]                                                                        
