In [2]:
import dask
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
from dask import multiprocessing
import  numpy as np
import pandas as pd
from dask import array as da

dask.config.set({"distributed.worker.memory.terminate": False})
dask.config.set({"distributed.scheduler.allowed-failures": 50})
outputFile="allCSV100mb.txt"
#parquetToRead="parquet100MBall_csv_files"

parquetToRead="parquet100MBall_csv_files"
cluster = LocalCluster(memory_limit='4GB')
#client = Client(processes=False)
#client = Client(cluster, asynchronous=True)
client = Client(cluster)
display(client)

data=dd.read_parquet(parquetToRead, names=["user", "item", "rating"],indexstr=False,gather_statistics=True)

#csvToRead="smallcsv/10MBratings_Toys_and_Games.csv"
#data=dd.read_csv(csvToRead, names=["item", "user", "rating"])
data

Perhaps you already have a cluster running?
Hosting the HTTP server on port 50388 instead


0,1
Client  Scheduler: tcp://127.0.0.1:50391  Dashboard: http://127.0.0.1:50388/status,Cluster  Workers: 3  Cores: 6  Memory: 12.00 GB


Unnamed: 0_level_0,item,user,rating
npartitions=51,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
,int64,int64,float64
,...,...,...
...,...,...,...
,...,...,...
,...,...,...


In [3]:
# Get len of users and item
# +1 couse index start at 0
%time userLen,itemLen =dd.compute(data.user.max()+1,data.item.max()+1)
print(userLen)
print(itemLen)

Wall time: 46 s
43531850
15167257


In [4]:
# create latent factor matrixes for user and item
features=40

user_lf=da.full((userLen,features),0.1).compute()
item_lf=da.full((itemLen,features),0.1).compute()
print(user_lf)
print(item_lf)

[[0.1 0.1 0.1 ... 0.1 0.1 0.1]
 [0.1 0.1 0.1 ... 0.1 0.1 0.1]
 [0.1 0.1 0.1 ... 0.1 0.1 0.1]
 ...
 [0.1 0.1 0.1 ... 0.1 0.1 0.1]
 [0.1 0.1 0.1 ... 0.1 0.1 0.1]
 [0.1 0.1 0.1 ... 0.1 0.1 0.1]]
[[0.1 0.1 0.1 ... 0.1 0.1 0.1]
 [0.1 0.1 0.1 ... 0.1 0.1 0.1]
 [0.1 0.1 0.1 ... 0.1 0.1 0.1]
 ...
 [0.1 0.1 0.1 ... 0.1 0.1 0.1]
 [0.1 0.1 0.1 ... 0.1 0.1 0.1]
 [0.1 0.1 0.1 ... 0.1 0.1 0.1]]


In [5]:
#Compute global averages

%time globalItemMean = data.groupby("item").rating.mean().mean().compute()
%time globalUserMean = data.groupby("user").rating.mean().mean().compute()
%time globalRatingMean = data.rating.mean().compute()
print("global user mean: {}".format(globalUserMean))
print("global item mean: {}".format(globalItemMean))
print("global rating: {}".format(globalRatingMean))

Wall time: 1min 45s
Wall time: 3min 45s
Wall time: 29.8 s
global user mean: 4.075073563511038
global item mean: 4.181384720569861
global rating: 4.232190208636595


In [6]:
k=25
def computeUserAverageOffset(row):
    return globalItemMean-(((k*globalUserMean)+row['rating']['mean'])/(row['rating']['count']+k))

def computeItemAverage(row):
    return globalUserMean-((k*globalItemMean)+row['rating']['mean'])/(row['rating']['count']+k)

In [7]:
%%time
userOffset=data.groupby('user')\
                    .agg({'rating': ['mean', 'count']})\
                    .map_partitions(computeUserAverageOffset, meta=("betterMean",float)\
                    )\
                    .compute()
itemOffset=data.groupby('item')\
                    .agg({'rating': ['mean', 'count']})\
                    .map_partitions(computeItemAverage, meta=("betterMean",float))\
                    .compute()
print(userOffset)
print(itemOffset)

user
0           1.706213
1           0.364355
2           1.764327
3           1.712673
4           1.752366
              ...   
43531845    0.070737
43531846    0.371131
43531847    0.070737
43531848    0.070737
43531849    0.070737
Name: betterMean, Length: 43531850, dtype: float64
item
0          -0.137796
1           0.163123
2          -0.137796
3           3.359665
4           3.470024
              ...   
15167252    3.678200
15167253    3.854606
15167254    3.211409
15167255    3.859777
15167256    0.018236
Name: betterMean, Length: 15167257, dtype: float64
Wall time: 2min 25s


In [8]:
%%time
bias_user=da.from_array(userOffset).compute()
bias_item=da.from_array(itemOffset).compute()

Wall time: 4.99 s


In [9]:
from numba import njit
from dask import delayed
# https://github.com/gbolmier/funk-svd/blob/master/funk_svd/fast_methods.py

def run_epoch(X, bu, bi, pu, qi, global_mean, n_factors, lr, reg,new_bu,new_bi,new_pu,new_qi):
    """Runs an epoch, updating model weights (pu, qi, bu, bi).
    Parameters
    ----------
    X : numpy.array
        Training set.
    bu : numpy.array
        User biases vector.
    bi : numpy.array
        Item biases vector.
    pu : numpy.array
        User latent factors matrix.
    qi : numpy.array
        Item latent factors matrix.
    global_mean : float
        Ratings arithmetic mean.
    n_factors : int
        Number of latent factors.
    lr : float
        Learning rate.
    reg : float
        L2 regularization factor.
    Returns:
    --------
    bu : numpy.array
        User biases vector.
    bi : numpy.array
        Item biases vector.
    pu : numpy.array
        User latent factors matrix.
    qi : numpy.array
        Item latent factors matrix.
    """
    for i in range(X.shape[0]):
        user, item, rating = int(X[i, 0]), int(X[i, 1]), X[i, 2]

        # Predict current rating
        pred = global_mean + bu[user] + bi[item]

        for factor in range(n_factors):
            pred += pu[user, factor] * qi[item, factor]

        err = rating - pred

        # Update biases
        new_bu[user] += lr * (err - reg * bu[user])
        new_bi[item] += lr * (err - reg * bi[item])


        # Update latent factors
        for factor in range(n_factors):
            puf = pu[user, factor]
            qif = qi[item, factor]

            new_pu[user, factor] += lr * (err * qif - reg * puf)
            new_qi[item, factor] += lr * (err * puf - reg * qif)
    return np.array([new_bu, new_bi, new_pu, new_qi],dtype=object)[None]

In [10]:
def compute_val_metrics(X_val, bu, bi, pu, qi, global_mean, n_factors,residuals):
    """Computes validation metrics (loss, rmse, and mae).
    Parameters
    ----------
    X_val : numpy.array
        Validation set.
    bu : numpy.array
        User biases vector.
    bi : numpy.array
        Item biases vector.
    pu : numpy.array
        User latent factors matrix.
    qi : numpy.array
        Item latent factors matrix.
    global_mean : float
        Ratings arithmetic mean.
    n_factors : int
        Number of latent factors.
    Returns
    -------
    loss, rmse, mae : tuple of floats
        Validation loss, rmse and mae.
    """
    #residuals =np.arange(X_val.shape[0])

    for i in range(X_val.shape[0]):
        user, item, rating = int(X_val[i, 0]), int(X_val[i, 1]), X_val[i, 2]
        pred = global_mean

        if user > -1:
            pred += bu[user]

        if item > -1:
            pred += bi[item]

        if (user > -1) and (item > -1):
            for factor in range(n_factors):
                pred += pu[user, factor] * qi[item, factor]

        residuals[i]=(rating - pred)
   
    return residuals[None]

In [11]:
import dask_ml
print(len(data.index))
train_data, test_data=dask_ml.model_selection.train_test_split(\
                                                data.to_dask_array(lengths=True),
                                                test_size=0.3)
print(train_data)
print(test_data)
train_data

233055327
dask.array<concatenate, shape=(163138706, 3), dtype=float64, chunksize=(3634424, 3), chunktype=numpy.ndarray>
dask.array<concatenate, shape=(69916621, 3), dtype=float64, chunksize=(1557611, 3), chunktype=numpy.ndarray>


Unnamed: 0,Array,Chunk
Bytes,3.92 GB,87.23 MB
Shape,"(163138706, 3)","(3634424, 3)"
Count,408 Tasks,51 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 3.92 GB 87.23 MB Shape (163138706, 3) (3634424, 3) Count 408 Tasks 51 Chunks Type float64 numpy.ndarray",3  163138706,

Unnamed: 0,Array,Chunk
Bytes,3.92 GB,87.23 MB
Shape,"(163138706, 3)","(3634424, 3)"
Count,408 Tasks,51 Chunks
Type,float64,numpy.ndarray


In [12]:
max_chunk_size=max(test_data.compute_chunk_sizes().chunks[0])
print(max_chunk_size)

1557611


In [12]:

%%time
import time
epochs=40
lrate = 0.001
#L2 regularization factor / weight decay
reg=0.01
user_lf=np.array(user_lf)
item_lf=np.array(item_lf)
for i in range(epochs):
    start_time = time.time()
    remote_user_lf=client.scatter(user_lf)
    remote_item_lf=client.scatter(item_lf)
    res = da.map_blocks(\
                                        run_epoch,\
                                        train_data,\
                                        bias_user, \
                                        bias_item,\
                                        remote_user_lf,\
                                        remote_item_lf,\
                                        globalRatingMean,\
                                        features,\
                                        lrate,\
                                        reg,\
                                        np.zeros(shape=bias_user.shape),\
                                        np.zeros(shape=bias_item.shape),\
                                        np.zeros(shape=user_lf.shape),\
                                        np.zeros(shape=item_lf.shape),\
                                        dtype=object)\
                                    .compute()
    new_bu=res[:,0]
    new_bi=res[:,1]
    new_pu=res[:,2]
    new_qi=res[:,3]
    bias_user += sum(new_bu)/len(new_bu)
    bias_item += sum(new_bi)/len(new_bi)
    user_lf += sum(new_pu)/len(new_pu)
    item_lf += sum(new_qi)/len(new_qi)
    end_time=time.time()
    print("computing results")
    result_time = time.time()
    residuals= da.map_blocks(compute_val_metrics,test_data,bias_user,bias_item,user_lf,item_lf,globalRatingMean,features,np.full(max_chunk_size, fill_value=np.nan),dtype=object).compute()
    residuals=np.ma.array(residuals, mask=np.isnan(residuals))
    loss = np.square(residuals.mean())
    rmse = np.sqrt(loss)
    mae = np.absolute(residuals.mean())
    t=result_time-start_time
    print("epoch:{}\t\tloss:{}\t\trmse:{}\t\tmae:{}\t\ttrain time{}\t\teval time{}".format(i,loss,rmse,mae,t,time.time()-result_time))
    file_object = open(outputFile, 'a')
    file_object.write("epoch:{}\t\tloss:{}\t\trmse:{}\t\tmae:{}\t\ttrain time{}\t\teval time{}\n".format(i,loss,rmse,mae,t,time.time()-result_time))
    file_object.close()

computing results
epoch:0		loss:4.939606219525349		rmse:2.222522490218119		mae:2.222522490218119		train time45.62499976158142		eval time4.800001859664917


In [None]:
epoch:0		loss:38.12679057918392		rmse:6.1746895127758386		mae:3.832001188894558		time370.52927470207214