In [1]:
import findspark
findspark.init()
from pyspark.sql  import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator#to measure the performance of the  ALS model 
from pyspark.ml.tuning  import TrainValidationSplit , ParamGridBuilder#allow us to cross validate and fine-tune  the hyper parameter of the model
from pyspark.ml.recommendation import ALS
#create session
appName = "Recommender system in Spark"
spark = SparkSession.builder.appName(appName).config("spark.some.config.option", "some-value").getOrCreate()

# Read file into dataFrame

In [2]:
#read file into dataFrame using automatically inferred schema
ratings = spark.read.csv('Path/ratings.csv', inferSchema=True, header=True ,nanValue="NA")
movies = spark.read.csv('Path/movies.csv', inferSchema=True, header=True)

In [3]:
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [4]:
movies.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [5]:
ratings.show(2)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
+------+-------+------+----------+
only showing top 2 rows



In [6]:
movies.show(2)

+-------+----------------+--------------------+
|movieId|           title|              genres|
+-------+----------------+--------------------+
|      1|Toy Story (1995)|Adventure|Animati...|
|      2|  Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 2 rows



In [7]:
#merge "movies" and "ratings" dataFrame based on "movieId"
df=ratings.join(movies, "movieId")


In [8]:
df.show(2)

+-------+------+------+----------+--------------------+--------------------+
|movieId|userId|rating| timestamp|               title|              genres|
+-------+------+------+----------+--------------------+--------------------+
|     31|     1|   2.5|1260759144|Dangerous Minds (...|               Drama|
|   1029|     1|   3.0|1260759179|        Dumbo (1941)|Animation|Childre...|
+-------+------+------+----------+--------------------+--------------------+
only showing top 2 rows



# Data preparation

In [9]:
#use only column data of "userId", "movieId", dan "rating"
data = df.select("userId", "movieId", "rating")
#divide data, 70% for training and 30% for testing
splits = data.randomSplit([0.7, 0.3])
train = splits[0].withColumnRenamed("rating", "label")
test = splits[1].withColumnRenamed("rating", "trueLabel")
#calculate number of rows
train_rows = train.count()
test_rows = test.count()
print ("number of training data rows:", train_rows, 
       ", number of testing data rows:", test_rows)

number of training data rows: 70047 , number of testing data rows: 29957


In [10]:
#define ALS (Alternating Least Square) as our recommender system
als = ALS(maxIter=19, regParam=0.01,rank=20, userCol="userId", 
          itemCol="movieId", ratingCol="label")
#train our ALS model
model = als.fit(train)
print("Training is done!")

Training is done!


# Predict testing data

In [11]:
prediction = model.transform(test)
print("testing is done!")

testing is done!


In [12]:
prediction.join(movies, "movieId").select(
    "userId", "title", "prediction", "trueLabel").show(n=5, truncate=False)

+------+---------------------------+----------+---------+
|userId|title                      |prediction|trueLabel|
+------+---------------------------+----------+---------+
|232   |Guilty as Sin (1993)       |1.7766558 |4.0      |
|285   |Hudsucker Proxy, The (1994)|4.5955706 |5.0      |
|491   |Hudsucker Proxy, The (1994)|4.1515136 |3.0      |
|299   |Hudsucker Proxy, The (1994)|3.1894815 |4.5      |
|309   |Hudsucker Proxy, The (1994)|3.4492915 |4.0      |
+------+---------------------------+----------+---------+
only showing top 5 rows



In [13]:
userRecommendets=model.recommendForAllUsers(5)
moviereommends=model.recommendForAllItems(5)

In [14]:
userRecommendets.printSchema()

root
 |-- userId: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- movieId: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [15]:
userRecommendets.select("userId" , "recommendations.movieId").show(5 , False)

+------+------------------------------+
|userId|movieId                       |
+------+------------------------------+
|471   |[1997, 2712, 3578, 1259, 3039]|
|463   |[1272, 260, 4128, 2973, 59315]|
|496   |[1304, 4993, 5952, 3095, 260] |
|148   |[953, 1233, 17, 1077, 151]    |
|540   |[8376, 1883, 5995, 8665, 1088]|
+------+------------------------------+
only showing top 5 rows



In [16]:
moviereommends.printSchema()

root
 |-- movieId: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- userId: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [17]:
moviereommends.select("movieId" , "recommendations.userId").show(5 , False)

+-------+-------------------------+
|movieId|userId                   |
+-------+-------------------------+
|1580   |[145, 134, 46, 136, 202] |
|5300   |[257, 604, 151, 657, 348]|
|6620   |[6, 614, 59, 304, 545]   |
|7340   |[621, 448, 155, 535, 232]|
|32460  |[298, 365, 197, 53, 568] |
+-------+-------------------------+
only showing top 5 rows



# Evaluate the accuracy of our model

In [18]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(prediction)
print ("Root Mean Square Error (RMSE):", rmse)

Root Mean Square Error (RMSE): nan


In [19]:
prediction.count()
a = prediction.count()
print("number of original data rows: ", a)
#drop rows with any missing data
cleanPred = prediction.dropna(how="any", subset=["prediction"])
b = cleanPred.count()
print("number of rows after dropping data with missing value: ", b)
print("number of missing data: ", a-b)

number of original data rows:  29957
number of rows after dropping data with missing value:  28769
number of missing data:  1188


In [20]:
rmse = evaluator.evaluate(cleanPred)
print ("Root Mean Square Error (RMSE):", rmse)

Root Mean Square Error (RMSE): 1.3414486717928011


In [21]:
from pyspark.ml.tuning import ParamGridBuilder

In [22]:
param_grid = ParamGridBuilder().addGrid(als.rank, [5, 40, 80, 120]).addGrid(als.maxIter, [5, 100, 250, 500]).addGrid(als.regParam, [.05, .1, 1.5]).build()

# Sentiment Analysis

In [23]:
#import modules
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover

In [24]:
#Read data file into Spark dataFrame
#read csv file into dataFrame with automatically inferred schema
tweets_csv = spark.read.csv('Path/tweets.csv', inferSchema=True, header=True)
tweets_csv.show(truncate=False, n=5)

+------+---------+---------------+---------------------------------+
|ItemID|Sentiment|SentimentSource|SentimentText                    |
+------+---------+---------------+---------------------------------+
|1038  |1        |Sentiment140   |that film is fantastic #brilliant|
|1804  |1        |Sentiment140   |this music is really bad #myband |
|1693  |0        |Sentiment140   |winter is terrible #thumbs-down  |
|1477  |0        |Sentiment140   |this game is awful #nightmare    |
|45    |1        |Sentiment140   |I love jam #loveit               |
+------+---------+---------------+---------------------------------+
only showing top 5 rows



# Select the related data

In [25]:
from pyspark.sql.functions import col
#select only "SentimentText" and "Sentiment" column, 
#and cast "Sentiment" column data into integer
data = tweets_csv.select("SentimentText", col("Sentiment").cast("Int").alias("label"))
data.show(truncate = False,n=5)

+---------------------------------+-----+
|SentimentText                    |label|
+---------------------------------+-----+
|that film is fantastic #brilliant|1    |
|this music is really bad #myband |1    |
|winter is terrible #thumbs-down  |0    |
|this game is awful #nightmare    |0    |
|I love jam #loveit               |1    |
+---------------------------------+-----+
only showing top 5 rows



# Divide data into training and testing data

In [26]:
#divide data, 70% for training, 30% for testing
dividedData = data.randomSplit([0.7, 0.3]) 
trainingData = dividedData[0] #index 0 = data training
testingData = dividedData[1] #index 1 = data testing
train_rows = trainingData.count()
test_rows = testingData.count()
print ("Training data rows:", train_rows, "; Testing data rows:", test_rows)

Training data rows: 1364 ; Testing data rows: 568


# Prepare training data

#Separate "SentimentText" into individual words using tokenizer

In [27]:
from pyspark.sql import functions
tokenizer = Tokenizer(inputCol="SentimentText", outputCol="SentimentWords")
tokenizedTrain = tokenizer.transform(trainingData)
tokenizedTrain.show(truncate=False, n=5)

+---------------------------------+-----+---------------------------------------+
|SentimentText                    |label|SentimentWords                         |
+---------------------------------+-----+---------------------------------------+
|I adore cheese #bestever         |1    |[i, adore, cheese, #bestever]          |
|I adore cheese #brilliant        |1    |[i, adore, cheese, #brilliant]         |
|I adore cheese #loveit           |1    |[i, adore, cheese, #loveit]            |
|I adore cheese #toptastic        |1    |[i, adore, cheese, #toptastic]         |
|I adore classical music #bestever|1    |[i, adore, classical, music, #bestever]|
+---------------------------------+-----+---------------------------------------+
only showing top 5 rows



Removing stop words (unimportant words to be features)

In [28]:
swr = StopWordsRemover(inputCol=tokenizer.getOutputCol(), 
                       outputCol="MeaningfulWords")
SwRemovedTrain = swr.transform(tokenizedTrain)
SwRemovedTrain.show(truncate=False, n=10)

+----------------------------------+-----+----------------------------------------+-------------------------------------+
|SentimentText                     |label|SentimentWords                          |MeaningfulWords                      |
+----------------------------------+-----+----------------------------------------+-------------------------------------+
|I adore cheese #bestever          |1    |[i, adore, cheese, #bestever]           |[adore, cheese, #bestever]           |
|I adore cheese #brilliant         |1    |[i, adore, cheese, #brilliant]          |[adore, cheese, #brilliant]          |
|I adore cheese #loveit            |1    |[i, adore, cheese, #loveit]             |[adore, cheese, #loveit]             |
|I adore cheese #toptastic         |1    |[i, adore, cheese, #toptastic]          |[adore, cheese, #toptastic]          |
|I adore classical music #bestever |1    |[i, adore, classical, music, #bestever] |[adore, classical, music, #bestever] |
|I adore classical music

# Converting words feature into numerical feature. In Spark 3.0.0,it is implemented in HashingTF funtion using Austin Appleby's MurmurHash 3 algorithm

In [29]:
hashTF = HashingTF(inputCol=swr.getOutputCol(), outputCol="features")
numericTrainData = hashTF.transform(SwRemovedTrain).select(
    'label', 'MeaningfulWords', 'features')
numericTrainData.show(truncate=False, n=3)

+-----+---------------------------+-------------------------------------------+
|label|MeaningfulWords            |features                                   |
+-----+---------------------------+-------------------------------------------+
|1    |[adore, cheese, #bestever] |(262144,[1689,91011,100089],[1.0,1.0,1.0]) |
|1    |[adore, cheese, #brilliant]|(262144,[1689,45361,100089],[1.0,1.0,1.0]) |
|1    |[adore, cheese, #loveit]   |(262144,[1689,100089,254974],[1.0,1.0,1.0])|
+-----+---------------------------+-------------------------------------------+
only showing top 3 rows



# Train our classifier model using training data

In [30]:
lr = LogisticRegression(labelCol="label", featuresCol="features", 
                        maxIter=10, regParam=0.01)
model = lr.fit(numericTrainData)
print ("Training is done!")

Training is done!


# Prepare testing data

In [31]:
tokenizedTest = tokenizer.transform(testingData)
SwRemovedTest = swr.transform(tokenizedTest)
numericTest = hashTF.transform(SwRemovedTest).select(
    'Label', 'MeaningfulWords', 'features')
numericTest.show(truncate=False, n=2)

+-----+---------------------------+-------------------------------------------+
|Label|MeaningfulWords            |features                                   |
+-----+---------------------------+-------------------------------------------+
|1    |[adore, cheese, #favorite] |(262144,[1689,100089,108624],[1.0,1.0,1.0])|
|1    |[adore, cheese, #thumbs-up]|(262144,[1689,88825,100089],[1.0,1.0,1.0]) |
+-----+---------------------------+-------------------------------------------+
only showing top 2 rows



# Predict testing data and calculate the accuracy model

In [32]:
prediction = model.transform(numericTest)
predictionFinal = prediction.select(
    "MeaningfulWords", "prediction", "Label")
predictionFinal.show(n=5, truncate = False)
correctPrediction = predictionFinal.filter(
    predictionFinal['prediction'] == predictionFinal['Label']).count()
totalData = predictionFinal.count()
print("correct prediction:", correctPrediction, ", total data:", totalData, 
      ", accuracy:", correctPrediction/totalData)

+-------------------------------+----------+-----+
|MeaningfulWords                |prediction|Label|
+-------------------------------+----------+-----+
|[adore, cheese, #favorite]     |1.0       |1    |
|[adore, cheese, #thumbs-up]    |1.0       |1    |
|[adore, coffee, #favorite]     |1.0       |1    |
|[adore, pop, music, #toptastic]|1.0       |1    |
|[adore, rock, music, #favorite]|1.0       |1    |
+-------------------------------+----------+-----+
only showing top 5 rows

correct prediction: 556 , total data: 568 , accuracy: 0.9788732394366197
