In [1]:
import os
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.mllib.feature import Word2Vec
from pyspark.ml.linalg import Vectors
import random
from collections import defaultdict
import numpy as np
from pyspark.sql import functions as F





In [2]:
class UdfFunction:
    @staticmethod
    def sortF(movie_list, timestamp_list):
        """
        sort by time and return the corresponding movie sequence
        eg:
            input: movie_list:[1,2,3]
                   timestamp_list:[1112486027,1212546032,1012486033]
            return [3,1,2]
        """
        pairs = []
        for m, t in zip(movie_list, timestamp_list):
            pairs.append((m, t))
        # sort by time
        pairs = sorted(pairs, key=lambda x: x[1])
        return [x[0] for x in pairs]

In [3]:
def processItemSequence(spark, rawSampleDataPath):
    # rating data
    ratingSamples = spark.read.format("csv").option("header", "true").load(rawSampleDataPath)
    # ratingSamples.show(5)
    # ratingSamples.printSchema()
    sortUdf = udf(UdfFunction.sortF, ArrayType(StringType()))
    userSeq = ratingSamples \
        .where(F.col("rating") >= 3.5) \
        .groupBy("userId") \
        .agg(sortUdf(F.collect_list("movieId"), F.collect_list("timestamp")).alias('movieIds')) \
        .withColumn("movieIdStr", array_join(F.col("movieIds"), " "))
    # userSeq.select("userId", "movieIdStr").show(10, truncate = False)
    return userSeq.select('movieIdStr').rdd.map(lambda x: x[0].split(' '))


In [4]:
def embeddingLSH(spark, movieEmbMap):
    movieEmbSeq = []
    for key, embedding_list in movieEmbMap.items():
        embedding_list = [np.float64(embedding) for embedding in embedding_list]
        movieEmbSeq.append((key, Vectors.dense(embedding_list)))
    movieEmbDF = spark.createDataFrame(movieEmbSeq).toDF("movieId", "emb")
    bucketProjectionLSH = BucketedRandomProjectionLSH(inputCol="emb", outputCol="bucketId", bucketLength=0.1,
                                                      numHashTables=3)
    bucketModel = bucketProjectionLSH.fit(movieEmbDF)
    embBucketResult = bucketModel.transform(movieEmbDF)
    print("movieId, emb, bucketId schema:")
    embBucketResult.printSchema()
    print("movieId, emb, bucketId data result:")
    embBucketResult.show(10, truncate=False)
    print("Approximately searching for 5 nearest neighbors of the sample embedding:")
    sampleEmb = Vectors.dense(0.795, 0.583, 1.120, 0.850, 0.174, -0.839, -0.0633, 0.249, 0.673, -0.237)
    bucketModel.approxNearestNeighbors(movieEmbDF, sampleEmb, 5).show(truncate=False)



In [5]:
def trainItem2vec(spark, samples, embLength, embOutputPath, saveToRedis, redisKeyPrefix):
    word2vec = Word2Vec().setVectorSize(embLength).setWindowSize(5).setNumIterations(10)
    model = word2vec.fit(samples)
    synonyms = model.findSynonyms("158", 20)
    for synonym, cosineSimilarity in synonyms:
        print(synonym, cosineSimilarity)
    embOutputDir = '/'.join(embOutputPath.split('/')[:-1])
    if not os.path.exists(embOutputDir):
        os.makedirs(embOutputDir)
    with open(embOutputPath, 'w') as f:
        for movie_id in model.getVectors():
            vectors = " ".join([str(emb) for emb in model.getVectors()[movie_id]])
            f.write(movie_id + ":" + vectors + "\n")
    embeddingLSH(spark, model.getVectors())
    return model

In [8]:
conf = SparkConf().setAppName('ctrModel').setMaster('local')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# Change to your own filepath
rawSampleDataPath = "/Users/hui/Desktop/python/recommend/SparrowRecSys-master/src/main/resources/webroot/sampledata/ratings.csv"
embLength = 10
samples = processItemSequence(spark, rawSampleDataPath)
model = trainItem2vec(spark, samples, embLength,
                      embOutputPath="./item2vecEmb.csv", saveToRedis=False,
                      redisKeyPrefix="i2vEmb")



48 0.9518048167228699
256 0.9424139857292175
31 0.9222530126571655
186 0.9123157858848572
168 0.8697343468666077
277 0.8647581934928894
252 0.8588904738426208
355 0.8574140667915344
432 0.8364532589912415
552 0.8358733057975769
276 0.8271687626838684
520 0.8269667625427246
44 0.8114587068557739
236 0.7913941740989685
455 0.7826753258705139
2 0.7816562652587891
169 0.7601582407951355
333 0.7466580271720886
362 0.74569171667099
204 0.7431633472442627
movieId, emb, bucketId schema:
root
 |-- movieId: string (nullable = true)
 |-- emb: vector (nullable = true)
 |-- bucketId: array (nullable = true)
 |    |-- element: vector (containsNull = true)

movieId, emb, bucketId data result:
+-------+-----------------------------------------------------------------------------------------------------------------------+--------------------------+
|movieId|emb                                                                                                                    |bucketId                  |

In [10]:
def generate_pair(x):
    # eg:
    # watch sequence:['858', '50', '593', '457']
    # return:[['858', '50'],['50', '593'],['593', '457']]
    pairSeq = []
    previousItem = ''
    for item in x:
        if not previousItem:
            previousItem = item
        else:
            pairSeq.append((previousItem, item))
            previousItem = item
    return pairSeq

def generateTransitionMatrix(samples):
    pairSamples = samples.flatMap(lambda x: generate_pair(x))
    pairCountMap = pairSamples.countByValue()
    pairTotalCount = 0
    transitionCountMatrix = defaultdict(dict)
    itemCountMap = defaultdict(int)
    for key, cnt in pairCountMap.items():
        key1, key2 = key
        transitionCountMatrix[key1][key2] = cnt
        itemCountMap[key1] += cnt
        pairTotalCount += cnt
    transitionMatrix = defaultdict(dict)
    itemDistribution = defaultdict(dict)
    for key1, transitionMap in transitionCountMatrix.items():
        for key2, cnt in transitionMap.items():
            transitionMatrix[key1][key2] = transitionCountMatrix[key1][key2] / itemCountMap[key1]
    for itemid, cnt in itemCountMap.items():
        itemDistribution[itemid] = cnt / pairTotalCount
    return transitionMatrix, itemDistribution
def randomWalk(transitionMatrix, itemDistribution, sampleCount, sampleLength):
    samples = []
    for i in range(sampleCount):
        samples.append(oneRandomWalk(transitionMatrix, itemDistribution, sampleLength))
    return samples

def graphEmb(samples, spark, embLength, embOutputFilename, saveToRedis, redisKeyPrefix):
    transitionMatrix, itemDistribution = generateTransitionMatrix(samples)
    sampleCount = 20000
    sampleLength = 10
    newSamples = randomWalk(transitionMatrix, itemDistribution, sampleCount, sampleLength)
    rddSamples = spark.sparkContext.parallelize(newSamples)
    trainItem2vec(spark, rddSamples, embLength, embOutputFilename, saveToRedis, redisKeyPrefix)

In [13]:
pairSamples = samples.flatMap(lambda x: generate_pair(x))
pairCountMap = pairSamples.countByValue()


In [15]:
pairCountMap

defaultdict(int,
            {('858', '50'): 465,
             ('50', '593'): 395,
             ('593', '457'): 419,
             ('1', '25'): 114,
             ('25', '32'): 137,
             ('32', '6'): 72,
             ('6', '608'): 80,
             ('608', '52'): 101,
             ('52', '58'): 93,
             ('58', '26'): 3,
             ('26', '30'): 3,
             ('30', '103'): 6,
             ('103', '582'): 1,
             ('582', '588'): 1,
             ('661', '107'): 18,
             ('107', '60'): 15,
             ('60', '1'): 7,
             ('1', '919'): 63,
             ('919', '223'): 34,
             ('223', '260'): 64,
             ('260', '899'): 24,
             ('899', '480'): 5,
             ('480', '592'): 261,
             ('592', '593'): 69,
             ('593', '356'): 698,
             ('356', '588'): 99,
             ('588', '344'): 95,
             ('344', '47'): 102,
             ('47', '595'): 65,
             ('595', '736'): 61,
             ('736'

In [16]:
def oneRandomWalk(transitionMatrix, itemDistribution, sampleLength):
    sample = []
    # pick the first element
    randomDouble = random.random()
    firstItem = ""
    accumulateProb = 0.0
    for item, prob in itemDistribution.items():
        accumulateProb += prob
        if accumulateProb >= randomDouble:
            firstItem = item
            break
    sample.append(firstItem)
    curElement = firstItem
    i = 1
    while i < sampleLength:
        if (curElement not in itemDistribution) or (curElement not in transitionMatrix):
            break
        probDistribution = transitionMatrix[curElement]
        randomDouble = random.random()
        accumulateProb = 0.0
        for item, prob in probDistribution.items():
            accumulateProb += prob
            if accumulateProb >= randomDouble:
                curElement = item
                break
        sample.append(curElement)
        i += 1
    return sample

In [17]:
 graphEmb(samples, spark, embLength, embOutputFilename="./itemGraphEmb.csv",
             saveToRedis=True, redisKeyPrefix="graphEmb")

256 0.9711096286773682
168 0.9370803833007812
552 0.9160218834877014
186 0.8890082240104675
368 0.88346266746521
204 0.8780500888824463
31 0.8772533535957336
48 0.8678412437438965
520 0.8446921706199646
277 0.8331523537635803
485 0.8256796598434448
145 0.8253970146179199
252 0.8242735266685486
315 0.7838448286056519
333 0.7714435458183289
163 0.7713625431060791
442 0.7652636170387268
355 0.7623554468154907
353 0.75705486536026
370 0.7491781115531921
movieId, emb, bucketId schema:
root
 |-- movieId: string (nullable = true)
 |-- emb: vector (nullable = true)
 |-- bucketId: array (nullable = true)
 |    |-- element: vector (containsNull = true)

movieId, emb, bucketId data result:
+-------+-------------------------------------------------------------------------------------------------------------------------+-----------------------+
|movieId|emb                                                                                                                      |bucketId               |
