In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import dask.dataframe as dd
import dask.bag as db

from distributed import Client
from dask_jobqueue import SLURMCluster
from IPython.display import display

import os
from glob import glob

In [2]:
# Execution mode switch:
#   LOCAL = True  -> single-machine dask client (handy while developing)
#   LOCAL = False -> dask workers are launched as SLURM jobs on the cluster
LOCAL = False

if LOCAL:
    # Single-machine dask client
    client = Client()
else:
    # SLURM-backed dask cluster plus a client connected to it.
    # Job logs are written to /scratch/{your-netid}/slurm-<jobid>.out
    slurm_user = os.environ['SLURM_JOB_USER']

    cluster = SLURMCluster(
        memory='256GB',
        cores=8,
        python='/scratch/work/public/dask/bin/python',
        local_directory='/tmp/{}/'.format(slurm_user),
        job_extra=['--output=/scratch/{}/slurm-%j.out'.format(slurm_user)],
    )

    # Override the command used to submit worker jobs, then request 100 workers
    cluster.submit_command = 'slurm'
    cluster.scale(100)

    display(cluster)
    client = Client(cluster)

display(client)

VBox(children=(HTML(value='<h2>SLURMCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    …

0,1
Client  Scheduler: tcp://10.32.35.22:37993  Dashboard: http://10.32.35.22:8787/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


In [3]:
# Work from inside the MovieLens "ml-latest-small" directory so that the CSV
# files can be referenced by bare file name in the cells that follow.
os.chdir('/scratch/work/courses/DSGA1004-2021/movielens/ml-latest-small')

In [4]:
# Load the four MovieLens tables as lazy dask dataframes. The file names are
# relative, so they resolve against the current working directory.

# The movies table is explicitly re-split into 500 partitions after reading.
movies_df = dd.read_csv('movies.csv').repartition(npartitions=500)

# The remaining tables keep whatever partitioning dd.read_csv picks.
ratings_df = dd.read_csv('ratings.csv')
links_df = dd.read_csv('links.csv')
tags_df = dd.read_csv('tags.csv')

In [5]:
# len() on a dask dataframe has to run a computation to count the rows;
# the number of columns is available directly from the frame's column index.
n_rows = len(movies_df)
n_cols = len(movies_df.columns)
print("This dataset contains {} rows and {} columns".format(n_rows, n_cols))

# Preview the first few movies
movies_df.head()

This dataset contains 9742 rows and 3 columns


Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [6]:
# Preview the first rows of the ratings table (userId, movieId, rating, timestamp)
ratings_df.head()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,1,4.0,964982703
1,1,3,4.0,964981247
2,1,6,4.0,964982224
3,1,47,5.0,964983815
4,1,50,5.0,964982931


In [7]:
# Preview the first rows of the links table (movieId, imdbId, tmdbId)
links_df.head()

Unnamed: 0,movieId,imdbId,tmdbId
0,1,114709,862.0
1,2,113497,8844.0
2,3,113228,15602.0
3,4,114885,31357.0
4,5,113041,11862.0


In [8]:
# Preview the first rows of the tags table (userId, movieId, tag, timestamp)
tags_df.head()

Unnamed: 0,userId,movieId,tag,timestamp
0,2,60756,funny,1445714994
1,2,60756,Highly quotable,1445714996
2,2,60756,will ferrell,1445714992
3,2,89774,Boxing story,1445715207
4,2,89774,MMA,1445715200


In [9]:
%%time
# Compute the utility matrix
ratings_df['movieId'] = ratings_df['movieId'].astype('category').cat.as_known().cat.as_ordered()
ratings_df['userId'] = ratings_df['userId'].astype('category').cat.as_known().cat.as_ordered()
utility_matrix = dd.pivot_table(ratings_df, values='rating', index='userId', columns='movieId')
utility_matrix.columns = utility_matrix.columns.as_ordered()

CPU times: user 1.91 s, sys: 26.1 ms, total: 1.94 s
Wall time: 2.03 s


Compute the average rating for each movie, sorted from highest to lowest.

In [10]:
%%time
# Mean over axis=0 averages down each column of the utility matrix, giving one
# value per movieId column. This only builds the lazy dask expression; nothing
# is materialised until .compute() is called on it.
mean_ratings = utility_matrix.mean(axis=0)

CPU times: user 2.29 s, sys: 9.29 ms, total: 2.3 s
Wall time: 2.3 s


In [11]:
%%time
mean_ratings = mean_ratings.compute()

CPU times: user 858 ms, sys: 141 ms, total: 999 ms
Wall time: 1min 35s


In [18]:
# Rank movies from highest to lowest mean rating.
mean_ratings = mean_ratings.sort_values(ascending=False)

# Show the 5000 best-rated movies. The Series is labelled by integer movieId,
# so .iloc is used to make it explicit that this is a positional slice (the
# first 5000 rows after sorting), not a lookup of movieId labels 0..5000.
print(mean_ratings.iloc[:5000])

movieId
88448     5.000
143511    5.000
6201      5.000
102217    5.000
102084    5.000
          ...  
2057      3.375
2693      3.375
102686    3.375
102407    3.375
4062      3.375
Length: 5000, dtype: float64
