In [2]:
import ast
import ctypes
import json

import dask.dataframe as dd
from dask.distributed import Client

def trim_memory() -> int:
    """Ask the C allocator to return free heap memory to the OS via ``malloc_trim(0)``.

    This notebook ships it to every Dask worker with ``client.run``. It loads
    ``libc.so.6`` by name, so it only works where that shared library exists
    (glibc-based Linux).

    Returns
    -------
    int
        Whatever ``malloc_trim`` returns (per the glibc man page, 1 if memory
        was released and 0 otherwise; not checked here).
    """
    glibc = ctypes.CDLL("libc.so.6")
    trim_result = glibc.malloc_trim(0)
    return trim_result


In [3]:
# Connect a Dask distributed client (with no arguments, presumably backed by a fresh local cluster).
client = Client()

# Run trim_memory on every worker so freed heap pages go back to the OS.
# NOTE(review): the restart below replaces the worker processes, so this
# trim probably has no lasting effect here.
client.run(trim_memory)

# Restart all workers for a clean state. Client.restart presumably returns this
# same client object; the reassignment is kept so `client` stays bound either way.
client = client.restart()



In [4]:
# Build a lazy dask DataFrame over the reviews CSV. Its repr shows only the
# column dtypes and partition count, not the data itself.
user_reviews_ddf = dd.read_csv('demo_data.csv')
user_reviews_ddf

Unnamed: 0_level_0,reviewerID,asin,reviewerName,helpful,reviewText,overall,summary,unixReviewTime,reviewTime
npartitions=55,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
,object,object,object,object,object,float64,object,float64,object
,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...


In [5]:
# How many partitions dask split the CSV into.
num_partitions = user_reviews_ddf.npartitions
print("Total number of partitions =", num_partitions)

Total number of partitions = 55


In [6]:
# Pull only the first partition (index 0) into memory as a pandas DataFrame to peek at the data.
partition1 = user_reviews_ddf.get_partition(0).compute()
partition1

Unnamed: 0,reviewerID,asin,reviewerName,helpful,reviewText,overall,summary,unixReviewTime,reviewTime
0,A2T0RJ91B0PQ03,B0016CRVLW,Gerald DeWitt,"[0, 0]",Beware! This is NOT the original single versi...,1.0,Poor Quality Alternate Take,1.400630e+09,"05 21, 2014"
1,A3TYW0XA8HSGWB,B00EKR5S0Q,Linda E. Larson,"[0, 0]",This is my new most favorite k-cup coffee. I c...,5.0,Vanilla Starbucks K-cups,1.398557e+09,"04 27, 2014"
2,A2CME0TQU2IVVB,B001AUPJVO,L5Momma,"[1, 1]",This headset is great! It worked in our 2007 ...,5.0,Awesome!,1.355875e+09,"12 19, 2012"
3,A2E5IDLX7R388S,B000055Y57,Jeff Andersen,"[0, 0]",Scofield is one of my favorite musicians and i...,5.0,Straight ahead Jazz with the Scofield twist,1.402358e+09,"06 10, 2014"
4,A3CIEMYUGV6ZMR,0545265355,Adroit,"[0, 0]","Wonderful book! I cried, well teared up at a f...",5.0,Greatest Book Ever!!!,1.334102e+09,"04 11, 2012"
...,...,...,...,...,...,...,...,...,...
111952,A1N2ZAC86P26BF,6303823351,David,"[1, 1]",Help! is probably my favorite of the Beatles m...,4.0,The best of the Beatles films,9.982656e+08,"08 20, 2001"
111953,AUFN1J7VJZL83,B002OHE20G,Amanda Banks,"[0, 0]",This heater has worked out very well for a sma...,5.0,"eliable, SAFE Heat",1.402099e+09,"06 7, 2014"
111954,AGZK126DNQ2FN,1401340970,"Cy B. Hilterman ""Cy. Hilterman""","[2, 2]",As a person that has made many trips to Niagar...,5.0,Romance and adventure in the Niagara Falls area,1.250035e+09,"08 12, 2009"
111955,A1LA51JOIGGD45,1400071550,E.A. West,"[0, 0]",The battle between good and evil continues in ...,5.0,Heroic battle between good and evil,1.366589e+09,"04 22, 2013"


In [7]:
# Apply a plain Python function to every pandas partition of the dask frame.
# Extra positional and keyword arguments given to map_partitions are forwarded
# to the function; here the function returns a Series rather than a DataFrame.

def myadd(df, a, b=1):
    """Return the partition's ``overall`` column offset by ``a`` and then ``b``.

    Parameters
    ----------
    df : pandas.DataFrame
        One partition; must contain an ``overall`` column.
    a : number
        First offset added to every ``overall`` value.
    b : number, default 1
        Second offset, added after ``a``.

    Returns
    -------
    pandas.Series
        Same name and index as ``df['overall']``.
    """
    rating_col = df['overall']
    return rating_col + a + b

# Lazily call myadd(partition, 1, b=0) on each partition: each rating plus one.
res = user_reviews_ddf.map_partitions(myadd, 1, b=0)

In [8]:
# Execute the graph; the whole result Series is gathered into client memory.
res.compute()

0         2.0
1         6.0
2         6.0
3         6.0
4         6.0
         ... 
112342    6.0
112343    6.0
112344    6.0
112345    6.0
112346    5.0
Name: overall, Length: 6158168, dtype: float64

In [9]:

# Mean star rating per reviewer; split_out=40 spreads the lazy result over 40 partitions.
avg_ratings = (
    user_reviews_ddf
    .groupby('reviewerID', sort=False)['overall']
    .mean(split_out=40)
)
avg_ratings.npartitions


40

In [None]:
# Materialise the per-reviewer mean ratings as an in-memory pandas object.
res_rating = avg_ratings.compute()
res_rating

In [None]:
# Number of distinct products (asin) each reviewer rated, as a lazy result over 40 partitions.
number_of_products_per_reviewer = (
    user_reviews_ddf
    .groupby('reviewerID', sort=False)['asin']
    .nunique(split_out=40)
)
number_of_products_per_reviewer.npartitions

In [None]:
# Materialise the distinct-product counts per reviewer.
res_num_pro = number_of_products_per_reviewer.compute()
res_num_pro

In [None]:

# Parse the 'reviewTime' strings (e.g. "05 21, 2014", i.e. 'MM DD, YYYY') into
# datetimes. Note: this overwrites the original string column on user_reviews_ddf.
user_reviews_ddf['reviewTime'] = dd.to_datetime(user_reviews_ddf['reviewTime'], format='%m %d, %Y')

# Calendar year of each review.
user_reviews_ddf['reviewYear'] = user_reviews_ddf['reviewTime'].dt.year

# Earliest review year per reviewer, lazily, over 40 output partitions.
# sort=False matches the other per-reviewer aggregations in this notebook.
# NOTE(review): some dask versions reject sort=True together with split_out > 1,
# so being explicit avoids depending on the default.
reviewing_since = user_reviews_ddf.groupby('reviewerID', sort=False)['reviewYear'].min(split_out=40)


In [None]:
# Materialise each reviewer's first review year.
res_reviewing_since = reviewing_since.compute()
res_reviewing_since

In [None]:

# Helpful votes per reviewer. The 'helpful' column holds list-like strings such
# as "[2, 2]"; element 0 is the number of helpful votes.
# ast.literal_eval only parses Python literals, whereas eval() would execute
# arbitrary code embedded in the CSV file.
user_reviews_ddf['helpful_votes'] = user_reviews_ddf['helpful'].apply(
    lambda raw: ast.literal_eval(raw)[0], meta=('helpful_votes', 'int64')
)

# Sum per reviewer. Kept lazy and split over 40 partitions (sort=False), like the
# other per-reviewer aggregates, so it can be combined with them without first
# pulling it into client memory.
total_helpful_votes = user_reviews_ddf.groupby('reviewerID', sort=False)['helpful_votes'].sum(split_out=40)

In [None]:


# Total votes per reviewer. The 'helpful' column holds list-like strings such
# as "[2, 2]"; element 1 is the total number of votes cast on the review.
# ast.literal_eval only parses Python literals, whereas eval() would execute
# arbitrary code embedded in the CSV file.
user_reviews_ddf['total_votes'] = user_reviews_ddf['helpful'].apply(
    lambda raw: ast.literal_eval(raw)[1], meta=('total_votes', 'int64')
)

# Sum 'total_votes' per reviewer. Kept lazy and split over 40 partitions
# (sort=False), like the other per-reviewer aggregates.
total_votes = user_reviews_ddf.groupby('reviewerID', sort=False)['total_votes'].sum(split_out=40)

In [None]:

# Turn each per-reviewer Series into a one-column frame. to_frame works whether
# the Series is a lazy dask Series or an already-computed pandas Series.
df_number_products_rated = number_of_products_per_reviewer.to_frame(name='number_products_rated')
df_avg_ratings = avg_ratings.to_frame(name='avg_ratings')
df_reviewing_since = reviewing_since.to_frame(name='reviewing_since')
df_helpful_votes = total_helpful_votes.to_frame(name='helpful_votes')
df_total_votes = total_votes.to_frame(name='total_votes')

# Combine into one table indexed by reviewerID.
# NOTE(review): dd.concat(axis=1) cannot align these reliably. The groupby
# results have unknown divisions, so concat either raises (e.g. when partition
# counts differ or a pandas input is mixed in) or simply assumes partition i of
# every input holds the same reviewers. Outer joins on the index align rows
# explicitly, and the columns come out in the same order as before.
users_ddf = (
    df_number_products_rated
    .join(df_avg_ratings, how='outer')
    .join(df_reviewing_since, how='outer')
    .join(df_helpful_votes, how='outer')
    .join(df_total_votes, how='outer')
)