In [36]:
import os
from glob import glob

import dask
import dask.bag as db
import dask.dataframe as dd
from distributed import Client
from dask_jobqueue import SLURMCluster

from IPython.display import display
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

In [2]:
from lightfm import LightFM

In [3]:
from lightfm.data import Dataset

In [23]:
LOCAL = False

if LOCAL:
    # This line creates a single-machine dask client
    client = Client()
else:    
    # This line creates a SLURM cluster dask and dask client
    # Logging outputs will be stored in /scratch/{your-netid}
    
    cluster = SLURMCluster(memory='4GB', cores=2, python='/scratch/work/public/dask/bin/python', 
                               local_directory='/tmp/{}/'.format(os.environ['SLURM_JOB_USER']),
                               job_extra=['--output=/scratch/{}/slurm-%j.out'.format(os.environ['SLURM_JOB_USER'])])

    cluster.submit_command = 'slurm'
    cluster.scale(100)

    display(cluster)
    client = Client(cluster)

display(client)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 36243 instead


VBox(children=(HTML(value='<h2>SLURMCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    …

0,1
Client  Scheduler: tcp://10.32.33.35:42453  Dashboard: http://10.32.33.35:36243/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


In [24]:
#read in modified data
#train_val = dd.read_csv("../train_val_small.csv")
#val_modified = dd.read_csv("../val_modified_small.csv")
train_val = dd.read_csv("../train_val_large.csv")
val_modified = dd.read_csv("../val_modified_large.csv")

In [25]:
train_val = train_val.repartition(npartitions=100)
val_modified = val_modified.repartition(npartitions=100)

In [26]:
#modify data to input into lightfm dataset
train = train_val[['userId', 'movieId', 'rating']]
train_bag = train.to_bag()
train_movie = train['movieId']
train_user = train['userId']
train_movie_bag = train_movie.to_bag().distinct()
train_user_bag = train_user.to_bag().distinct()

In [27]:
#modify data to input into lightfm dataset
val = val_modified[['userId', 'movieId', 'rating']]
val_bag = val.to_bag()

In [28]:
#create lightfm dataset
dataset = Dataset()

In [29]:
#fit and build interactions
dataset.fit(train_user_bag, train_movie_bag)

In [30]:
(train_interactions, train_weights) = dataset.build_interactions(train_bag)

In [31]:
(val_interactions, val_weights) = dataset.build_interactions(val_bag)

In [30]:
#check that the size of the matrix is the same
train_weights

<137103x32192 sparse matrix of type '<class 'numpy.float32'>'
	with 8254944 stored elements in COOrdinate format>

In [31]:
val_weights

<137103x32192 sparse matrix of type '<class 'numpy.float32'>'
	with 5511292 stored elements in COOrdinate format>

In [19]:
#perform parameter tuning here
#small_model = LightFM(loss='warp', no_components=10, item_alpha=0.02, user_alpha=0.02)

In [32]:
large_model = LightFM(loss='warp', no_components=50, item_alpha=0.05, user_alpha=0.05)

In [20]:
#fit the small model
#%time small_model.fit(train_weights, epochs=20, num_threads=2)

CPU times: user 255 ms, sys: 2.56 ms, total: 258 ms
Wall time: 273 ms


<lightfm.lightfm.LightFM at 0x146f728323d0>

In [33]:
#fit the large model
%time large_model.fit(train_weights, epochs=20, num_threads=10)

CPU times: user 7min 14s, sys: 4.46 s, total: 7min 19s
Wall time: 7min 32s


<lightfm.lightfm.LightFM at 0x146f71cff580>

In [34]:
from lightfm.evaluation import precision_at_k

In [21]:
#use precision at k for parameter tuning
#print("Val precision: %.5f" % precision_at_k(small_model, val_interactions, k=100).mean())

Val precision: 0.09295


In [35]:
print("Val precision: %.5f" % precision_at_k(large_model, val_interactions, k=100).mean())

Val precision: 0.06739
