In [41]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql as sql
from pyspark.sql.functions import *
from pyspark.sql.types import *
from collections import defaultdict
from pyspark.sql import functions as F
import tqdm

In [2]:
conf = SparkConf().setAppName('featureEngineering').setMaster('local')

In [3]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Load the data

## from csv

In [4]:
file_path = r"D:\MachineLearningPractice\SparrowRecSys\OriginalCode\src\main\resources" #'file:///home/hadoop/SparrowRecSys/src/main/resources'
movieResourcesPath = file_path + "/webroot/sampledata/movies.csv"
ratingsResourcesPath = file_path + "/webroot/sampledata/ratings.csv"

In [5]:
movieSamples = spark.read.format('csv').option('header', 'true').load(movieResourcesPath)
ratingSamples = spark.read.format('csv').option('header', 'true').load(ratingsResourcesPath)

In [6]:
movieSamples.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [7]:
ratingSamples.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



## from redis

you have to run `../Data2Redis/1-csv2redis.ipynb` first. 

In [4]:
import redis
r = redis.StrictRedis(host='localhost', port=6379) # , db=1, password='123456'

In [5]:
r.get("movie-999")

b"{'title': '2 Days in the Valley (1996)', 'genres': 'Crime|Film-Noir'}"

In [6]:
l = []
for key in r.keys("movie-*"):
    info = eval(r.get(key))
    key_str = key.decode("utf-8")
    l.append(
        (key_str.split("-")[1], 
         str(info["title"]), 
         str(info["genres"]))
    )

In [7]:
movieSamples = spark.createDataFrame(l, ['movieId', 'title', "genres"])
movieSamples.printSchema()
del l

root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [8]:
movieSamples.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|    687| Country Life (1994)|       Drama|Romance|
|    365|Little Buddha (1993)|               Drama|
|    322|Swimming with Sha...|        Comedy|Drama|
|    676|    They Bite (1996)|Comedy|Horror|Sci-Fi|
|    250|Heavyweights (Hea...|     Children|Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [9]:
l = []
for key in r.keys("rating-user_*-movie_*"):
    info = eval(r.get(key))
    key_str = key.decode("utf-8")
    ## 
    userId = key_str.split("-")[1].split("_")[-1]
    movieId = key_str.split("-")[2].split("_")[-1]
    ## 
    l.append(
        (
            userId, movieId,
            str(info["rating"]), 
            str(info["timestamp"])
        )
    )
## 有点耗时。开了个游戏，建了个档，才好。

In [10]:
ratingSamples = spark.createDataFrame(l, ['userId', 'movieId', 'rating', "timestamp"])
ratingSamples.printSchema()
del l

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [11]:
ratingSamples.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|   739|    457|   4.0|1214345582|
| 21391|    527|   5.0| 940498637|
|  3092|    383|   4.0| 893063428|
| 18338|    340|   3.0|1131967426|
| 28244|     21|   4.0| 838561558|
+------+-------+------+----------+
only showing top 5 rows



# Add label

In [12]:
NUMBER_PRECISION = 2

In [13]:
def addSampleLabel(ratingSamples):
    ratingSamples.show(5, truncate=False)
    ratingSamples.printSchema()
    sampleCount = ratingSamples.count()
    ratingSamples.groupBy('rating').count().orderBy('rating').withColumn('percentage',
                                                                         F.col('count') / sampleCount).show()
    ##上一行操作，看上去好像没什么卵用啊，就是拿来随便计算一下，得到一个统计发现的。我去。
    ratingSamples = ratingSamples.withColumn('label', when(F.col('rating') >= 3.5, 1).otherwise(0))
    return ratingSamples

In [14]:
ratingSamplesWithLabel = addSampleLabel(ratingSamples)

+------+-------+------+----------+
|userId|movieId|rating|timestamp |
+------+-------+------+----------+
|739   |457    |4.0   |1214345582|
|21391 |527    |5.0   |940498637 |
|3092  |383    |4.0   |893063428 |
|18338 |340    |3.0   |1131967426|
|28244 |21     |4.0   |838561558 |
+------+-------+------+----------+
only showing top 5 rows

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)

+------+------+--------------------+
|rating| count|          percentage|
+------+------+--------------------+
|   0.5|  9788|0.008375561978987506|
|   1.0| 45018| 0.03852176636392108|
|   1.5| 11794|0.010092090108314123|
|   2.0| 87084| 0.07451751526135553|
|   2.5| 34269|0.029323879593167432|
|   3.0|323616| 0.27691723185451783|
|   3.5| 74376| 0.06364331811904114|
|   4.0|324804|  0.2779337998593234|
|   4.5| 53388| 0.04568395003414231|
|   5.0|204501| 0.17499088682722966|
+------+------+--

In [15]:
ratingSamplesWithLabel.show(5)

+------+-------+------+----------+-----+
|userId|movieId|rating| timestamp|label|
+------+-------+------+----------+-----+
|   739|    457|   4.0|1214345582|    1|
| 21391|    527|   5.0| 940498637|    1|
|  3092|    383|   4.0| 893063428|    1|
| 18338|    340|   3.0|1131967426|    0|
| 28244|     21|   4.0| 838561558|    1|
+------+-------+------+----------+-----+
only showing top 5 rows



# Add movie features

`samplesWithMovieFeatures = addMovieFeatures(movieSamples, ratingSamplesWithLabel)`

In [16]:
## add some basic features, like title and genres
samplesWithMovies1 = ratingSamplesWithLabel.join(movieSamples, on=['movieId'], how='left')
samplesWithMovies1.show(10)

+-------+------+------+----------+-----+-------------------+--------------------+
|movieId|userId|rating| timestamp|label|              title|              genres|
+-------+------+------+----------+-----+-------------------+--------------------+
|    296|  2227|   5.0|1347882934|    1|Pulp Fiction (1994)|Comedy|Crime|Dram...|
|    296| 16687|   5.0| 894296292|    1|Pulp Fiction (1994)|Comedy|Crime|Dram...|
|    296| 29161|   5.0|1374916885|    1|Pulp Fiction (1994)|Comedy|Crime|Dram...|
|    296| 28051|   4.5|1117136927|    1|Pulp Fiction (1994)|Comedy|Crime|Dram...|
|    296| 29786|   5.0|1163571473|    1|Pulp Fiction (1994)|Comedy|Crime|Dram...|
|    296|  6110|   3.0| 846667951|    0|Pulp Fiction (1994)|Comedy|Crime|Dram...|
|    296| 15125|   4.0|1237401964|    1|Pulp Fiction (1994)|Comedy|Crime|Dram...|
|    296|  9311|   4.5|1288834720|    1|Pulp Fiction (1994)|Comedy|Crime|Dram...|
|    296|  8251|   1.0| 848175961|    0|Pulp Fiction (1994)|Comedy|Crime|Dram...|
|    296| 24201|

In [17]:
def extractReleaseYearUdf(title):
    # add realease year
    if not title or len(title.strip()) < 6:
        return 1990
    else:
        yearStr = title.strip()[-5:-1]
    return int(yearStr)

samplesWithMovies2 = samplesWithMovies1.withColumn(
    'releaseYear',
    udf(extractReleaseYearUdf, IntegerType())('title') ## 看样子，这些函数必须要包装成udf才能运行在spark df的列上哦。
) \
.withColumn('title', udf(lambda x: x.strip()[:-6].strip(), StringType())('title')) \
.drop('title')

In [18]:
samplesWithMovies2.show(10)

+-------+------+------+----------+-----+--------------------+-----------+
|movieId|userId|rating| timestamp|label|              genres|releaseYear|
+-------+------+------+----------+-----+--------------------+-----------+
|    296|  2227|   5.0|1347882934|    1|Comedy|Crime|Dram...|       1994|
|    296| 16687|   5.0| 894296292|    1|Comedy|Crime|Dram...|       1994|
|    296| 29161|   5.0|1374916885|    1|Comedy|Crime|Dram...|       1994|
|    296| 28051|   4.5|1117136927|    1|Comedy|Crime|Dram...|       1994|
|    296| 29786|   5.0|1163571473|    1|Comedy|Crime|Dram...|       1994|
|    296|  6110|   3.0| 846667951|    0|Comedy|Crime|Dram...|       1994|
|    296| 15125|   4.0|1237401964|    1|Comedy|Crime|Dram...|       1994|
|    296|  9311|   4.5|1288834720|    1|Comedy|Crime|Dram...|       1994|
|    296|  8251|   1.0| 848175961|    0|Comedy|Crime|Dram...|       1994|
|    296| 24201|   5.0| 834501281|    1|Comedy|Crime|Dram...|       1994|
+-------+------+------+----------+----

In [19]:
## split genres. Originally the genres are in one string, here we split them into individual words.
## only select 3 genres into the traing. 就是拿了前三个genre，就酱紫了。
samplesWithMovies3 = samplesWithMovies2.withColumn('movieGenre1', split(F.col('genres'), "\\|")[0]) \
.withColumn('movieGenre2', split(F.col('genres'), "\\|")[1]) \
.withColumn('movieGenre3', split(F.col('genres'), "\\|")[2])

In [20]:
samplesWithMovies3.show(10)

+-------+------+------+----------+-----+--------------------+-----------+-----------+-----------+-----------+
|movieId|userId|rating| timestamp|label|              genres|releaseYear|movieGenre1|movieGenre2|movieGenre3|
+-------+------+------+----------+-----+--------------------+-----------+-----------+-----------+-----------+
|    296|  2227|   5.0|1347882934|    1|Comedy|Crime|Dram...|       1994|     Comedy|      Crime|      Drama|
|    296| 16687|   5.0| 894296292|    1|Comedy|Crime|Dram...|       1994|     Comedy|      Crime|      Drama|
|    296| 29161|   5.0|1374916885|    1|Comedy|Crime|Dram...|       1994|     Comedy|      Crime|      Drama|
|    296| 28051|   4.5|1117136927|    1|Comedy|Crime|Dram...|       1994|     Comedy|      Crime|      Drama|
|    296| 29786|   5.0|1163571473|    1|Comedy|Crime|Dram...|       1994|     Comedy|      Crime|      Drama|
|    296|  6110|   3.0| 846667951|    0|Comedy|Crime|Dram...|       1994|     Comedy|      Crime|      Drama|
|    296| 

In [21]:
# add rating features
movieRatingFeatures = samplesWithMovies3.groupBy('movieId')\
.agg(
    F.count(F.lit(1)).alias('movieRatingCount'), ## 这里的lit（1）是不是指，定义一个常数？所以这行的意思就是，在agg集合里面，每出现一个个体，计数君就增加lit(1)
    format_number(F.avg(F.col('rating')), NUMBER_PRECISION).alias('movieAvgRating'), ## 这行就很缝合怪了。包含了算平均，还有格式化数据的操作。
    F.stddev(F.col('rating')).alias('movieRatingStddev') ## 这里就是计算电影评分的标准差了。很标准的实现方式。 
)\
.fillna(0)\
.withColumn(
    'movieRatingStddev', format_number(F.col('movieRatingStddev'), NUMBER_PRECISION)
) ## withColumn就是在原表格上做操作。而这里的操作总体上是发生在groupby的，这就会生成新的df。 

In [22]:
movieRatingFeatures.show(10)

+-------+----------------+--------------+-----------------+
|movieId|movieRatingCount|movieAvgRating|movieRatingStddev|
+-------+----------------+--------------+-----------------+
|    296|           14616|          4.17|             0.98|
|    467|             174|          3.44|             1.23|
|    675|               6|          2.33|             0.82|
|    691|             254|          3.12|             1.04|
|    829|             402|          2.62|             1.22|
|    125|             788|          3.71|             0.93|
|    451|             159|          3.00|             0.88|
|    800|            1609|          4.04|             0.91|
|    853|              20|          3.50|             1.24|
|    944|             259|          3.83|             0.92|
+-------+----------------+--------------+-----------------+
only showing top 10 rows



In [23]:
# join movie rating features
samplesWithMovies4 = samplesWithMovies3.join(movieRatingFeatures, on=['movieId'], how='left')
samplesWithMovies4.printSchema()
samplesWithMovies4.show(5, truncate=False)

root
 |-- movieId: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- label: integer (nullable = false)
 |-- genres: string (nullable = true)
 |-- releaseYear: integer (nullable = true)
 |-- movieGenre1: string (nullable = true)
 |-- movieGenre2: string (nullable = true)
 |-- movieGenre3: string (nullable = true)
 |-- movieRatingCount: long (nullable = true)
 |-- movieAvgRating: string (nullable = true)
 |-- movieRatingStddev: string (nullable = true)

+-------+------+------+----------+-----+---------------------------+-----------+-----------+-----------+-----------+----------------+--------------+-----------------+
|movieId|userId|rating|timestamp |label|genres                     |releaseYear|movieGenre1|movieGenre2|movieGenre3|movieRatingCount|movieAvgRating|movieRatingStddev|
+-------+------+------+----------+-----+---------------------------+-----------+-----------+-----------+---------

In [24]:
samplesWithMovieFeatures = samplesWithMovies4

## Save the movie features into Redis

In [61]:
def dealWithEmpty(string):
    if string == None:
        return ""
    else:
        return string
    
for row in tqdm.tqdm(samplesWithMovieFeatures.rdd.collect()):
#     print(row)
#     print(row["movieId"], row["userId"], row["rating"], row["timestamp"], row["label"], 
#     row["genres"], row["releaseYear"], row["movieGenre1"], row["movieGenre2"], row["movieGenre3"], 
#     row["movieRatingCount"], row["movieAvgRating"], row["movieRatingStddev"], )
    try: 
        if not r.exists("mf-{}".format(row["movieId"])):
            r.hset("mf-{}".format(row["movieId"]), "genres", dealWithEmpty(row["genres"]))
            r.hset("mf-{}".format(row["movieId"]), "timestamp", dealWithEmpty(row["timestamp"]))
            r.hset("mf-{}".format(row["movieId"]), "releaseYear", dealWithEmpty(row["releaseYear"]))
            r.hset("mf-{}".format(row["movieId"]), "movieGenre1", dealWithEmpty(row["movieGenre1"]))
            r.hset("mf-{}".format(row["movieId"]), "movieGenre2", dealWithEmpty(row["movieGenre2"]))
            r.hset("mf-{}".format(row["movieId"]), "movieGenre3", dealWithEmpty(row["movieGenre3"]))
            r.hset("mf-{}".format(row["movieId"]), "movieRatingCount", dealWithEmpty(row["movieRatingCount"]))
            r.hset("mf-{}".format(row["movieId"]), "movieAvgRating", dealWithEmpty(row["movieAvgRating"]))
            r.hset("mf-{}".format(row["movieId"]), "movieRatingStddev", dealWithEmpty(row["movieRatingStddev"]))
        else:
            pass
    except:
        print("mf-{} failed, there may be some empty ".format(row["movieId"]))
#         break
#     break
        
    
#     vector = [float(_) for _ in row["vector"]] ## 这里要转化为python基本形式, 如果不转的话, 就是numpy.float64, 这种格式后面是会遇上问题的. 
#     Vectors_list.append((movie_id, vector))

100%|██████████████████████████████████████████████████████████████████████| 1168638/1168638 [04:04<00:00, 4788.99it/s]


In [54]:
# r.keys("mf-*")

[]

In [59]:
# r.hget("mf-691", "rating")

In [52]:
# r.delete("mf-296")

1

In [60]:
# for key in r.keys("mf-*"):
#     r.delete(key)

# Add user features

`samplesWithUserFeatures = addUserFeatures(samplesWithMovieFeatures)`

In [25]:
def extractGenres(genres_list):
    '''
    pass in a list which format like ["Action|Adventure|Sci-Fi|Thriller", "Crime|Horror|Thriller"]
    count by each genre，return genre_list in reverse order
    eg:
    (('Thriller',2),('Action',1),('Sci-Fi',1),('Horror', 1), ('Adventure',1),('Crime',1))
    return:['Thriller','Action','Sci-Fi','Horror','Adventure','Crime']
    '''
    genres_dict = defaultdict(int)
    for genres in genres_list:
        for genre in genres.split('|'):
            genres_dict[genre] += 1
    sortedGenres = sorted(genres_dict.items(), key=lambda x: x[1], reverse=True)
    return [x[0] for x in sortedGenres]

In [26]:
extractGenresUdf = udf(extractGenres, ArrayType(StringType()))

In [27]:
## 这一部操作：groupby 用户，针对一个用户，他在看某一部电影之前看过的所有电影，有哪些打过好评的。
samplesWithUserFeatures1 = samplesWithMovieFeatures \
.withColumn('userPositiveHistory',
            F.collect_list(
                when(F.col('label') == 1, F.col('movieId')).otherwise(F.lit(None))
            ).over(sql.Window.partitionBy("userId").orderBy(F.col("timestamp")).rowsBetween(-100, -1))
) 

In [28]:
samplesWithUserFeatures1.show(10)

+-------+------+------+---------+-----+--------------------+-----------+-----------+-----------+-----------+----------------+--------------+-----------------+-------------------+
|movieId|userId|rating|timestamp|label|              genres|releaseYear|movieGenre1|movieGenre2|movieGenre3|movieRatingCount|movieAvgRating|movieRatingStddev|userPositiveHistory|
+-------+------+------+---------+-----+--------------------+-----------+-----------+-----------+-----------+----------------+--------------+-----------------+-------------------+
|    858| 10096|   4.0|954364961|    1|         Crime|Drama|       1972|      Crime|      Drama|       null|            8996|          4.35|             0.86|                 []|
|    678| 10096|   3.0|954365396|    0|      Drama|Thriller|       1993|      Drama|   Thriller|       null|             620|          4.10|             0.86|              [858]|
|    514| 10096|   3.0|954365410|    0|              Comedy|       1994|     Comedy|       null|       nu

In [29]:
## 抽样一个用户出来看看
for row in samplesWithUserFeatures1.where(F.col("userId") == 10096).rdd.collect():
    print(row["movieId"], row["label"], row["timestamp"], row["userPositiveHistory"])

858 1 954364961 []
678 0 954365396 ['858']
514 0 954365410 ['858']
608 0 954365515 ['858']
50 1 954365515 ['858']
593 1 954365552 ['858', '50']
25 0 954365571 ['858', '50', '593']
457 1 954365571 ['858', '50', '593']
541 0 954365664 ['858', '50', '593', '457']


In [30]:
# samplesWithUserFeatures1.write.option("header", "true").mode("overwrite").json("nimabi.json")

In [31]:
# samplesWithUserFeatures = samplesWithMovieFeatures \
# .withColumn('userPositiveHistory',
#             F.collect_list(
#                 when(F.col('label') == 1, F.col('movieId')).otherwise(F.lit(None))
#             ).over(
#                 sql.Window.partitionBy("userId").orderBy(F.col("timestamp")).rowsBetween(-100, -1)
#             )
# )\
# .withColumn("userPositiveHistory", reverse(F.col("userPositiveHistory")))
# samplesWithMovieFeatures.show()

In [32]:
samplesWithUserFeatures2 = samplesWithUserFeatures1.withColumn("userPositiveHistory", reverse(F.col("userPositiveHistory")))
samplesWithUserFeatures2.show()

+-------+------+------+---------+-----+--------------------+-----------+-----------+-----------+-----------+----------------+--------------+-----------------+--------------------+
|movieId|userId|rating|timestamp|label|              genres|releaseYear|movieGenre1|movieGenre2|movieGenre3|movieRatingCount|movieAvgRating|movieRatingStddev| userPositiveHistory|
+-------+------+------+---------+-----+--------------------+-----------+-----------+-----------+-----------+----------------+--------------+-----------------+--------------------+
|    858| 10096|   4.0|954364961|    1|         Crime|Drama|       1972|      Crime|      Drama|       null|            8996|          4.35|             0.86|                  []|
|    678| 10096|   3.0|954365396|    0|      Drama|Thriller|       1993|      Drama|   Thriller|       null|             620|          4.10|             0.86|               [858]|
|    514| 10096|   3.0|954365410|    0|              Comedy|       1994|     Comedy|       null|    

In [33]:
## 抽样一个用户出来看看
for row in samplesWithUserFeatures2.where(F.col("userId") == 10096).rdd.collect():
    print(row["movieId"], row["label"], row["timestamp"], row["userPositiveHistory"], 
         # row["userRatedMovie1"], row["userRatedMovie2"], row["userRatedMovie3"], row["userRatedMovie4"], row["userRatedMovie5"]
     )

## 看见没，下面的userPositiveHistory，里面的电影名称都按照升序排列了。看看之前的例子，是不是发现，是按照降序排列的？
## 所以啊，下面那个cell里面的好评电影列，本质上来说，就是一个用户在看这部电影之前，好评过的电影，按照电影id的升序顺序，取前几个罢了。
## 经过解构，是不是发现不那么复杂了？终于突破了。

858 1 954364961 []
678 0 954365396 ['858']
514 0 954365410 ['858']
608 0 954365515 ['858']
50 1 954365515 ['858']
593 1 954365552 ['50', '858']
25 0 954365571 ['593', '50', '858']
457 1 954365571 ['593', '50', '858']
541 0 954365664 ['457', '593', '50', '858']


In [34]:
samplesWithUserFeatures3 = samplesWithUserFeatures2\
.withColumn('userRatedMovie1', F.col('userPositiveHistory')[0]) \
.withColumn('userRatedMovie2', F.col('userPositiveHistory')[1]) \
.withColumn('userRatedMovie3', F.col('userPositiveHistory')[2]) \
.withColumn('userRatedMovie4', F.col('userPositiveHistory')[3]) \
.withColumn('userRatedMovie5', F.col('userPositiveHistory')[4])

In [35]:
samplesWithUserFeatures4 = samplesWithUserFeatures3\
.withColumn(
    'userRatingCount',## 计算这个用户在看这部电影之前给过多少评分
    F.count(F.lit(1)).over(sql.Window.partitionBy('userId').orderBy('timestamp').rowsBetween(-100, -1))
)\
.withColumn(
    'userAvgReleaseYear', ## 计算这个用户在看这部电影之前看过的电影的平均年份。
## 举个例子吧，比如up主，老电影故事，他的这个值可能就是1970；专门讲童年神剧的大头虫，这个值可能就是2000；老邪估计讲的都是最新的电影，所以他的这个值就是2010酱紫。
## 通过举例了三个up主，是不是觉得这个特征还是很有意义的呢2333？
    F.avg(F.col('releaseYear')).over(sql.Window.partitionBy('userId').orderBy('timestamp').rowsBetween(-100, -1)).cast(IntegerType())
)\
.withColumn(
    'userReleaseYearStddev', 
    F.stddev(F.col("releaseYear")).over(sql.Window.partitionBy('userId').orderBy('timestamp').rowsBetween(-100, -1))
)\
.withColumn(
    "userAvgRating", 
    format_number(
        F.avg(F.col("rating")).over(sql.Window.partitionBy('userId').orderBy('timestamp').rowsBetween(-100, -1)), 
        NUMBER_PRECISION
    )
)\
.withColumn(
    "userRatingStddev", 
    F.stddev(F.col("rating")).over(sql.Window.partitionBy('userId').orderBy('timestamp').rowsBetween(-100, -1))
)\
.withColumn(
    "userGenres", ## 把用户以前好评过的电影的genres全部搜集起来。
                ## 原始的genres是一个"genre1|genre2|genre3"这样的，这里呢，我们把里面的genre拆出来成为一个一个genre1，genre2，genre3酱紫，
                ## userGenres: 把用户以前看过的电影的风格, 按照它们出现的次数进行排序. 
    extractGenresUdf(F.collect_list(when(F.col('label') == 1, F.col('genres')).otherwise(F.lit(None))).over(sql.Window.partitionBy('userId').orderBy('timestamp').rowsBetween(-100, -1)))
)

`samplesWithUserFeatures4`里的列，有一个共同点，就是他们都利用了滑动窗口，专门针对用户以前看过的电影进行统计。

In [36]:
samplesWithUserFeatures5 = samplesWithUserFeatures4\
.withColumn("userRatingStddev", format_number(F.col("userRatingStddev"), NUMBER_PRECISION)) \
.withColumn("userReleaseYearStddev", format_number(F.col("userReleaseYearStddev"), NUMBER_PRECISION)) \
## 这里就是调整一下某一些列的精确度。

In [37]:
samplesWithUserFeatures6 = samplesWithUserFeatures5\
.withColumn("userGenre1", F.col("userGenres")[0]) \
.withColumn("userGenre2", F.col("userGenres")[1]) \
.withColumn("userGenre3", F.col("userGenres")[2]) \
.withColumn("userGenre4", F.col("userGenres")[3]) \
.withColumn("userGenre5", F.col("userGenres")[4])
## 这里也很明确了，就是抽取userGenres的前5个值。
## 这些列反映的是，用户以前看过的电影里，最多的类别是哪几个。比如，用户过去看的最多的是userGenre1类型的电影，第二多的是userGenre2的电影。

In [38]:
samplesWithUserFeatures = samplesWithUserFeatures6\
.drop("genres", "userGenres", "userPositiveHistory") \
.filter(F.col("userRatingCount") > 1) ## 只要评过至少2部电影的用户的数据。
## 假如有的作者只看了1部电影，那么他的那些跟“以前看过的电影”相关的列全都会是空的。这样也就太多空值了，这样的行也没什么意义了。

In [39]:
samplesWithUserFeatures.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- label: integer (nullable = false)
 |-- releaseYear: integer (nullable = true)
 |-- movieGenre1: string (nullable = true)
 |-- movieGenre2: string (nullable = true)
 |-- movieGenre3: string (nullable = true)
 |-- movieRatingCount: long (nullable = true)
 |-- movieAvgRating: string (nullable = true)
 |-- movieRatingStddev: string (nullable = true)
 |-- userRatedMovie1: string (nullable = true)
 |-- userRatedMovie2: string (nullable = true)
 |-- userRatedMovie3: string (nullable = true)
 |-- userRatedMovie4: string (nullable = true)
 |-- userRatedMovie5: string (nullable = true)
 |-- userRatingCount: long (nullable = false)
 |-- userAvgReleaseYear: integer (nullable = true)
 |-- userReleaseYearStddev: string (nullable = true)
 |-- userAvgRating: string (nullable = true)
 |-- userRatingStddev: string (nullable = true)
 |-- use

In [None]:
# samplesWithUserFeatures.show(10)

## 这个不错啊，用来筛数据很合适。
# samplesWithUserFeatures.filter(samplesWithMovieFeatures['userId'] == 1).orderBy(F.col('timestamp').asc()).show(truncate=False)

# Save the user features into Redis

# Split the training and testing data

In [None]:
file_path

smallSamples = samplesWithUserFeatures.sample(0.1).withColumn("timestampLong", F.col("timestamp").cast(LongType()))
quantile = smallSamples.stat.approxQuantile("timestampLong", [0.8], 0.05)
splitTimestamp = quantile[0]
training = smallSamples.where(F.col("timestampLong") <= splitTimestamp).drop("timestampLong")
test = smallSamples.where(F.col("timestampLong") > splitTimestamp).drop("timestampLong")
trainingSavePath = file_path + '/webroot/sampledata/trainingSamples'
testSavePath = file_path + '/webroot/sampledata/testSamples'
training.repartition(1).write.option("header", "true").mode('overwrite') \
    .csv(trainingSavePath)
test.repartition(1).write.option("header", "true").mode('overwrite') \
    .csv(testSavePath)