In [1]:
import findspark
findspark.init()
from pyspark import SparkConf
from pyspark.sql import SparkSession
# Configure a local-mode Spark application named 'featureEngineering'.
conf = SparkConf().setAppName('featureEngineering').setMaster('local')
# The resulting `spark` session is a notebook-wide global used by every later cell.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Feature Engineering

In [2]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, QuantileDiscretizer, MinMaxScaler
from pyspark.ml.linalg import VectorUDT, Vectors
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F


def oneHotEncoderExample(movieSamples):
    """Demonstrate one-hot encoding of the movie id.

    Adds an integer copy of ``movieId`` named ``movieIdNumber``, one-hot
    encodes it into ``movieIdVector`` (keeping the last category), then
    prints the schema and the first 10 rows.

    :param movieSamples: DataFrame that contains a ``movieId`` column.
    :return: None; results are only printed.
    """
    # Integer-typed copy of movieId that the encoder consumes.
    movieIdAsInt = F.col("movieId").cast(IntegerType())
    indexedMovies = movieSamples.withColumn("movieIdNumber", movieIdAsInt)

    movieIdEncoder = OneHotEncoder(inputCols=["movieIdNumber"], outputCols=['movieIdVector'], dropLast=False)
    encoderModel = movieIdEncoder.fit(indexedMovies)
    encodedMovies = encoderModel.transform(indexedMovies)

    encodedMovies.printSchema()
    encodedMovies.show(10)


def array2vec(genreIndexes, indexSize):
    """Turn a list of category indexes into a multi-hot sparse vector.

    :param genreIndexes: iterable of integer positions to set to 1.0.
    :param indexSize: length of the resulting vector.
    :return: sparse vector of length ``indexSize`` with 1.0 at every listed
        position.
    """
    # Work on a sorted, de-duplicated copy: the caller's list is left untouched
    # and a repeated index (e.g. the same genre collected twice for a movie)
    # no longer violates the strictly-increasing index requirement.
    uniqueIndexes = sorted(set(genreIndexes))
    fill_list = [1.0] * len(uniqueIndexes)
    return Vectors.sparse(indexSize, uniqueIndexes, fill_list)


def multiHotEncoderExample(movieSamples):
    """Demonstrate multi-hot encoding of the pipe-separated ``genres`` column.

    Steps:
      1. Explode ``genres`` so each (movie, genre) pair becomes one row, e.g.
             |movieId|title           |genre    |
             |1      |Toy Story (1995)|Adventure|
             |1      |Toy Story (1995)|Animation|
             |1      |Toy Story (1995)|Children |
      2. Map every genre string to a numeric index with a StringIndexer.
      3. Collect the indexes per movie and convert them to a sparse vector.

    :param movieSamples: DataFrame with ``movieId``, ``title`` and ``genres``.
    :return: None; the schema and the first 10 rows are printed.
    """
    # One row per (movie, genre).
    genreArray = F.split(F.col("genres"), "\\|").cast(ArrayType(StringType()))
    perGenreRows = movieSamples.select("movieId", "title", F.explode(genreArray).alias('genre'))

    # Genre name -> numeric index, plus an integer-typed copy of that index.
    indexerModel = StringIndexer(inputCol="genre", outputCol="genreIndex").fit(perGenreRows)
    indexedRows = indexerModel.transform(perGenreRows) \
        .withColumn("genreIndexInt", F.col("genreIndex").cast(IntegerType()))

    # Vector length is the largest index plus one.
    largestIndex = indexedRows.agg(F.max(F.col("genreIndexInt"))).head()[0]
    indexSize = largestIndex + 1

    # All genre indexes of a movie gathered into a single list column.
    perMovieIndexes = indexedRows.groupBy('movieId') \
        .agg(F.collect_list('genreIndexInt').alias('genreIndexes')) \
        .withColumn("indexSize", F.lit(indexSize))

    toSparseVector = F.udf(array2vec, VectorUDT())
    multiHotSamples = perMovieIndexes.withColumn(
        "vector", toSparseVector(F.col("genreIndexes"), F.col("indexSize")))

    multiHotSamples.printSchema()
    multiHotSamples.show(10)


def ratingFeatures(ratingSamples):
    """Demonstrate numeric feature processing on per-movie rating statistics.

    Computes rating count, average and variance per movie, then buckets the
    count into 100 quantile buckets and min-max scales the average rating.
    Intermediate and final frames are printed.

    :param ratingSamples: DataFrame with ``movieId`` and ``rating`` columns.
    :return: None; results are only printed.
    """
    ratingSamples.printSchema()
    ratingSamples.show()

    # Per-movie statistics: rating count, mean and variance.
    perMovieStats = ratingSamples.groupBy('movieId').agg(
        F.count(F.lit(1)).alias('ratingCount'),
        F.avg("rating").alias("avgRating"),
        F.variance('rating').alias('ratingVar'))

    # MinMaxScaler works on vector columns, so wrap the scalar average in one.
    toDenseVector = F.udf(lambda x: Vectors.dense(x), VectorUDT())
    movieFeatures = perMovieStats.withColumn('avgRatingVec', toDenseVector('avgRating'))
    movieFeatures.show(10)

    stages = [
        # Bucketing of the rating count.
        QuantileDiscretizer(numBuckets=100, inputCol="ratingCount", outputCol="ratingCountBucket"),
        # Normalization of the average rating.
        MinMaxScaler(inputCol="avgRatingVec", outputCol="scaleAvgRating"),
    ]
    fittedPipeline = Pipeline(stages=stages).fit(movieFeatures)
    fittedPipeline.transform(movieFeatures).show(10)

In [3]:
# Read data and do feature engineering
# NOTE(review): absolute, machine-specific path; change it to your own checkout before running.
file_path = '/Users/linchenxiao/Python_Code/Spark/SparrowRecsys'
movieResourcesPath = file_path + "/sampledata/movies.csv"
# No schema is supplied or inferred, so every CSV column is loaded as a string.
movieSamples = spark.read.format('csv').option('header', 'true').load(movieResourcesPath)
print("Raw Movie Samples:")
movieSamples.show(10)
movieSamples.printSchema()
# Categorical feature demos on the movie table.
print("OneHotEncoder Example:")
oneHotEncoderExample(movieSamples)
print("MultiHotEncoder Example:")
multiHotEncoderExample(movieSamples)
# Numerical feature demo on the ratings table.
print("Numerical features Example:")
ratingsResourcesPath = file_path + "/sampledata/ratings.csv"
ratingSamples = spark.read.format('csv').option('header', 'true').load(ratingsResourcesPath)
ratingFeatures(ratingSamples)

Raw Movie Samples:
+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
+-------+--------------------+--------------------+
only showing top 10 rows

root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

OneHotEncoder Example:
root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-

# 如何使用Spark 生成Embedding

In [5]:
import os
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.mllib.feature import Word2Vec
from pyspark.ml.linalg import Vectors
import random
from collections import defaultdict
import numpy as np
from pyspark.sql import functions as F


class UdfFunction:
    """Namespace for plain-Python helpers that are wrapped as Spark UDFs."""

    @staticmethod
    def sortF(movie_list, timestamp_list):
        """
        Sort movies by their timestamps and return the movie sequence.

        Timestamps that arrive as strings (the ratings CSV is read without a
        schema, so every column is a string) are compared by numeric value,
        not lexicographically; otherwise "999999999" would sort after
        "1112486027". If any string timestamp is not numeric, the raw values
        are compared as-is.

        eg:
            input: movie_list:[1,2,3]
                   timestamp_list:[1112486027,1212546032,1012486033]
            return [3,1,2]

        :param movie_list: movie ids, aligned position by position with
            ``timestamp_list``.
        :param timestamp_list: timestamps (numbers or numeric strings).
        :return: list of movie ids ordered by ascending timestamp; ties keep
            their input order.
        """
        pairs = list(zip(movie_list, timestamp_list))

        def as_number(value):
            # Only strings need converting; real numbers already compare correctly.
            if isinstance(value, str):
                try:
                    return int(value)
                except ValueError:
                    return float(value)
            return value

        try:
            keys = [as_number(timestamp) for _, timestamp in pairs]
        except ValueError:
            # Non-numeric timestamps: fall back to comparing the raw values.
            keys = [timestamp for _, timestamp in pairs]

        # Sort positions rather than pairs so movie ids never take part in comparisons.
        order = sorted(range(len(pairs)), key=keys.__getitem__)
        return [pairs[i][0] for i in order]


def processItemSequence(spark, rawSampleDataPath):
    """Build per-user movie sequences from the raw rating data.

    The rating file looks like:
        +------+-------+------+----------+
        |userId|movieId|rating| timestamp|
        +------+-------+------+----------+
        |     1|      2|   3.5|1112486027|
        |     1|     29|   3.5|1112484676|
        |     1|     32|   3.5|1112484819|

    Only ratings >= 3.5 are kept. For every user the rated movie ids are
    ordered by ``UdfFunction.sortF`` and joined into one space-separated
    string, which is finally split back into a list of ids.

    :param spark: active SparkSession.
    :param rawSampleDataPath: path of the ratings CSV (with header).
    :return: RDD whose elements are lists of movie-id strings, e.g.
        ['858', '50', '593', '457'].
    """
    ratings = spark.read.format("csv").option("header", "true").load(rawSampleDataPath)

    # User-defined sort: orders a user's movies by their timestamps.
    sortByTimestamp = F.udf(UdfFunction.sortF, ArrayType(StringType()))
    orderedMovieIds = sortByTimestamp(F.collect_list("movieId"), F.collect_list("timestamp"))

    # One sequence per user, built from the ratings at or above 3.5 only.
    likedRatings = ratings.where(F.col("rating") >= 3.5)
    perUserSequences = likedRatings.groupBy("userId").agg(orderedMovieIds.alias('movieIds'))

    # Join all ids into a single string for the downstream word2vec processing.
    perUserSequences = perUserSequences.withColumn("movieIdStr", F.array_join(F.col("movieIds"), " "))

    # Keep only the sequence column and emit List[List[item]].
    return perUserSequences.select('movieIdStr').rdd.map(lambda row: row[0].split(' '))


def embeddingLSH(spark, movieEmbMap):
    """Bucket movie embeddings with random-projection LSH and run a demo query.

    Prints the schema and first rows of the bucketed DataFrame, then the 5
    approximate nearest neighbours of a sample embedding.

    :param spark: active SparkSession.
    :param movieEmbMap: mapping of movie id -> iterable of embedding values.
    :return: None; results are only printed.
    """
    movieEmbSeq = [
        (movieId, Vectors.dense([np.float64(value) for value in embedding]))
        for movieId, embedding in movieEmbMap.items()
    ]
    if not movieEmbSeq:
        # createDataFrame cannot infer a schema from an empty list.
        print("No embeddings to index; skipping LSH.")
        return

    movieEmbDF = spark.createDataFrame(movieEmbSeq).toDF("movieId", "emb")
    bucketProjectionLSH = BucketedRandomProjectionLSH(inputCol="emb", outputCol="bucketId", bucketLength=0.1,
                                                      numHashTables=3)
    bucketModel = bucketProjectionLSH.fit(movieEmbDF)
    embBucketResult = bucketModel.transform(movieEmbDF)
    print("movieId, emb, bucketId schema:")
    embBucketResult.printSchema()
    print("movieId, emb, bucketId data result:")
    embBucketResult.show(10, truncate=False)

    print("Approximately searching for 5 nearest neighbors of the sample embedding:")
    sampleEmb = Vectors.dense(0.795, 0.583, 1.120, 0.850, 0.174, -0.839, -0.0633, 0.249, 0.673, -0.237)
    # The hard-coded probe has 10 dimensions. For any other embedding size, probe
    # with the first movie's embedding so the query matches the indexed vectors.
    if len(sampleEmb) != len(movieEmbSeq[0][1]):
        sampleEmb = movieEmbSeq[0][1]
    bucketModel.approxNearestNeighbors(movieEmbDF, sampleEmb, 5).show(truncate=False)


def trainItem2vec(spark, samples, embLength, embOutputPath, saveToRedis, redisKeyPrefix):
    """Train a Word2Vec model on item sequences and persist the embeddings.

    Prints the 20 nearest items of item "158", writes one
    ``<movieId>:<v1> <v2> ...`` line per item to ``embOutputPath`` and runs
    the LSH demo on the learned vectors.

    :param spark: active SparkSession (forwarded to ``embeddingLSH``).
    :param samples: RDD of item-id sequences to train on.
    :param embLength: embedding vector size.
    :param embOutputPath: local file path of the embedding output file.
    :param saveToRedis: not used by this function.
    :param redisKeyPrefix: not used by this function.
    :return: the fitted Word2Vec model.
    """
    word2vec = Word2Vec().setVectorSize(embLength).setWindowSize(5).setNumIterations(10)
    model = word2vec.fit(samples)

    # Look up the 20 items closest to item 158.
    synonyms = model.findSynonyms("158", 20)
    for synonym, cosineSimilarity in synonyms:
        print(synonym, cosineSimilarity)

    # Create the output directory when the path has one; a bare filename has no
    # directory part and must not be handed to makedirs.
    embOutputDir = os.path.dirname(embOutputPath)
    if embOutputDir:
        os.makedirs(embOutputDir, exist_ok=True)

    # Fetch the vectors once: {item1: embeddings1, item2: embeddings2}.
    # Calling getVectors() per item would re-fetch the whole map on every iteration.
    movieVectors = model.getVectors()
    with open(embOutputPath, 'w') as f:
        for movie_id, embedding in movieVectors.items():
            vectors = " ".join(str(emb) for emb in embedding)
            f.write(movie_id + ":" + vectors + "\n")

    embeddingLSH(spark, movieVectors)
    return model


def generate_pair(x):
    """Split a sequence into consecutive (previous, next) item pairs.

    eg:
        watch sequence: ['858', '50', '593', '457']
        return: [('858', '50'), ('50', '593'), ('593', '457')]

    Every adjacent pair is emitted, including pairs whose first item is falsy
    (e.g. ``0`` or ``''``).

    :param x: iterable of items in viewing order.
    :return: list of 2-tuples; empty when ``x`` has fewer than two items.
    """
    items = list(x)
    # Pair each item with its successor.
    return list(zip(items, items[1:]))


def generateTransitionMatrix(samples):
    """Build the item transition probability matrix from viewing sequences.

    :param samples: RDD of item sequences.
    :return: tuple ``(transitionMatrix, itemDistribution)`` where
        ``transitionMatrix[a][b]`` is the fraction of pairs starting at ``a``
        that go to ``b``, e.g.
        {'858': {'50': 0.0628..., '527': 0.0601..., '356': 0.0141...}},
        and ``itemDistribution[a]`` is the share of all pairs that start at ``a``.
    """
    # Break every sequence into consecutive pairs and count each distinct pair;
    # the count is the edge weight, e.g. {('858', '50'): 465, ('50', '593'): 395}.
    pairCounts = samples.flatMap(lambda sequence: generate_pair(sequence)).countByValue()

    # outgoingCounts: '858' -> {'50': 465, '527': 445, ...} (raw transition counts)
    # startCounts:    '858' -> 7395 (how often the item is the start of a pair)
    outgoingCounts = defaultdict(dict)
    startCounts = defaultdict(int)
    totalPairs = 0
    for (source, target), count in pairCounts.items():
        outgoingCounts[source][target] = count
        startCounts[source] += count
        totalPairs += count

    # Normalise each row of counts by the number of pairs leaving that item.
    transitionMatrix = defaultdict(dict)
    for source, targets in outgoingCounts.items():
        rowTotal = startCounts[source]
        for target, count in targets.items():
            transitionMatrix[source][target] = count / rowTotal

    # Share of all pairs that start at each item.
    itemDistribution = defaultdict(dict)
    for item, count in startCounts.items():
        itemDistribution[item] = count / totalPairs

    return transitionMatrix, itemDistribution


def oneRandomWalk(transitionMatrix, itemDistribution, sampleLength):
    """Generate one random-walk item sequence.

    The first item is drawn according to ``itemDistribution``; each following
    item is drawn from the current item's row of ``transitionMatrix``. The
    walk stops early when the current item has no entry in either mapping.

    :param transitionMatrix: mapping item -> {next item: probability}.
    :param itemDistribution: mapping item -> probability of starting there.
    :param sampleLength: maximum number of items in the walk.
    :return: list of items; empty when ``itemDistribution`` is empty.
    """
    def pick(distribution):
        # Roulette-wheel selection. If floating-point rounding leaves the
        # cumulative probability just below the draw, the last candidate is
        # returned instead of selecting nothing.
        threshold = random.random()
        cumulative = 0.0
        chosen = None
        for candidate, prob in distribution.items():
            chosen = candidate
            cumulative += prob
            if cumulative >= threshold:
                break
        return chosen

    if not itemDistribution:
        # Nothing to start from; avoid emitting a bogus empty-string item.
        return []

    # Pick the starting item according to how often items start a pair.
    curElement = pick(itemDistribution)
    sample = [curElement]

    # Keep walking until the sample reaches sampleLength.
    while len(sample) < sampleLength:
        if (curElement not in itemDistribution) or (curElement not in transitionMatrix):
            break
        # Transition probabilities out of the current item.
        probDistribution = transitionMatrix[curElement]
        if not probDistribution:
            # Dead end: no outgoing transitions recorded.
            break
        curElement = pick(probDistribution)
        sample.append(curElement)
    return sample


def randomWalk(transitionMatrix, itemDistribution, sampleCount, sampleLength):
    """Generate ``sampleCount`` independent random walks.

    :param transitionMatrix: mapping item -> {next item: probability}.
    :param itemDistribution: mapping item -> probability of starting there.
    :param sampleCount: number of walks to generate.
    :param sampleLength: maximum length of each walk.
    :return: list of walks, each produced by ``oneRandomWalk``.
    """
    return [
        oneRandomWalk(transitionMatrix, itemDistribution, sampleLength)
        for _ in range(sampleCount)
    ]


def graphEmb(samples, spark, embLength, embOutputFilename, saveToRedis, redisKeyPrefix):
    """Train graph embeddings from random walks over the item transition graph.

    Builds the transition matrix from ``samples``, generates 20000 random
    walks of length up to 10, and trains item2vec on those walks.

    :param samples: RDD of observed item sequences.
    :param spark: active SparkSession.
    :param embLength: embedding vector size.
    :param embOutputFilename: output file path passed on to ``trainItem2vec``.
    :param saveToRedis: forwarded unchanged to ``trainItem2vec``.
    :param redisKeyPrefix: forwarded unchanged to ``trainItem2vec``.
    :return: None.
    """
    walkCount, walkLength = 20000, 10

    # Item transition probability matrix and start distribution.
    transitions, startDistribution = generateTransitionMatrix(samples)

    walks = randomWalk(transitions, startDistribution, walkCount, walkLength)
    walksRdd = spark.sparkContext.parallelize(walks)
    trainItem2vec(spark, walksRdd, embLength, embOutputFilename, saveToRedis, redisKeyPrefix)


def generateUserEmb(spark, rawSampleDataPath, model, embLength, embOutputPath, saveToRedis, redisKeyPrefix):
    """Derive user embeddings by summing the embeddings of rated movies.

    Every rating row is joined with its movie embedding; the embeddings are
    then added up element-wise per user and written to ``embOutputPath`` as
    ``<userId>:<v1> <v2> ...`` lines.

    :param spark: active SparkSession.
    :param rawSampleDataPath: path of the ratings CSV (with header).
    :param model: fitted model exposing ``getVectors()``.
    :param embLength: not used by this function.
    :param embOutputPath: local output file path.
    :param saveToRedis: not used by this function.
    :param redisKeyPrefix: not used by this function.
    :return: None.
    """
    ratings = spark.read.format("csv").option("header", "true").load(rawSampleDataPath)

    # Movie embeddings as a two-column DataFrame (movieId, emb).
    embeddingRows = [(movieId, list(vector)) for movieId, vector in model.getVectors().items()]
    embeddingSchema = StructType([
        StructField('movieId', StringType(), False),
        StructField('emb', ArrayType(FloatType()), False)
    ])
    embeddingDF = spark.createDataFrame(embeddingRows, schema=embeddingSchema)

    # Inner join: ratings of movies without an embedding are dropped.
    ratedWithEmb = ratings.join(embeddingDF, on='movieId', how='inner')

    def addVectors(a, b):
        # Element-wise sum of two embeddings.
        return [a[i] + b[i] for i in range(len(a))]

    # map emits (userId, embedding), where the first element is the key;
    # reduceByKey then sums the embeddings dimension by dimension per user.
    userEmbPairs = ratedWithEmb.select('userId', 'emb').rdd.map(lambda row: (row[0], row[1]))
    userEmbeddings = userEmbPairs.reduceByKey(addVectors).collect()

    with open(embOutputPath, 'w') as out:
        for userId, embedding in userEmbeddings:
            vectors = " ".join([str(value) for value in embedding])
            out.write(userId + ":" + vectors + "\n")

In [6]:
# Change to your own filepath
# Item2Vec
file_path = '/Users/linchenxiao/Python_Code/Spark/SparrowRecsys'
rawSampleDataPath = file_path + "/sampledata/ratings.csv"
embLength = 10
# The embedding file is written with Python's open(), which needs a plain local
# path. Strip a "file://" scheme only when it is actually present; blindly
# slicing file_path[7:] chops "/Users/" off a plain path and yields a bogus
# relative directory.
localRoot = file_path[len("file://"):] if file_path.startswith("file://") else file_path
samples = processItemSequence(spark, rawSampleDataPath)
model = trainItem2vec(spark, samples, embLength,
                      embOutputPath=localRoot + "/modeldata2/item2vecEmb.csv", saveToRedis=False,
                      redisKeyPrefix="i2vEmb")

48 0.9580522179603577
256 0.9551143050193787
31 0.9187507629394531
186 0.8950363397598267
355 0.8806580901145935
168 0.8764886260032654
520 0.846748948097229
252 0.8462055921554565
276 0.842883288860321
277 0.8394060730934143
552 0.835608720779419
432 0.8306246995925903
44 0.8212148547172546
333 0.800767719745636
455 0.7899411916732788
169 0.7763938903808594
2 0.7761274576187134
236 0.7647883892059326
237 0.7625499963760376
370 0.7538554668426514




movieId, emb, bucketId schema:
root
 |-- movieId: string (nullable = true)
 |-- emb: vector (nullable = true)
 |-- bucketId: array (nullable = true)
 |    |-- element: vector (containsNull = true)

movieId, emb, bucketId data result:
+-------+-----------------------------------------------------------------------------------------------------------------------+------------------------+
|movieId|emb                                                                                                                    |bucketId                |
+-------+-----------------------------------------------------------------------------------------------------------------------+------------------------+
|710    |[-1.3764266,0.42805645,0.4078089,0.06357183,1.5329667,0.76017064,1.1100068,1.2193089,0.6592338,-1.0196304]             |[[22.0], [14.0], [18.0]]|
|205    |[-0.030348117,0.37577343,-0.27565956,-0.21543641,0.81982124,-0.32763448,-0.44681698,0.64313394,0.46586126,-0.124947704]|[[2.0], [0.0], [3

In [7]:
# Graph Embedding
# Output files are written with Python's open(), which needs a plain local path.
# Strip a "file://" scheme only when present; file_path[7:] on a plain path
# would cut off "/Users/" and point at a bogus relative directory.
localRoot = file_path[len("file://"):] if file_path.startswith("file://") else file_path
graphEmb(samples, spark, embLength, embOutputFilename=localRoot + "/modeldata2/itemGraphEmb.csv",
         saveToRedis=True, redisKeyPrefix="graphEmb")
generateUserEmb(spark, rawSampleDataPath, model, embLength,
                embOutputPath=localRoot + "/modeldata2/userEmb.csv", saveToRedis=False,
                redisKeyPrefix="uEmb")

355 0.9262107014656067
552 0.9148286581039429
540 0.8829371929168701
256 0.8801469206809998
257 0.8549433350563049
60 0.8544407486915588
577 0.8227375745773315
277 0.8206712603569031
31 0.8096217513084412
520 0.7996008396148682
276 0.7969487309455872
48 0.795081615447998
170 0.7835773825645447
44 0.7819278240203857
543 0.7695910334587097
168 0.7638594508171082
303 0.7603774070739746
673 0.7584818601608276
585 0.756323516368866
419 0.7448134422302246
movieId, emb, bucketId schema:
root
 |-- movieId: string (nullable = true)
 |-- emb: vector (nullable = true)
 |-- bucketId: array (nullable = true)
 |    |-- element: vector (containsNull = true)

movieId, emb, bucketId data result:
+-------+---------------------------------------------------------------------------------------------------------------------+------------------------+
|movieId|emb                                                                                                                  |bucketId                |
+-----

# Feature Engineering for model

In [28]:
import pyspark.sql as sql
from pyspark.sql.functions import *
from pyspark.sql.types import *
from collections import defaultdict
from pyspark.sql import functions as F

# Number of decimal places passed to format_number() for the rating and year statistics below.
NUMBER_PRECISION = 2


def addSampleLabel(ratingSamples):
    """Attach a binary ``label`` column to the rating samples.

    Prints a preview, the schema and the rating distribution (count and
    percentage per rating value), then labels ratings >= 3.5 as positive (1)
    and everything else as negative (0).

    :param ratingSamples: DataFrame with a ``rating`` column.
    :return: the input DataFrame with an extra ``label`` column.
    """
    ratingSamples.show(5, truncate=False)
    ratingSamples.printSchema()

    # Distribution of rating values as a share of all samples.
    totalSamples = ratingSamples.count()
    ratingHistogram = ratingSamples.groupBy('rating').count().orderBy('rating')
    ratingHistogram.withColumn('percentage', F.col('count') / totalSamples).show()

    # A rating of 3.5 or higher counts as a positive sample.
    isPositive = F.col('rating') >= 3.5
    return ratingSamples.withColumn('label', F.when(isPositive, 1).otherwise(0))


def extractReleaseYearUdf(title):
    """Extract the release year from a title such as ``"Toy Story (1995)"``.

    The year is read from the four characters before the final character of
    the stripped title. When the title is missing, too short, or does not end
    in a numeric year, the default year 1990 is returned instead of raising,
    so a single malformed title cannot abort the whole Spark job.

    :param title: movie title, possibly ``None``.
    :return: release year as an int, or 1990 when it cannot be determined.
    """
    defaultYear = 1990
    if not title:
        return defaultYear
    stripped = title.strip()
    if len(stripped) < 6:
        return defaultYear
    try:
        # "(1995)" -> "1995": skip the closing parenthesis, take the 4 chars before it.
        return int(stripped[-5:-1])
    except ValueError:
        return defaultYear


def addMovieFeatures(movieSamples, ratingSamplesWithLabel):
    """Join movie attributes and per-movie rating statistics onto the samples.

    Added columns: ``releaseYear``, ``movieGenre1..3``, ``movieRatingCount``,
    ``movieAvgRating`` and ``movieRatingStddev``. The ``title`` column is
    dropped once the release year has been extracted from it. The schema and
    a preview of the result are printed.

    :param movieSamples: DataFrame with ``movieId``, ``title`` and ``genres``.
    :param ratingSamplesWithLabel: labelled rating samples with ``movieId``
        and ``rating``.
    :return: the enriched samples DataFrame.
    """
    # Add movie basic features.
    samplesWithMovies1 = ratingSamplesWithLabel.join(movieSamples, on=['movieId'], how='left')

    # Add releaseYear from the title (e.g. "Toy Story (1995)") and drop the title.
    # The former intermediate step that rewrote `title` just before dropping it
    # had no effect on the output (and its lambda could not handle a null title),
    # so it is omitted.
    extractYear = F.udf(extractReleaseYearUdf, IntegerType())
    samplesWithMovies2 = samplesWithMovies1 \
        .withColumn('releaseYear', extractYear('title')) \
        .drop('title')

    # Split genres and keep the first three; missing positions become null.
    genreParts = F.split(F.col('genres'), "\\|")
    samplesWithMovies3 = samplesWithMovies2 \
        .withColumn('movieGenre1', genreParts[0]) \
        .withColumn('movieGenre2', genreParts[1]) \
        .withColumn('movieGenre3', genreParts[2])

    # Rating features: movieRatingCount | movieAvgRating | movieRatingStddev.
    # fillna(0) replaces the null stddev of movies with a single rating before
    # it is formatted.
    movieRatingFeatures = samplesWithMovies3.groupBy('movieId').agg(
        F.count(F.lit(1)).alias('movieRatingCount'),
        F.format_number(F.avg(F.col('rating')), NUMBER_PRECISION).alias('movieAvgRating'),
        F.stddev(F.col('rating')).alias('movieRatingStddev')).fillna(0) \
        .withColumn('movieRatingStddev', F.format_number(F.col('movieRatingStddev'), NUMBER_PRECISION))

    # Join the movie rating features back onto every sample.
    samplesWithMovies4 = samplesWithMovies3.join(movieRatingFeatures, on=['movieId'], how='left')
    samplesWithMovies4.printSchema()
    samplesWithMovies4.show(5, truncate=False)
    return samplesWithMovies4


def extractGenres(genres_list):
    '''
    Rank genres by how often they occur in a list of pipe-separated genre strings.

    eg:
        input: ["Action|Adventure|Sci-Fi|Thriller", "Crime|Horror|Thriller"]
        return: ['Thriller', 'Action', 'Adventure', 'Sci-Fi', 'Crime', 'Horror']

    Genres are ordered by descending count; genres with equal counts keep the
    order in which they were first seen.
    '''
    tally = {}
    for entry in genres_list:
        for name in entry.split('|'):
            tally[name] = tally.get(name, 0) + 1
    # sorted() is stable, so ties stay in first-seen order even with reverse=True.
    return sorted(tally, key=tally.get, reverse=True)


def addUserFeatures(samplesWithMovieFeatures):
    """Add per-user history features computed from earlier ratings only.

    Every feature is computed over a window of up to 100 preceding rows of the
    same user (the current row is excluded), so no future information leaks
    into a sample. The window is ordered by the numeric value of
    ``timestamp``: the column is loaded from CSV as a string, and ordering the
    raw strings would place a 9-digit timestamp after a 10-digit one.

    Added columns: ``userRatedMovie1..5`` (most recent positively rated
    movies), ``userRatingCount``, ``userAvgReleaseYear``,
    ``userReleaseYearStddev``, ``userAvgRating``, ``userRatingStddev`` and
    ``userGenre1..5`` (most frequent genres among positively rated movies;
    null when there are fewer). Rows with ``userRatingCount`` <= 1 are removed.
    The schema and previews are printed.

    :param samplesWithMovieFeatures: samples with ``userId``, ``movieId``,
        ``timestamp``, ``rating``, ``label``, ``genres`` and ``releaseYear``.
    :return: the samples with the user feature columns added.
    """
    extractGenresUdf = F.udf(extractGenres, ArrayType(StringType()))

    # One shared window: the 100 rows before the current one, per user, in time order.
    timestampOrder = F.col('timestamp').cast(LongType())
    historyWindow = sql.Window.partitionBy('userId').orderBy(timestampOrder).rowsBetween(-100, -1)

    def positiveOnly(columnName):
        # Value of the column for positive samples, null otherwise
        # (collect_list skips nulls).
        return F.when(F.col('label') == 1, F.col(columnName)).otherwise(F.lit(None))

    features = samplesWithMovieFeatures \
        .withColumn('userPositiveHistory', F.collect_list(positiveOnly('movieId')).over(historyWindow)) \
        .withColumn('userPositiveHistory', F.reverse(F.col('userPositiveHistory')))

    # After the reverse, index 0 is the most recent positively rated movie.
    for position in range(5):
        features = features.withColumn('userRatedMovie%d' % (position + 1),
                                       F.col('userPositiveHistory')[position])

    features = features \
        .withColumn('userRatingCount', F.count(F.lit(1)).over(historyWindow)) \
        .withColumn('userAvgReleaseYear',
                    F.avg(F.col('releaseYear')).over(historyWindow).cast(IntegerType())) \
        .withColumn('userReleaseYearStddev', F.stddev(F.col('releaseYear')).over(historyWindow)) \
        .withColumn('userAvgRating',
                    F.format_number(F.avg(F.col('rating')).over(historyWindow), NUMBER_PRECISION)) \
        .withColumn('userRatingStddev', F.stddev(F.col('rating')).over(historyWindow)) \
        .withColumn('userGenres',
                    extractGenresUdf(F.collect_list(positiveOnly('genres')).over(historyWindow))) \
        .withColumn('userRatingStddev', F.format_number(F.col('userRatingStddev'), NUMBER_PRECISION)) \
        .withColumn('userReleaseYearStddev',
                    F.format_number(F.col('userReleaseYearStddev'), NUMBER_PRECISION))

    # userGenres is a list such as [crime, drama]; missing positions yield null.
    for position in range(5):
        features = features.withColumn('userGenre%d' % (position + 1),
                                       F.col('userGenres')[position])

    samplesWithUserFeatures = features \
        .drop('genres', 'userGenres', 'userPositiveHistory') \
        .filter(F.col('userRatingCount') > 1)

    samplesWithUserFeatures.printSchema()
    samplesWithUserFeatures.show(10)
    samplesWithUserFeatures.filter(samplesWithMovieFeatures['userId'] == 1) \
        .orderBy(timestampOrder.asc()).show(truncate=False)
    return samplesWithUserFeatures


def splitAndSaveTrainingTestSamples(samplesWithUserFeatures, file_path, seed=None):
    """Sample 10% of the data, split it 80/20 at random and save both parts as CSV.

    Output goes to ``<file_path>/trainingSamples`` and
    ``<file_path>/testSamples``; existing output is overwritten and each part
    is written as a single partition with a header row.

    :param samplesWithUserFeatures: fully featurised samples DataFrame.
    :param file_path: directory under which the two outputs are written.
    :param seed: optional random seed for the sampling and the split; the
        default ``None`` keeps the previous unseeded behaviour.
    :return: None.
    """
    smallSamples = samplesWithUserFeatures.sample(fraction=0.1, seed=seed)
    training, test = smallSamples.randomSplit([0.8, 0.2], seed=seed)

    outputs = (
        (training, file_path + '/trainingSamples'),
        (test, file_path + '/testSamples'),
    )
    for subset, savePath in outputs:
        subset.repartition(1).write.option("header", "true").mode('overwrite') \
            .csv(savePath)


def splitAndSaveTrainingTestSamplesByTimeStamp(samplesWithUserFeatures, file_path, seed=None):
    """Sample 10% of the data and split it by time into training and test CSVs.

    The split point is the approximate 0.8 quantile of the timestamp: rows at
    or before it go to ``<file_path>/trainingSamples``, later rows to
    ``<file_path>/testSamples``. Existing output is overwritten and each part
    is written as a single partition with a header row.

    :param samplesWithUserFeatures: fully featurised samples DataFrame with a
        ``timestamp`` column.
    :param file_path: directory under which the two outputs are written.
    :param seed: optional random seed for the sampling; the default ``None``
        keeps the previous unseeded behaviour.
    :return: None.
    """
    # Numeric copy of the timestamp, used only to find and apply the split point.
    smallSamples = samplesWithUserFeatures.sample(fraction=0.1, seed=seed) \
        .withColumn("timestampLong", F.col("timestamp").cast(LongType()))
    splitTimestamp = smallSamples.stat.approxQuantile("timestampLong", [0.8], 0.05)[0]

    isTraining = F.col("timestampLong") <= splitTimestamp
    isTest = F.col("timestampLong") > splitTimestamp
    outputs = (
        (smallSamples.where(isTraining).drop("timestampLong"), file_path + '/trainingSamples'),
        (smallSamples.where(isTest).drop("timestampLong"), file_path + '/testSamples'),
    )
    for subset, savePath in outputs:
        subset.repartition(1).write.option("header", "true").mode('overwrite') \
            .csv(savePath)

In [None]:
# NOTE(review): absolute, machine-specific path; change it to your own checkout before running.
file_path = '/Users/linchenxiao/Python_Code/Spark/SparrowRecsys'
movieResourcesPath = file_path + "/sampledata/movies.csv"
ratingsResourcesPath = file_path + "/sampledata/ratings.csv"
# No schema is supplied or inferred, so every CSV column is loaded as a string.
movieSamples = spark.read.format('csv').option('header', 'true').load(movieResourcesPath)
ratingSamples = spark.read.format('csv').option('header', 'true').load(ratingsResourcesPath)
# Pipeline: label the ratings -> add movie features -> add user history features.
ratingSamplesWithLabel = addSampleLabel(ratingSamples)
ratingSamplesWithLabel.show(10, truncate=False)
samplesWithMovieFeatures = addMovieFeatures(movieSamples, ratingSamplesWithLabel)
samplesWithUserFeatures = addUserFeatures(samplesWithMovieFeatures)
# save samples as csv format
splitAndSaveTrainingTestSamples(samplesWithUserFeatures, file_path + "/sampledata")
# Alternative: time-based split instead of the random split above.
# splitAndSaveTrainingTestSamplesByTimeStamp(samplesWithUserFeatures, file_path + "/webroot/sampledata")

+------+-------+------+----------+
|userId|movieId|rating|timestamp |
+------+-------+------+----------+
|1     |2      |3.5   |1112486027|
|1     |29     |3.5   |1112484676|
|1     |32     |3.5   |1112484819|
|1     |47     |3.5   |1112484727|
|1     |50     |3.5   |1112484580|
+------+-------+------+----------+
only showing top 5 rows

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)

+------+------+--------------------+
|rating| count|          percentage|
+------+------+--------------------+
|   0.5|  9788|0.008375561978987506|
|   1.0| 45018| 0.03852176636392108|
|   1.5| 11794|0.010092090108314123|
|   2.0| 87084| 0.07451751526135553|
|   2.5| 34269|0.029323879593167432|
|   3.0|323616| 0.27691723185451783|
|   3.5| 74376| 0.06364331811904114|
|   4.0|324804|  0.2779337998593234|
|   4.5| 53388| 0.04568395003414231|
|   5.0|204501| 0.17499088682722966|
+------+------+--