# Comparing Pandas-based and Dask-based Computations

This notebook accompanies [my blog post on benjlindsay.com](http://benjlindsay.com/blog/analyzing-larger-than-memory-data-on-your-laptop). Its purpose is to show that Pandas-based manipulations and calculations can be scaled up to larger-than-memory datasets with Dask while changing very little of the code.

In [1]:
import json

import dask.bag as db
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

## 1. Define Function to Compute Baseline Prediction that works with either Pandas or Dask Dataframes

In [2]:
def compute_baseline_rmse(df_train, df_test, beta_u, beta_i, rmse_func):
    """
    Fit a damped baseline rating model on df_train, evaluate it on df_test,
    print the resulting RMSE and return it.

    The prediction for a (user, business) pair is

        pred = train_mean + b_u[user] + b_i[business]

    with
        b_u = sum(stars - train_mean)       / (n_reviews_by_user + beta_u)
        b_i = sum(stars - b_u - train_mean) / (n_reviews_of_business + beta_i)

    where both sums run over the training reviews of that user / business.
    Users (businesses) in df_test that never occur in df_train fall back to
    the mean of all fitted b_u (b_i) values.

    Parameters
    ----------
    df_train, df_test : Pandas or Dask dataframes that must contain the
        columns 'user_id', 'business_id', and 'stars'. Neither input frame
        is modified; joins produce new frames.
    beta_u, beta_i : user and business damping factors, respectively.
    rmse_func : callable ``rmse_func(y_true, y_pred)`` that computes the RMSE
        and takes Pandas or Dask Series objects, depending on whether
        df_train and df_test are Pandas or Dask dataframes.

    Returns
    -------
    Whatever rmse_func returns for the test-set ratings and predictions.
    """
    # Get mean rating of all training ratings
    train_mean = df_train['stars'].mean()

    # Get dataframe of b_u part of baseline for each user id
    user_group = df_train[['user_id', 'stars']].groupby('user_id')
    df_train_user = user_group.agg(['sum', 'count'])['stars']
    # sum - mean * count == sum over the user's reviews of (stars - mean)
    df_train_user['b_u'] = (df_train_user['sum'] - train_mean * df_train_user['count'])
    df_train_user['b_u'] /= (df_train_user['count'] + beta_u)

    # Create column of b_u values corresponding to the user who made the review
    df_train = df_train.join(df_train_user[['b_u']], on='user_id')
    # Add column representing the expression inside the summation part of the b_i equation
    df_train['b_i_sum'] = df_train['stars'] - df_train['b_u'] - train_mean
    # Damped average over each business to get the actual b_i values
    bus_group = df_train[['business_id', 'b_i_sum']].groupby('business_id')
    df_train_bus = bus_group.agg(['sum', 'count'])['b_i_sum'].rename(columns={'sum': 'b_i'})
    df_train_bus['b_i'] /= df_train_bus['count'] + beta_i

    # Join b_u and b_i columns to test dataframe. Only the joined bias column
    # is filled: a frame-wide fillna would also overwrite missing values in
    # unrelated columns (e.g. 'stars') with a bias mean.
    df_test = df_test.join(df_train_user[['b_u']], on='user_id')
    df_test['b_u'] = df_test['b_u'].fillna(df_train_user['b_u'].mean())
    df_test = df_test.join(df_train_bus[['b_i']], on='business_id')
    df_test['b_i'] = df_test['b_i'].fillna(df_train_bus['b_i'].mean())

    # Predict and Compute error
    df_test['pred'] = df_test['b_u'] + df_test['b_i'] + train_mean
    error = rmse_func(df_test['stars'], df_test['pred'])
    print('Error = {}'.format(error))
    return error

## 2. Just Champaign Training Data

### 2.1 Load Reviews Data with Pandas

In [3]:
# Read the line-delimited review records and keep only the id and rating columns
df_rev_champaign = pd.read_json(
    '../preprocessed-data/champaign_reviews_train.json',
    orient='records',
    lines=True,
)[['review_id', 'business_id', 'user_id', 'stars']]
df_rev_champaign.head(3)

Unnamed: 0,review_id,business_id,user_id,stars
0,3xGR24wD5ILntyX2UXZWTA,1DedueD53YsKcpqMWPIe9w,OkZk0I2S6mcMOtjSP12U_A,3
1,2_BvxFBvtyMKjNf3gzmbqw,1DedueD53YsKcpqMWPIe9w,8f9m9EdA6M5Jr-sqdPrc5A,4
2,V926hjwKcbT-ZVJOwSeXnQ,1DedueD53YsKcpqMWPIe9w,oJl-C8UECsibhHS2dB8yzQ,2


### 2.2 Define Root Mean Squared Error (RMSE) Function that works with Pandas Series objects

In [4]:
def rmse_pandas(y_true, y_pred):
    """Return the root mean squared error between two Pandas Series.

    y_true holds the observed values and y_pred the predicted ones; the
    element-wise squared differences are averaged with ``.mean()`` and the
    square root of that average is returned.
    """
    residual = y_true - y_pred
    mean_sq_err = (residual ** 2).mean()
    return np.sqrt(mean_sq_err)

### 2.3 Split Reviews Dataframe into Train and Test

In [5]:
# Hold out 20% of the Champaign reviews for testing; the fixed random_state
# makes the split reproducible across runs.
df_train_champaign, df_test_champaign = train_test_split(df_rev_champaign, random_state=0, test_size=0.2)

### 2.4 Compute Baseline RMSE

In [6]:
# Baseline RMSE on the Champaign split, with damping factor 5 for both users and businesses
compute_baseline_rmse(
    df_train_champaign,
    df_test_champaign,
    beta_u=5,
    beta_i=5,
    rmse_func=rmse_pandas,
)

Error = 1.2719370215819148


In [7]:
# Drop the three Champaign dataframe names now that the Pandas run is finished
del df_rev_champaign
del df_train_champaign
del df_test_champaign

## 3. US/Canada Reviews

### 3.1 Load Reviews Data with Dask

In [8]:
# Read the reviews file as text with blocksize=5,000,000 and parse every line as JSON
dict_bag = (
    db.read_text('../preprocessed-data/reviews_train.json', blocksize=5000000)
    .map(json.loads)
)
# Keep only the id and rating columns, then repartition into 10 partitions
df_rev = (
    dict_bag
    .to_dataframe(columns=['review_id', 'business_id', 'user_id', 'stars'])
    .repartition(npartitions=10)
)
df_rev.head(3)

Unnamed: 0,review_id,business_id,user_id,stars
0,NxL8SIC5yqOdnlXCg18IBg,2aFiy99vNLklCx3T_tGS9A,KpkOkG6RIf4Ra25Lhhxf1A,5
1,pXbbIgOXvLuTi_SPs1hQEQ,2aFiy99vNLklCx3T_tGS9A,bQ7fQq1otn9hKX-gXRsrgA,5
2,GP6YEearUWrzPtQYSF1vVg,2LfIuF3_sX6uwe-IR-P0jQ,aW3ix1KNZAvoM8q-WghA3Q,5


### 3.2 Define RMSE Function that works with Dask Series Objects

In [9]:
def rmse_dask(y_true, y_pred):
    """Return the root mean squared error between two Dask Series.

    y_true holds the observed values and y_pred the predicted ones. The mean
    of the squared differences is materialised with ``.compute()`` before the
    square root is taken, so the result is a concrete number.
    """
    residual = y_true - y_pred
    mean_sq_err = (residual ** 2).mean().compute()
    return np.sqrt(mean_sq_err)

### 3.3 Split Reviews Dataframe into Train and Test

In [10]:
# 80/20 split of the reviews dataframe; random_state=0 fixes the seed
df_train, df_test = df_rev.random_split(
    frac=[0.8, 0.2],
    random_state=0,
)

### 3.4 Compute Baseline RMSE

In [11]:
%%time
compute_baseline_rmse(df_train, df_test, 5, 5, rmse_dask)

Error = 1.2458789659023641
CPU times: user 7min 41s, sys: 1min 5s, total: 8min 46s
Wall time: 10min 19s
