In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql as sql
from pyspark.sql.functions import *
from pyspark.sql.types import *
from collections import defaultdict
from pyspark.sql import functions as F

# Number of decimal places handed to format_number() when the rating and
# release-year statistics are formatted further down in this notebook.
NUMBER_PRECISION = 2

In [2]:
import os
import sys
from pyspark.sql import SparkSession

# Make PySpark use the same interpreter that is running this notebook:
# both variables are set to sys.executable.
os.environ.update({
    'PYSPARK_PYTHON': sys.executable,
    'PYSPARK_DRIVER_PYTHON': sys.executable,
})

In [3]:
# Configure a local-master Spark application named 'featureEngineering'
# and obtain (or reuse) the session built from that configuration.
conf = SparkConf()
conf.setAppName('featureEngineering')
conf.setMaster('local')
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [4]:
# NOTE(review): machine-specific absolute path - point this at your own checkout.
file_path = r'C:\Users\nxf42771\Documents\GitHub\SparrowRecSys\src\main/resources'
movieResourcesPath = file_path + "/webroot/sampledata/movies.csv"
ratingsResourcesPath = file_path + "/webroot/sampledata/ratings.csv"

# Both CSV files carry a header row; no schema is supplied here.
movieSamples = spark.read.csv(movieResourcesPath, header=True)
ratingSamples = spark.read.csv(ratingsResourcesPath, header=True)

In [5]:
# Peek at the raw ratings, print their schema, and keep the total row count
# in sampleCount for the distribution computed afterwards.
ratingSamples.show(n=5, truncate=False)
ratingSamples.printSchema()
sampleCount = ratingSamples.count()

+------+-------+------+----------+
|userId|movieId|rating|timestamp |
+------+-------+------+----------+
|1     |2      |3.5   |1112486027|
|1     |29     |3.5   |1112484676|
|1     |32     |3.5   |1112484819|
|1     |47     |3.5   |1112484727|
|1     |50     |3.5   |1112484580|
+------+-------+------+----------+
only showing top 5 rows

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [6]:
# Display the total number of rating rows counted above.
sampleCount

1168638

In [7]:
# Distribution of rating values: row count per rating plus its share of all rows.
(
    ratingSamples
    .groupBy('rating')
    .count()
    .orderBy('rating')
    .withColumn('percentage', F.col('count') / sampleCount)
    .show()
)

+------+------+--------------------+
|rating| count|          percentage|
+------+------+--------------------+
|   0.5|  9788|0.008375561978987506|
|   1.0| 45018| 0.03852176636392108|
|   1.5| 11794|0.010092090108314123|
|   2.0| 87084| 0.07451751526135553|
|   2.5| 34269|0.029323879593167432|
|   3.0|323616| 0.27691723185451783|
|   3.5| 74376| 0.06364331811904114|
|   4.0|324804|  0.2779337998593234|
|   4.5| 53388| 0.04568395003414231|
|   5.0|204501| 0.17499088682722966|
+------+------+--------------------+



In [8]:
# Binary label: 1 when the rating is at least 3.5, otherwise 0.
ratingSamplesWithLabel = ratingSamples.withColumn(
    'label',
    F.when(F.col('rating') >= 3.5, 1).otherwise(0),
)


In [9]:
# Preview the first ten movie rows without truncating long titles / genre lists.
movieSamples.show(n=10, truncate=False)

+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
|6      |Heat (1995)                       |Action|Crime|Thriller                      |
|7      |Sabrina (1995)                    |Comedy|Romance                             |
|8      |Tom and Huck (1995)               |Adventure|Children                         |
|9      |Sudden Death

In [10]:
# Preview the ratings together with the derived label column.
ratingSamplesWithLabel.show(n=10)

+------+-------+------+----------+-----+
|userId|movieId|rating| timestamp|label|
+------+-------+------+----------+-----+
|     1|      2|   3.5|1112486027|    1|
|     1|     29|   3.5|1112484676|    1|
|     1|     32|   3.5|1112484819|    1|
|     1|     47|   3.5|1112484727|    1|
|     1|     50|   3.5|1112484580|    1|
|     1|    112|   3.5|1094785740|    1|
|     1|    151|   4.0|1094785734|    1|
|     1|    223|   4.0|1112485573|    1|
|     1|    253|   4.0|1112484940|    1|
|     1|    260|   4.0|1112484826|    1|
+------+-------+------+----------+-----+
only showing top 10 rows



In [11]:
# Add the basic movie columns (title, genres) to every rating row.
# A left join keeps rating rows even when no movie row matches their movieId.
samplesWithMovies1 = ratingSamplesWithLabel.join(movieSamples, on='movieId', how='left')
samplesWithMovies1.show(n=10)

+-------+------+------+----------+-----+--------------------+--------------------+
|movieId|userId|rating| timestamp|label|               title|              genres|
+-------+------+------+----------+-----+--------------------+--------------------+
|      2|     1|   3.5|1112486027|    1|      Jumanji (1995)|Adventure|Childre...|
|     29|     1|   3.5|1112484676|    1|City of Lost Chil...|Adventure|Drama|F...|
|     32|     1|   3.5|1112484819|    1|Twelve Monkeys (a...|Mystery|Sci-Fi|Th...|
|     47|     1|   3.5|1112484727|    1|Seven (a.k.a. Se7...|    Mystery|Thriller|
|     50|     1|   3.5|1112484580|    1|Usual Suspects, T...|Crime|Mystery|Thr...|
|    112|     1|   3.5|1094785740|    1|Rumble in the Bro...|Action|Adventure|...|
|    151|     1|   4.0|1094785734|    1|      Rob Roy (1995)|Action|Drama|Roma...|
|    223|     1|   4.0|1112485573|    1|       Clerks (1994)|              Comedy|
|    253|     1|   4.0|1112484940|    1|Interview with th...|        Drama|Horror|
|   

In [12]:
def extractReleaseYearUdf(title):
    """
    Extract the release year from a movie title that ends in "(YYYY)".

    Args:
        title: movie title such as "Toy Story (1995)"; may be None or empty.

    Returns:
        int: the four characters before the closing parenthesis parsed as an
        integer, or 1990 when the title is missing, shorter than six characters
        after stripping, or does not end in a parsable year.
    """
    DEFAULT_YEAR = 1990
    cleaned = title.strip() if title else ''
    if len(cleaned) < 6:
        return DEFAULT_YEAR
    try:
        # For "... (1995)" the slice [-5:-1] is exactly "1995".
        return int(cleaned[-5:-1])
    except ValueError:
        # Titles without a trailing "(YYYY)" must not crash the whole Spark job.
        return DEFAULT_YEAR


# Add releaseYear (parsed from the title) and drop the title column.
# Only the year is kept, so no year-less title is derived before the drop.
samplesWithMovies2 = samplesWithMovies1 \
    .withColumn('releaseYear', udf(extractReleaseYearUdf, IntegerType())('title')) \
    .drop('title')

In [13]:
# Split the pipe-delimited genres string and expose its first three entries
# as movieGenre1..movieGenre3.
samplesWithMovies3 = (
    samplesWithMovies2
    .withColumn('movieGenre1', F.split(F.col('genres'), "\\|").getItem(0))
    .withColumn('movieGenre2', F.split(F.col('genres'), "\\|").getItem(1))
    .withColumn('movieGenre3', F.split(F.col('genres'), "\\|").getItem(2))
)

In [14]:
# Per-movie rating statistics: number of ratings, average rating and rating
# standard deviation. Average and stddev are formatted with NUMBER_PRECISION
# decimals; fillna(0) runs before the stddev is formatted.
movieRatingFeatures = (
    samplesWithMovies3
    .groupBy('movieId')
    .agg(
        F.count(F.lit(1)).alias('movieRatingCount'),
        F.format_number(F.avg(F.col('rating')), NUMBER_PRECISION).alias('movieAvgRating'),
        F.stddev(F.col('rating')).alias('movieRatingStddev'),
    )
    .fillna(0)
    .withColumn('movieRatingStddev',
                F.format_number(F.col('movieRatingStddev'), NUMBER_PRECISION))
)

In [15]:
# Attach the per-movie rating statistics to every sample row, then inspect the result.
samplesWithMovies4 = samplesWithMovies3.join(movieRatingFeatures, on='movieId', how='left')
samplesWithMovies4.printSchema()
samplesWithMovies4

root
 |-- movieId: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- label: integer (nullable = false)
 |-- genres: string (nullable = true)
 |-- releaseYear: integer (nullable = true)
 |-- movieGenre1: string (nullable = true)
 |-- movieGenre2: string (nullable = true)
 |-- movieGenre3: string (nullable = true)
 |-- movieRatingCount: long (nullable = true)
 |-- movieAvgRating: string (nullable = true)
 |-- movieRatingStddev: string (nullable = true)



DataFrame[movieId: string, userId: string, rating: string, timestamp: string, label: int, genres: string, releaseYear: int, movieGenre1: string, movieGenre2: string, movieGenre3: string, movieRatingCount: bigint, movieAvgRating: string, movieRatingStddev: string]

In [16]:
# Show five untruncated rows of the movie-enriched samples.
samplesWithMovies4.show(n=5, truncate=False)

+-------+------+------+----------+-----+---------------------------+-----------+-----------+-----------+-----------+----------------+--------------+-----------------+
|movieId|userId|rating|timestamp |label|genres                     |releaseYear|movieGenre1|movieGenre2|movieGenre3|movieRatingCount|movieAvgRating|movieRatingStddev|
+-------+------+------+----------+-----+---------------------------+-----------+-----------+-----------+-----------+----------------+--------------+-----------------+
|296    |1     |4.0   |1112484767|1    |Comedy|Crime|Drama|Thriller|1994       |Comedy     |Crime      |Drama      |14616           |4.17          |0.98             |
|296    |8     |5.0   |833973081 |1    |Comedy|Crime|Drama|Thriller|1994       |Comedy     |Crime      |Drama      |14616           |4.17          |0.98             |
|296    |11    |3.5   |1230858799|1    |Comedy|Crime|Drama|Thriller|1994       |Comedy     |Crime      |Drama      |14616           |4.17          |0.98             

In [17]:
# Alias the final movie-level frame under the name used as input for the
# user-feature step, and print its schema for reference.
samplesWithMovieFeatures = samplesWithMovies4
samplesWithMovieFeatures.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- label: integer (nullable = false)
 |-- genres: string (nullable = true)
 |-- releaseYear: integer (nullable = true)
 |-- movieGenre1: string (nullable = true)
 |-- movieGenre2: string (nullable = true)
 |-- movieGenre3: string (nullable = true)
 |-- movieRatingCount: long (nullable = true)
 |-- movieAvgRating: string (nullable = true)
 |-- movieRatingStddev: string (nullable = true)



In [18]:
def extractGenres(genres_list):
    """
    Rank genres by how often they occur in a list of pipe-delimited genre strings.

    Args:
        genres_list: iterable of strings such as
            ["Action|Adventure|Sci-Fi|Thriller", "Crime|Horror|Thriller"].
            None (for the whole list or for single entries) is tolerated and
            ignored.

    Returns:
        list of genre names, most frequent first. Genres with equal counts keep
        the order in which they were first seen, so the example above yields
        ['Thriller', 'Action', 'Adventure', 'Sci-Fi', 'Crime', 'Horror'].
    """
    genre_counts = defaultdict(int)
    # `or ()` makes a missing list behave like an empty one.
    for genres in genres_list or ():
        if genres is None:
            continue
        for genre in genres.split('|'):
            genre_counts[genre] += 1
    # sorted() is stable (also with reverse=True), so ties stay in first-seen order.
    ranked = sorted(genre_counts.items(), key=lambda item: item[1], reverse=True)
    return [genre for genre, _ in ranked]

def addUserFeatures(samplesWithMovieFeatures):
    """
    Append per-user history features to every sample row.

    Each feature is computed over the same window: the rows of the same user
    that precede the current row in time (at most 100 of them, current row
    excluded).

      * userRatedMovie1..5     - most recent positively-labelled movieIds
      * userRatingCount        - number of rows in the window
      * userAvgReleaseYear     - average releaseYear, cast to integer
      * userReleaseYearStddev  - releaseYear stddev, formatted string
      * userAvgRating          - average rating, formatted string
      * userRatingStddev       - rating stddev, formatted string
      * userGenre1..5          - most frequent genres of positively-labelled rows

    Rows whose userRatingCount is not greater than 1 are filtered out. As a
    side effect the function prints the resulting schema, ten sample rows and
    the rows of userId 1 in chronological order.

    Args:
        samplesWithMovieFeatures: DataFrame providing the userId, movieId,
            rating, timestamp, label, genres and releaseYear columns.

    Returns:
        DataFrame with the user feature columns added and the `genres` column
        (plus the intermediate helper columns) removed.
    """
    extractGenresUdf = F.udf(extractGenres, ArrayType(StringType()))

    # In this notebook `timestamp` is a string column. Ordering it as text would
    # place "833973081" after "1112484767", so compare it numerically instead.
    eventTime = F.col('timestamp').cast('long')
    # Single shared window spec: up to 100 earlier rows of the same user.
    historyWindow = sql.Window.partitionBy('userId').orderBy(eventTime).rowsBetween(-100, -1)

    # Rows that are not positively labelled map to null and therefore do not
    # contribute to the collected lists.
    positiveMovieIds = F.collect_list(
        F.when(F.col('label') == 1, F.col('movieId')).otherwise(F.lit(None))).over(historyWindow)
    positiveGenres = F.collect_list(
        F.when(F.col('label') == 1, F.col('genres')).otherwise(F.lit(None))).over(historyWindow)

    samplesWithUserFeatures = (
        samplesWithMovieFeatures
        # Reverse so that index 0 is the most recent positive movie.
        .withColumn('userPositiveHistory', F.reverse(positiveMovieIds))
        .withColumn('userRatedMovie1', F.col('userPositiveHistory')[0])
        .withColumn('userRatedMovie2', F.col('userPositiveHistory')[1])
        .withColumn('userRatedMovie3', F.col('userPositiveHistory')[2])
        .withColumn('userRatedMovie4', F.col('userPositiveHistory')[3])
        .withColumn('userRatedMovie5', F.col('userPositiveHistory')[4])
        .withColumn('userRatingCount', F.count(F.lit(1)).over(historyWindow))
        .withColumn('userAvgReleaseYear',
                    F.avg(F.col('releaseYear')).over(historyWindow).cast(IntegerType()))
        .withColumn('userReleaseYearStddev',
                    F.format_number(F.stddev(F.col('releaseYear')).over(historyWindow), NUMBER_PRECISION))
        .withColumn('userAvgRating',
                    F.format_number(F.avg(F.col('rating')).over(historyWindow), NUMBER_PRECISION))
        .withColumn('userRatingStddev',
                    F.format_number(F.stddev(F.col('rating')).over(historyWindow), NUMBER_PRECISION))
        .withColumn('userGenres', extractGenresUdf(positiveGenres))
        .withColumn('userGenre1', F.col('userGenres')[0])
        .withColumn('userGenre2', F.col('userGenres')[1])
        .withColumn('userGenre3', F.col('userGenres')[2])
        .withColumn('userGenre4', F.col('userGenres')[3])
        .withColumn('userGenre5', F.col('userGenres')[4])
        .drop('genres', 'userGenres', 'userPositiveHistory')
        .filter(F.col('userRatingCount') > 1)
    )

    samplesWithUserFeatures.printSchema()
    samplesWithUserFeatures.show(10)
    samplesWithUserFeatures.filter(F.col('userId') == 1).orderBy(eventTime.asc()).show(
        truncate=False)
    return samplesWithUserFeatures

In [19]:
# Build the user-history features (addUserFeatures prints the schema and sample
# rows itself), then print the schema of the returned frame.
samplesWithUserFeatures = addUserFeatures(samplesWithMovieFeatures)
samplesWithUserFeatures.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- label: integer (nullable = false)
 |-- releaseYear: integer (nullable = true)
 |-- movieGenre1: string (nullable = true)
 |-- movieGenre2: string (nullable = true)
 |-- movieGenre3: string (nullable = true)
 |-- movieRatingCount: long (nullable = true)
 |-- movieAvgRating: string (nullable = true)
 |-- movieRatingStddev: string (nullable = true)
 |-- userRatedMovie1: string (nullable = true)
 |-- userRatedMovie2: string (nullable = true)
 |-- userRatedMovie3: string (nullable = true)
 |-- userRatedMovie4: string (nullable = true)
 |-- userRatedMovie5: string (nullable = true)
 |-- userRatingCount: long (nullable = false)
 |-- userAvgReleaseYear: integer (nullable = true)
 |-- userReleaseYearStddev: string (nullable = true)
 |-- userAvgRating: string (nullable = true)
 |-- userRatingStddev: string (nullable = true)
 |-- use

In [20]:
def splitAndSaveTrainingTestSamples(samplesWithUserFeatures, file_path, sampleRate=0.1, seed=None):
    """
    Down-sample the feature frame, split it 80/20 and write both parts as CSV.

    Args:
        samplesWithUserFeatures: DataFrame holding the final training features.
        file_path: output directory prefix; the splits are written to
            file_path + '/trainingSamples' and file_path + '/testSamples'.
        sampleRate: fraction of rows kept before splitting (default 0.1).
        seed: optional seed forwarded to both the sampling and the split so the
            result can be reproduced; None keeps the unseeded behaviour.

    Returns:
        None. Each split is repartitioned to a single partition and written with
        a header row, overwriting any existing output at the target path.
    """
    smallSamples = samplesWithUserFeatures.sample(fraction=float(sampleRate), seed=seed)
    training, test = smallSamples.randomSplit([0.8, 0.2], seed=seed)
    outputs = (
        (training, file_path + '/trainingSamples'),
        (test, file_path + '/testSamples'),
    )
    for samples, savePath in outputs:
        samples.repartition(1).write.option("header", "true").mode('overwrite').csv(savePath)

In [21]:
# Save the sampled training / test sets as CSV under the sample-data directory.
import os
# NOTE(review): the disabled line below looks like an attempt to point Hadoop at
# its Windows native helpers before writing - confirm the correct HADOOP_HOME
# location for your machine before enabling it.
# os.environ["HADOOP_HOME"] = r"C:\winutil"
splitAndSaveTrainingTestSamples(samplesWithUserFeatures, file_path + "/webroot/sampledata")

Py4JJavaError: An error occurred while calling o318.csv.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:979)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:645)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1230)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1435)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:493)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1868)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1910)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:678)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1868)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1910)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:332)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:402)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:375)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:182)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:220)
	... 33 more


In [None]:
import os
# NOTE(review): this displays every environment variable of the kernel process.
# Clear the cell output before sharing the notebook in case a variable holds a secret.
os.environ