In [None]:
import numpy as np
import pandas as pd
import dask
import dask.dataframe as dd
from dask.distributed import Client
import ctypes
import time

## Start Dask Scheduler and Workers

In [None]:
# Create the distributed client; per the section heading this is the step that
# brings up the Dask scheduler and workers used by the rest of the notebook.
client = Client()
# NOTE(review): restarting a client that was only just created looks
# redundant — presumably it clears worker state left from an earlier run;
# confirm it is still needed.
client.restart()

## Read CSV

In [None]:
# Build the reviews DataFrame from the trimmed CSV file (path is relative to
# the notebook's working directory).
reviews_csv = "user_reviews_trimmed.csv"
user_reviews_ddf = dd.read_csv(reviews_csv)

In [None]:
# Show the DataFrame object itself as the cell output; no explicit compute()
# is requested here.
user_reviews_ddf

In [None]:
# Draw the task graph that backs the freshly loaded DataFrame.
user_reviews_ddf.visualize()

## Mean computation

In [None]:
# Set up the mean of the 'overall' column without running it yet; the result
# is only produced when compute() is called on mean_graph in a later cell.
overall_column = user_reviews_ddf["overall"]
mean_graph = overall_column.mean()

In [None]:
# Draw the task graph for the pending mean computation.
mean_graph.visualize()

In [None]:
# Execute the mean computation defined above and keep the concrete value.
result = mean_graph.compute()

In [None]:
# Report the computed mean of the 'overall' column.
print(f"mean of 'overall' attribute is {result}")

## Implicit `compute()` for `len()`, `head()`, etc.

In [None]:
# Fetch the first 10 rows. Per the section heading, head() triggers the
# computation implicitly, so no explicit compute() call appears here.
user_reviews_ddf.head(10)

## Inspecting Individual Partitions

In [None]:
# Look up how many partitions the DataFrame is split into and report it.
num_partitions = user_reviews_ddf.npartitions
print(f"Total number of partitions = {num_partitions}")

In [None]:
# Select only the first partition (index 0) and compute it on its own.
# Note: despite the "1" in its name, partition1 holds partition index 0.
partition1 = user_reviews_ddf.partitions[0].compute()
# Display the computed partition as the cell output.
partition1

In [None]:
# Request a 10-partition layout of the reviews DataFrame and store it under a
# new name.
target_partitions = 10
user_reviews_repart_ddf = user_reviews_ddf.repartition(npartitions=target_partitions)
# Show the partition count of the repartitioned DataFrame.
user_reviews_repart_ddf.npartitions

## Groupby

In [None]:
# Mean 'overall' rating of every product, grouping the reviews on 'asin'.
ratings_by_product = user_reviews_ddf.groupby("asin")["overall"]
product_means = ratings_by_product.mean()
# Show how many partitions the aggregated result has.
product_means.npartitions

In [None]:
# The groupby result lands in a single output partition by default. That can
# become a problem when there are many groups and 'product_means' no longer
# fits in RAM.
# NOTE(review): Dask's groupby aggregations appear to take a split_out
# argument for spreading the result over more partitions — confirm against the
# Dask documentation before relying on it.
product_means.npartitions

In [None]:
# Optional: uncomment the next line to draw the task graph of the per-product
# mean. It is left disabled here, so this cell produces no output.
# product_means.visualize()

## Map Partitions (`map_partitions`)

In [None]:
# Multiply the 'overall' column by 5, applying the lambda to each partition
# separately.
#
# `meta` tells Dask what the output looks like without running the function.
# The lambda returns the 'overall' column scaled by a constant, which keeps the
# Series name "overall", so the metadata declares that name explicitly together
# with the float64 dtype. An unnamed empty Series as meta would leave the lazy
# object's name (None) out of sync with the computed result's name.
out = user_reviews_ddf.map_partitions(
    lambda df: df["overall"] * 5,
    meta=("overall", "float64"),
)
# Equivalent formulation without map_partitions:
# out = user_reviews_ddf["overall"]*5

In [None]:
# Show the map_partitions result object as the cell output; nothing has called
# compute() on it at this point.
out

## Calling Compute On Related Operations Allows for Task Sharing

In [None]:
# Two aggregations over the same grouped 'overall' column: per-product sum and
# per-product mean.
overall_by_product = user_reviews_ddf.groupby("asin")["overall"]
s1 = overall_by_product.sum()
s2 = overall_by_product.mean()

# Hand both to a single compute() call so the work they have in common can be
# shared instead of being repeated for each result.
# Note: this rebinds `out`, which an earlier cell used for the map_partitions
# result.
out = dd.compute(s1, s2)

In [None]:
# Print what the combined dd.compute(s1, s2) call returned.
print(out)