# Glint-FMPair evaluation on Lastfm-1k

In [1]:
import os
# Driver-side submit arguments; they must be in the environment before the
# SparkSession is created further down. The pieces are joined with single
# spaces, and "pyspark-shell" has to stay the last token.
os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([
    "--conf spark.driver.memory=10G",
    "--conf spark.driver.extraJavaOptions=-XX:+UseG1GC",
    "--conf spark.driverEnv.LD_PRELOAD=/opt/cloudera/parcels/mkl/linux/mkl/lib/intel64/libmkl_rt.so",
    "--conf spark.driverEnv.MKL_VERBOSE=0",
    "--jars glint-fmpair/target/scala-2.11/glint-fmpair-assembly-1.0.jar",
    "pyspark-shell",
])

import pickle
import time

from math import log2, ceil

from pyspark.ml.linalg import VectorUDT
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import collect_set, udf, col, mean, first
from pyspark.sql import SparkSession

from glintfmpair import WeightHotEncoderEstimator, WeightHotEncoderModel, GlintFMPair, GlintFMPairModel

In [2]:
# Session on YARN in client deploy mode with 10 executors (40G memory, 10 cores each).
# Executors preload the MKL runtime library and use the G1 garbage collector,
# mirroring the driver settings placed in PYSPARK_SUBMIT_ARGS.
spark = (
    SparkSession.builder
    .appName("Glint-FMPair evaluation on Lastfm-1k")
    .master("yarn")
    .config("spark.submit.deployMode","client")
    .config("spark.executor.memory", "40G")
    .config("spark.executor.instances", "10")
    .config("spark.executor.cores", "10")
    .config("spark.executorEnv.LD_PRELOAD",
            "/opt/cloudera/parcels/mkl/linux/mkl/lib/intel64/libmkl_rt.so")
    .config("spark.executorEnv.MKL_VERBOSE", "0")
    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
    .config("spark.jars", "glint-fmpair/target/scala-2.11/glint-fmpair-assembly-1.0.jar")
    .getOrCreate()
)

In [3]:
# Input CSV files; each path is handed unchanged to loadData().
# Training interactions used to fit the encoders and the model.
traindataPath = "Lastfm-1k-train.csv"
# Validation query inputs: seed rows and context rows.
valqueryseedsdataPath = "Lastfm-1k-val-queryseeds.csv"
valqueryctxdataPath = "Lastfm-1k-val-queryctx.csv"
# Validation ground truth; only pid (as userid) and traid (as itemid) are kept after loading.
valdataPath = "Lastfm-1k-val.csv"

In [4]:
# Save/load locations of the four encoder models. The order is significant:
# it must match the tuple returned by fitEncoderModels()
# (user cold-start, user, item, context item), because paths and models are
# paired positionally when saving and the loaded tuple is unpacked in that order.
encoderModelPaths = ["Lastfm-1k-usercoldstartencoder.model", "Lastfm-1k-userencoder.model",
                     "Lastfm-1k-itemencoder.model", "Lastfm-1k-ctxitemencoder.model"]

In [5]:
# Host passed as parameterServerHost to GlintFMPair(...) and GlintFMPairModel.load(...).
# NOTE(review): hard-coded, environment-specific address - adjust before running elsewhere.
parameterServerHost="10.7.0.105"

In [6]:
# Switches between re-using persisted artifacts and recomputing them.
# loadEncoder: load the encoder models from encoderModelPaths instead of fitting them.
loadEncoder=False
# saveEncoder: persist freshly fitted encoder models (only consulted when loadEncoder is False).
saveEncoder=False
# loadGlintFMPair: load a previously saved model and its pickled fit time instead of training.
loadGlintFMPair=True
# saveGlintFMPair: persist a freshly trained model and its fit time
# (only consulted when loadGlintFMPair is False).
saveGlintFMPair=True

## Data loading and preprocessing

In [7]:
def loadData(s, dataPath):
    """Read a CSV file into a DataFrame partitioned by ``pid``.

    :param s: session object whose ``read`` attribute provides the CSV reader
    :param dataPath: path of the CSV file; a header row is expected and the
        schema is inferred
    :return: the loaded data, repartitioned into 100 partitions on column "pid"
    """
    reader = s.read.format("csv")
    reader = reader.option("header", "true")
    reader = reader.option("inferSchema", "true")
    loaded = reader.load(dataPath)
    return loaded.repartition(100, "pid")


def loadEncoderModels(modelPaths):
    """Load one WeightHotEncoderModel per path.

    :param modelPaths: iterable of model locations
    :return: tuple of loaded models, in the same order as ``modelPaths``
    """
    loadedModels = []
    for path in modelPaths:
        loadedModels.append(WeightHotEncoderModel.load(path))
    return tuple(loadedModels)


def fitEncoderModels(data):
    """Fit the weighted hot encoders on ``data`` and derive the context-item encoder.

    Three encoders are fitted (user cold-start, user, item); the context-item
    encoder is a re-parameterized copy of the fitted item encoder, so it is not
    fitted separately.

    :param data: DataFrame containing the columns "pid", "traid", "artid",
        "b_age", "country" and "gender"
    :return: tuple ``(userColdStartEncoderModel, userEncoderModel,
        itemEncoderModel, ctxitemEncoderModel)``
    """
    # Cold-start user encoding: track ids grouped per "pid" with "sqrt" group weighting.
    coldStartInputs = ["traid"]
    coldStartEstimator = WeightHotEncoderEstimator(
        weights=[1.0],
        inputCols=coldStartInputs,
        outputCols=[name + "_cold_encoded" for name in coldStartInputs],
        groupCols=["pid"],
        groupWeighting="sqrt",
    )
    userColdStartEncoderModel = coldStartEstimator.fit(data)

    # User features use dropLast: missing user features are simply ignored,
    # which is no problem for ranking.
    userInputs = ["pid", "b_age", "country", "gender"]
    userEstimator = WeightHotEncoderEstimator(
        weights=[1.0, 1.0, 1.0, 1.0],
        inputCols=userInputs,
        outputCols=[name + "_encoded" for name in userInputs],
        dropLast=True,
    )
    userEncoderModel = userEstimator.fit(data)

    # Item features keep the last index: missing item features are mapped to
    # the missing feature index.
    itemInputs = ["traid", "artid"]
    itemWeights = [1.0, 1.0]
    itemEstimator = WeightHotEncoderEstimator(
        weights=itemWeights,
        inputCols=itemInputs,
        outputCols=[name + "_encoded" for name in itemInputs],
        dropLast=False,
    )
    itemEncoderModel = itemEstimator.fit(data)

    # Context items (the "prev_*" columns) reuse the fitted item encoder with
    # different input/output columns. Here dropLast is enabled and invalid
    # values are kept, so missing context item features are presumably ignored
    # like missing user features - TODO confirm against the encoder semantics.
    ctxitemInputs = ["prev_traid", "prev_artid"]
    ctxitemOverrides = {
        itemEncoderModel.weights: itemWeights,
        itemEncoderModel.inputCols: ctxitemInputs,
        itemEncoderModel.outputCols: [name + "_encoded" for name in ctxitemInputs],
        itemEncoderModel.dropLast: True,
        itemEncoderModel.handleInvalid: "keep",
    }
    ctxitemEncoderModel = itemEncoderModel.copy(ctxitemOverrides)

    return userColdStartEncoderModel, userEncoderModel, itemEncoderModel, ctxitemEncoderModel


def resizeVector(sizeVector, resizeVector):
    """Give ``resizeVector`` the same ``size`` as ``sizeVector``.

    The second argument is modified in place and also returned; its other
    attributes are left untouched.

    :param sizeVector: object whose ``size`` attribute is copied
    :param resizeVector: object whose ``size`` attribute is overwritten
    :return: ``resizeVector`` itself
    """
    targetSize = sizeVector.size
    resizeVector.size = targetSize
    return resizeVector


# Spark UDF wrapper around resizeVector, declared with a VectorUDT return type.
# toFeatures() uses it to give the item feature vectors the size of the user/context vectors.
resizeVectorUdf = udf(resizeVector, VectorUDT())
    
def toFeatures(data, models, queryctxData=None, sharedItemFeatures=False,
               allUserColdStartWeights=[1.0],
               userColdStartWeighting="sqrt",
               allUserWeights=[1.0, 1.0, 1.0, 1.0],
               allCtxitemWeights=[1.0, 1.0],
               allItemWeights=[1.0, 1.0],
               coldStartUserCols=[],
               userCols=["pid", "b_age", "country", "gender"],
               ctxitemCols=["prev_traid", "prev_artid"],
               itemCols=["traid", "artid"]):
    """Hot-encode the raw columns and assemble them into Glint-FMPair feature columns.

    Two modes exist:

    * training mode (``queryctxData is None``): ``data`` is encoded with the
      item encoder as well, and the result carries ``itemfeatures`` and
      ``itemid`` in addition to the user/context columns;
    * query mode (``queryctxData`` given): ``queryctxData`` replaces ``data``
      entirely (``data`` is not used), the item encoder is not applied and no
      ``itemfeatures`` / ``itemid`` columns are produced.

    The list defaults are only read, never mutated, inside this function.

    :param data: training DataFrame; ignored in query mode
    :param models: 4-tuple ``(userColdStartEncoderModel, userEncoderModel,
        itemEncoderModel, ctxitemEncoderModel)``
    :param queryctxData: DataFrame to encode in query mode, or None
    :param sharedItemFeatures: in training mode, set the ``size`` of each item
        feature vector to the size of the user/context feature vector
    :param allUserColdStartWeights: weights set on the cold-start encoder
    :param userColdStartWeighting: group weighting set on the cold-start encoder
    :param allUserWeights: weights set on the user encoder
    :param allCtxitemWeights: weights set on the context-item encoder
    :param allItemWeights: weights set on the item encoder (training mode only)
    :param coldStartUserCols: cold-start columns to include; an empty list
        skips the cold-start encoder
    :param userCols: user columns to include; an empty list skips the user encoder
    :param ctxitemCols: context-item columns to include; an empty list skips
        the context-item encoder
    :param itemCols: item columns to include in ``itemfeatures``; "albid" and
        "artid" are additionally selected as int columns when listed here
    :return: DataFrame with ``userctxfeatures``, ``userid`` and, depending on
        the mode and ``itemCols``, ``itemfeatures``, ``itemid``, ``albid``, ``artid``
    """
    userColdStartEncoderModel, userEncoderModel, itemEncoderModel, ctxitemEncoderModel = models

    # Explicit None check: the mode must not depend on the truthiness of a DataFrame object.
    isQuery = queryctxData is not None

    encodedColdStartUserCols = [c + "_cold_encoded" for c in coldStartUserCols]
    encodedUserCols = [c + "_encoded" for c in userCols]
    encodedItemCols = [c + "_encoded" for c in itemCols]
    encodedCtxitemCols = [c + "_encoded" for c in ctxitemCols]

    if isQuery:
        data = queryctxData
    else:
        data = itemEncoderModel.setWeights(allItemWeights).transform(data)

    # hot encode; each encoder is only applied when columns of its kind are requested
    if coldStartUserCols:
        data = userColdStartEncoderModel \
            .setWeights(allUserColdStartWeights) \
            .setGroupWeighting(userColdStartWeighting) \
            .transform(data)

    if userCols:
        data = userEncoderModel.setWeights(allUserWeights).transform(data)

    if ctxitemCols:
        data = ctxitemEncoderModel.setWeights(allCtxitemWeights).transform(data)

    # assemble feature vectors of required hot-encoded columns into combined feature vectors;
    # the order is context item, cold-start user, user
    userInputCols = encodedCtxitemCols + encodedColdStartUserCols + encodedUserCols
    userAssembler = VectorAssembler(inputCols=userInputCols, outputCol="userctxfeatures")
    data = userAssembler.transform(data)

    if not isQuery:
        itemAssembler = VectorAssembler(inputCols=encodedItemCols, outputCol="itemfeatures")
        data = itemAssembler.transform(data)

    # rename columns to Glint-FMPair names
    cols = [col("userctxfeatures"), col("pid").cast("int").alias("userid")]

    if not isQuery:
        if sharedItemFeatures:
            # item vectors take over the size of the user/context vectors
            itemFeaturesCol = resizeVectorUdf(col("userctxfeatures"), col("itemfeatures")) \
                .alias("itemfeatures")
        else:
            itemFeaturesCol = col("itemfeatures")
        cols += [itemFeaturesCol, col("traid").cast("int").alias("itemid")]

    if "albid" in itemCols:
        cols += [col("albid").cast("int")]

    if "artid" in itemCols:
        cols += [col("artid").cast("int")]

    return data.select(*cols)

In [8]:
# Training interactions.
traindata = loadData(spark, traindataPath)

# Validation ground truth, reduced to the (userid, itemid) pairs used for scoring.
valdata = loadData(spark, valdataPath).select(
    col("pid").cast("int").alias("userid"),
    col("traid").cast("int").alias("itemid"),
)

# Validation query inputs.
valqueryseedsdata = loadData(spark, valqueryseedsdataPath)
valqueryctxdata = loadData(spark, valqueryctxdataPath)

# Encoders: fit on the training data (optionally persisting them) unless
# previously saved models are requested.
if not loadEncoder:
    encoderModels = fitEncoderModels(traindata)
    if saveEncoder:
        for model, modelPath in zip(encoderModels, encoderModelPaths):
            model.save(modelPath)
else:
    encoderModels = loadEncoderModels(encoderModelPaths)

## Evaluation metrics

In [9]:
def dcg(itemid, recs):
    """Discounted cumulative gain of ``itemid`` within a ranked recommendation list.

    Every position ``i`` (0-based) whose recommendation has the given item id
    contributes ``1 / log2(i + 2)``; all other positions contribute 0.0.

    :param itemid: id of the relevant item
    :param recs: ranked sequence of records indexable by "itemid"
    :return: the summed gain (0 for an empty ``recs``)
    """
    total = 0
    for rank, rec in enumerate(recs):
        gain = 1.0 / log2(rank + 2) if rec["itemid"] == itemid else 0.0
        total += gain
    return total

def hitRate(itemid, recs):
    """Top-1 hit indicator: 1.0 if the first recommendation is ``itemid``, else 0.0.

    Only the first entry of ``recs`` is inspected. An empty or missing
    recommendation list counts as a miss instead of raising.

    :param itemid: id of the relevant item
    :param recs: ranked sequence of records indexable by "itemid"; may be empty or None
    :return: 1.0 on a hit, 0.0 otherwise (always a float)
    """
    if not recs:
        return 0.0
    return 1.0 if recs[0]["itemid"] == itemid else 0.0


# Spark UDF wrappers for the per-query metrics. No return type is passed, so
# pyspark's default UDF return type applies (StringType per the pyspark docs);
# evaluateModel() averages the results with mean(), which presumably casts
# them to numbers implicitly - TODO confirm.
dcgUdf = udf(dcg)
hitRateUdf = udf(hitRate)


def evaluateModel(queryfeatures, evaldata, model):
    """Score the model's top-500 recommendations against the evaluation data.

    Recommendations for ``queryfeatures`` are joined with ``evaldata`` on
    "userid"; for every joined row the hit indicator and the DCG of the row's
    "itemid" are computed and then averaged over all rows.

    :param queryfeatures: query feature DataFrame passed to
        ``model.recommendForUserSubset``
    :param evaldata: DataFrame with "userid" and "itemid" columns
    :param model: model providing ``recommendForUserSubset``
    :return: tuple ``(mean hit rate, mean DCG)``; callers report the second
        value as NDCG, which matches if each row has exactly one relevant
        item (ideal DCG of 1) - TODO confirm
    """
    recommended = model.recommendForUserSubset(queryfeatures, 500)
    joined = recommended.join(evaldata, "userid")

    perRowScores = joined.select(
        hitRateUdf("itemid", "recommendations").alias("hitRate"),
        dcgUdf("itemid", "recommendations").alias("dcg"),
    )
    averages = perRowScores.select(mean(col("hitRate")), mean(col("dcg"))).first()

    return averages[0], averages[1]

## Evaluation

In [10]:
# Baseline: only the user id ("pid") and the track id ("traid") are used as features,
# without any context-item features.
trainFeatures = toFeatures(traindata, encoderModels,
                           userCols=["pid"], itemCols=["traid"], ctxitemCols=[])
valqueryFeatures = toFeatures(valqueryseedsdata, encoderModels,
                              queryctxData=valqueryctxdata,
                              userCols=["pid"], ctxitemCols=[])

modelPath = "Lastfm-1k.model"
if loadGlintFMPair:
    model = GlintFMPairModel.load(modelPath, parameterServerHost=parameterServerHost)
    # NOTE(review): the fit time is pickled under the same path string as the model.
    # This presumably only works because model.save() and open() target different
    # file systems - confirm. Only unpickle files written by this notebook:
    # pickle.load can execute arbitrary code from untrusted files.
    with open(modelPath, "rb") as f:
        fitTime = pickle.load(f)
else:
    fitStart = time.time()
    model = GlintFMPair(
        batchSize=256, stepSize=0.6, linearReg=0.03, factorsReg=0.001, numDims=150,
        parameterServerHost=parameterServerHost, numParameterServers=3, maxIter=10,
        samplingCol="", sampler="crossbatch"
    ).fit(trainFeatures)
    fitEnd = time.time()
    fitTime = fitEnd - fitStart

    if saveGlintFMPair:
        model.save(modelPath)
        with open(modelPath, "wb") as f:
            pickle.dump(fitTime, f)

# Destroy the model even if the evaluation fails. The results get their own names
# so that the module-level hitRate() function is not overwritten by a number.
try:
    valHitRate, valNdcg = evaluateModel(valqueryFeatures, valdata, model)
finally:
    model.destroy()
print("{:.5f}s fit time\n{:.6f} hit rate\n{:.6f} NDCG".format(fitTime, valHitRate, valNdcg))

852.88596s fit time
0.002020 hit rate
0.043559 NDCG


In [13]:
# Full feature set: toFeatures() defaults, i.e. all user columns, both item columns
# and the previous-item context columns.
trainFeatures = toFeatures(traindata, encoderModels)
valqueryFeatures = toFeatures(valqueryseedsdata, encoderModels,
                              queryctxData=valqueryctxdata)

modelPath = "Lastfm-1k-prevall.model"
if loadGlintFMPair:
    model = GlintFMPairModel.load(modelPath, parameterServerHost=parameterServerHost)
    # NOTE(review): the fit time is pickled under the same path string as the model.
    # This presumably only works because model.save() and open() target different
    # file systems - confirm. Only unpickle files written by this notebook:
    # pickle.load can execute arbitrary code from untrusted files.
    with open(modelPath, "rb") as f:
        fitTime = pickle.load(f)
else:
    fitStart = time.time()
    model = GlintFMPair(
        batchSize=256, stepSize=0.6, linearReg=0.03, factorsReg=0.001, numDims=150,
        parameterServerHost=parameterServerHost, numParameterServers=3, maxIter=10,
        samplingCol="", sampler="crossbatch"
    ).fit(trainFeatures)
    fitEnd = time.time()
    fitTime = fitEnd - fitStart

    if saveGlintFMPair:
        model.save(modelPath)
        with open(modelPath, "wb") as f:
            pickle.dump(fitTime, f)

# Destroy the model even if the evaluation fails. The results get their own names
# so that the module-level hitRate() function is not overwritten by a number.
try:
    valHitRate, valNdcg = evaluateModel(valqueryFeatures, valdata, model)
finally:
    model.destroy()
print("{:.5f}s fit time\n{:.6f} hit rate\n{:.6f} NDCG".format(fitTime, valHitRate, valNdcg))

990.84414s fit time
0.031313 hit rate
0.145785 NDCG


Another run, in which the training features are cached beforehand so that preprocessing is not counted towards the fit time.

In [10]:
# Same feature set as toFeatures() defaults, but the training features are cached
# so that preprocessing is excluded from the measured fit time.
trainFeatures = toFeatures(traindata, encoderModels).coalesce(100).cache()
valqueryFeatures = toFeatures(valqueryseedsdata, encoderModels,
                              queryctxData=valqueryctxdata)

# Run an action to materialize the cache before the timer starts.
trainFeatures.count()

fitStart = time.time()
model = GlintFMPair(
    batchSize=256, stepSize=0.6, linearReg=0.03, factorsReg=0.001, numDims=150,
    parameterServerHost=parameterServerHost, numParameterServers=3, maxIter=10,
    samplingCol="", sampler="crossbatch"
).fit(trainFeatures)
fitEnd = time.time()
fitTime = fitEnd - fitStart

# Destroy the model even if the evaluation fails. The results get their own names
# so that the module-level hitRate() function is not overwritten by a number.
try:
    valHitRate, valNdcg = evaluateModel(valqueryFeatures, valdata, model)
finally:
    model.destroy()
print("{:.5f}s fit time\n{:.6f} hit rate\n{:.6f} NDCG".format(fitTime, valHitRate, valNdcg))

970.72021s fit time
0.032323 hit rate
0.148859 NDCG
