# Recommender System Using PySpark (ALS Algorithm)
---


#### Requirements:
- Python
- Apache Spark

## Import Libraries and Initialize the Spark Session

In [None]:
# import libraries
from pyspark import SparkContext
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession, Row, SQLContext
from pyspark.sql.functions import explode, count, mean, split, col, to_timestamp, udf
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

%matplotlib inline
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, Row, SQLContext

In [2]:
appName = "Analysis Application with PySpark"

# Initialize the Spark session (getOrCreate reuses an already-active session)
# against the standalone cluster master, with a 512 MB executor heap.
# The builder now uses the `appName` defined above instead of a second,
# hard-coded name that left the variable unused.
spark = (
    SparkSession.builder
    .appName(appName)
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "512m")
    .getOrCreate()
)

# Get the SparkContext from the SparkSession.
sc = spark.sparkContext
# SQLContext is superseded by SparkSession (and deprecated in PySpark 3.x);
# it is kept only for code that still uses the SQLContext API.
sqlContext = SQLContext(sc)

24/03/18 20:52:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [None]:
# IPython shell escape: report the Spark version through the pyspark launcher.
!pyspark --version

In [None]:
import sys
import jupyter_core

# Report interpreter and Jupyter versions for reproducibility.
# jupyter_core.__version__ is the version of the jupyter_core package, not of
# the Notebook application, so the label says exactly that.
print("Python Version:", sys.version)
print("jupyter_core Version:", jupyter_core.__version__)

In [None]:
# IPython shell escape: list the installed Python packages.
!pip list

In [None]:
# Smoke test: confirm the kernel executes code.
print("hi")

## Load Dataset in Apache Spark


In [3]:
# Read the business JSON through the SparkSession reader (SQLContext is the superseded entry point).
df_business = spark.read.json('dataset/yelp_academic_dataset_business.json')

24/03/18 20:53:29 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [None]:
# Read the review JSON through the SparkSession reader (SQLContext is the superseded entry point).
df_review = spark.read.json('dataset/yelp_academic_dataset_review.json')

In [None]:
# Read the user JSON through the SparkSession reader (SQLContext is the superseded entry point).
df_user = spark.read.json('dataset/yelp_academic_dataset_user.json')

# Exploratory Data Analysis (EDA)

## Business Dataset

In [4]:
# Inspect the inferred schema, including the nested `attributes` struct.
df_business.printSchema()

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

In [None]:
# Preview the first 5 businesses.
df_business.show(5)

In [None]:
# Summary statistics (count, mean, stddev, min, max) for the business columns.
df_business.describe().show()

In [None]:
# Total number of businesses (rows in df_business)
df_business.count()

In [None]:
# Actual open businesses: number of businesses per `is_open` flag.
# Keep the aggregated DataFrame itself; `.show()` returns None, so assigning
# its result left `category_counts` holding nothing.
category_counts = df_business.groupBy("is_open").count()
category_counts.show()

In [None]:
# Mean star rating across all businesses (a single-row aggregate).
average_stars = df_business.agg(F.avg("stars")).first()[0]
print(f"Average stars: {average_stars}")

In [None]:
# Stars distribution: number of businesses per rating, in ascending rating order
df_business.groupBy("stars").count().orderBy("stars").show()

In [None]:
# Show the raw comma-separated category strings without truncation.
df_business.select(col("categories")).show(truncate=False)

In [None]:
# Analysis over attributes: names of the sub-fields of the `attributes` struct.
# Use the schema's exact (lowercase) field name instead of relying on Spark's
# case-insensitive column resolution.
attribute_columns = df_business.select("attributes.*").columns

In [None]:
# Print the value distribution (NULLs included) of every attribute sub-field.
# The struct is referenced by its exact schema name, `attributes`.
for column in attribute_columns:
    print("Distribution for column: Attribute." + column)
    df_business.groupBy("attributes." + column).count().show(truncate=False)

## Reviews Dataset

In [None]:
# Inspect the inferred review schema.
df_review.printSchema()

In [None]:
# Preview the first 5 reviews.
df_review.show(5)

In [None]:
# Total number of reviews.
df_review.count()

In [None]:
# The vote columns share a similar structure and distribution, so each gets
# the same analysis: summary statistics, then the rows holding negative values.
for column in ["cool", "funny", "useful"]:
    print("\n\nAnalysis for column: " + column)
    df_review.select(column).describe().show()
    negative_rows = df_review.filter(col(column) < 0)
    negative_rows.show()
    print(f"Number of -1 rows for the column {column} are: {negative_rows.count()}")

In [None]:
# Number of reviews per star rating, in ascending rating order.
df_review.groupBy("stars").count().orderBy("stars").show()

In [None]:
# Look for NULL values in the foreign keys and empty ('') review texts.
for column in ["business_id", "user_id", "text"]:
    print("\n\nAnalysis for column: " + column)
    null_rows = df_review.filter(col(column).isNull()).count()
    print(f"Number of NULL {column} rows: {null_rows}")
    empty_rows = df_review.filter(col(column) == "").count()
    print(f"Number of EMPTY ('') {column} rows: {empty_rows}")

## User Dataset

In [None]:
# Inspect the inferred user schema.
df_user.printSchema()

In [None]:
# Preview the first 5 users.
df_user.show(5)

In [None]:
# Summary statistics (count, mean, stddev, min, max) for the user columns.
df_user.describe().show()

In [None]:
# Total number of users.
df_user.count()

# Feature Engineering
Modify the datasets based on the analysis above.

In [None]:
# TODO: join all 3 datasets (business, review, user) — not implemented yet

## Business Dataset

In [None]:
# TODO: discard useless attributes (those that have more null values than useful ones) — not implemented yet

In [None]:
# TODO: make attributes uniform (transform them into Booleans, treating NULL and "None" as False) — not implemented yet

In [None]:
# Split the comma-separated "categories" string into an array column.
categories_array = split(col("categories"), ", ")
df_business = df_business.withColumn("categories_array", categories_array)
df_business.select("categories", "categories_array").show(truncate=False)

In [None]:
# Number of businesses listed under each individual category.
categories_distribution = (
    df_business
    .select("business_id", explode("categories_array").alias("category"))
    .groupBy("category")
    .agg(count("business_id").alias("count"))
)

In [None]:
# Top 50 categories by frequency, plus summary figures.
categories_distribution.sort("count", ascending=False).show(50, truncate=False)
print(f"Number of unique categories: {categories_distribution.count()}")
# Store the result under a new name. Assigning to `mean` would shadow the
# imported pyspark.sql.functions.mean, so re-running this cell (or any later
# mean(...) call) would fail with "'float' object is not callable".
avg_category_count = categories_distribution.agg(mean("count")).collect()[0][0]
print(f"Average occurences of a category: {avg_category_count}")

## Review Dataset

In [None]:
# Parse the "date" string column ("yyyy-MM-dd HH:mm:ss") into a timestamp column.
df_review = df_review.withColumn("date", to_timestamp(col("date"), "yyyy-MM-dd HH:mm:ss"))

## User Dataset

# Sentiment Analysis

## Using a Model Trained on Star Ratings

In [None]:
from pyspark.ml.feature import HashingTF, IDF, RegexTokenizer, StopWordsRemover, StringIndexer, IndexToString
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
def stars_transformer(stars):
    """Map a review's star rating to a sentiment label.

    Ratings of 4 or more map to "positive" and exactly 3 maps to "neutral".
    Any other value (below 3, or strictly between 3 and 4) maps to "negative".
    A missing rating (``None``, i.e. a NULL ``stars`` value in Spark) yields
    ``None`` instead of raising ``TypeError`` on the ``>=`` comparison.
    """
    if stars is None:
        return None
    if stars >= 4:
        return "positive"
    if stars == 3:
        return "neutral"
    return "negative"

# Wrap the Python function directly as a string-returning UDF; the former
# `lambda x: stars_transformer(x)` wrapper added nothing.
stars_transf_udf = udf(stars_transformer, StringType())

# Pass from stars to sentiment
df_review_sa = df_review.withColumn("sentiment", stars_transf_udf(col("stars")))

In [None]:
# Class balance of the derived sentiment labels, most frequent first.
df_review_sa.groupBy("sentiment").count().orderBy("count", ascending=False).show()

In [None]:
# Class weights to balance the dataset:
#   weight(class) = total_rows / (rows_in_class * number_of_classes)
class_weights = df_review_sa.groupBy("sentiment").count()
total_count = df_review_sa.count()
num_classes = class_weights.count()
class_weights = class_weights.withColumn("classWeights", total_count / (col("count") * num_classes))

In [None]:
# RegexTokenizer: split the review text on non-word characters (\W).
tokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern=r"\W")

# StopWordsRemover with a small custom stop-word list.
# NOTE(review): setStopWords *replaces* Spark's default English list rather than
# extending it, so only these tokens are removed. If extending was intended, use
# StopWordsRemover.loadDefaultStopWords("english") + add_stopwords. Check first
# whether the default list drops words (e.g. negations) that matter for sentiment.
add_stopwords = ["http","https","amp","rt","t","c","the"]
stop_words_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words").setStopWords(add_stopwords)

# TF-IDF: hash tokens into 10,000 term-frequency buckets, then re-weight by IDF.
hashingTF = HashingTF(inputCol="filtered_words", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features")

# StringIndexer converts the text labels to numeric indices; invalid rows are skipped.
label_indexer = StringIndexer(inputCol="sentiment", outputCol="label", handleInvalid="skip")

# Alternative classifiers that were tried:
# # Create a Logistic Regression model
# lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")

# # Create a Decision Tree classifier
# dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=5)

# Create a Random Forest classifier. Per-row class weights are read from the
# "classWeights" column that is joined onto the training data.
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100, maxDepth=5, seed=42, weightCol="classWeights")

# Fit the StringIndexer on the entire dataset only to fix the label vocabulary
# (and its index order) used to decode predictions.
label_model = label_indexer.fit(df_review_sa)
labels = label_model.labels

# IndexToString converts predicted indices back to text labels.
label_converter = IndexToString(inputCol="prediction", outputCol="prediction_text", labels=labels)

In [None]:
# Create a pipeline with all stages.
# The already-fitted `label_model` is used instead of the unfitted `label_indexer`.
# Re-fitting the indexer on the training split could assign a different label
# order than the `labels` given to `label_converter`, which would mislabel
# `prediction_text`. A fitted model passes through Pipeline.fit unchanged.
pipeline = Pipeline(stages=[tokenizer,
                            stop_words_remover,
                            hashingTF,
                            idf,
                            label_model,
                            rf,
                            label_converter])

In [None]:
# 80/20 train/test split, seeded for reproducibility.
trainingData, testData = df_review_sa.randomSplit(weights=[0.8, 0.2], seed=42)

In [None]:
# Attach each row's class weight (read by the classifier's weightCol) via an inner join on the label.
trainingData_weighted = trainingData.join(class_weights, on="sentiment", how="inner")

In [None]:
# Training is disabled; the fitted model is loaded from disk instead.
# Uncomment to retrain on the class-weighted training split.
# # Train the pipeline
# model = pipeline.fit(trainingData_weighted)

In [None]:
# Saving is disabled along with training; uncomment after retraining to
# overwrite the stored model.
# # Save the model
# model.write().overwrite().save("models/sentiment_analysis_rf")

In [None]:
# Load the previously trained sentiment PipelineModel from disk.
model_path = "models/sentiment_analysis_rf"
model = PipelineModel.load(model_path)

In [None]:
# Run the fitted pipeline over the held-out split.
predictions = model.transform(dataset=testData)

In [None]:
# Distribution of the predicted sentiment labels.
predictions.groupBy(col("prediction_text")).count().show()

In [None]:
# Evaluate the model: one evaluator whose metric is overridden per call.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
precision, recall, f1_score = (
    evaluator.evaluate(predictions, {evaluator.metricName: metric})
    for metric in ("weightedPrecision", "weightedRecall", "f1")
)
print(f"Accuracy: {accuracy}, Precision: {precision}, Recall: {recall}, F1 Score: {f1_score}")

## Using Hugging Face

In [None]:
# Use a Hugging Face pipeline as a high-level helper.
# It is imported under an alias so it does not shadow the Spark ML `pipeline`
# object built earlier in this notebook.
from transformers import pipeline as hf_pipeline

# NOTE(review): judging by its name, this checkpoint was fine-tuned on financial
# news rather than restaurant reviews — confirm it suits Yelp review text.
sentiment_analysis = hf_pipeline("text-classification", model="mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis", max_length=512, truncation=True)

In [None]:
@F.pandas_udf('string')
def sentiment_udf(texts: pd.Series) -> pd.Series:
    """Label each review text with the Hugging Face `sentiment_analysis` pipeline.

    Null texts get a null label instead of being sent to the model, which
    presumably cannot process ``None`` inputs. The result is aligned with the
    input batch's index.
    """
    labels = pd.Series([None] * len(texts), index=texts.index, dtype="object")
    present = texts.notna()
    if present.any():
        results = sentiment_analysis(texts[present].tolist())
        labels[present] = [result['label'] for result in results]
    return labels

In [None]:
# Preview the Spark model's predictions before adding the Hugging Face labels.
predictions.show()

In [None]:
# Append the Hugging Face label next to the Spark model's prediction.
predictions = predictions.withColumn("bert-sentiment", sentiment_udf(col("text")))

In [None]:
# Preview the predictions with the "bert-sentiment" column added.
predictions.show()

# Recommendation System with the ALS Algorithm

# Code from the original notebook (legacy)

## Filter Rows and columns



In [None]:
# Keep only the business columns used below, and rename "stars" so it cannot
# collide with the per-review "stars" column after the join.
business_cols = ["business_id", "name", "stars", "review_count", "attributes", "categories", "city"]
df_business = df_business.select(*business_cols).withColumnRenamed("stars", "stars_restaurant")

# Restrict to restaurants located in Toronto, then drop the now-constant city column.
# NOTE(review): recent releases of the Yelp Open Dataset may not include Toronto —
# confirm this filter actually returns rows.
is_toronto = col("city") == "Toronto"
is_restaurant = col("categories").contains("Restaurants")
df_business = df_business.filter(is_toronto & is_restaurant).drop("city")

In [None]:
# Inner join: keep only reviews of the filtered businesses, with their business columns attached.
df_review = df_review.join(df_business, "business_id", "inner")

Let's take a quick look at the basic elements of our review table.

In [None]:
# The (business, user, rating) triples that ALS will be trained on.
df_review.select('business_id', 'user_id', 'stars').show()

## Exploratory Data Analysis



In [None]:
# Histogram of individual review ratings, one bar per integer star value.
reviews = df_review.select('stars').collect()
review_list = [row[0] for row in reviews]

# Apply the style to this figure only, and before drawing: calling
# plt.style.use after plotting does not restyle the existing plot. Newer
# Matplotlib releases renamed 'seaborn-white' to 'seaborn-v0_8-white'.
hist_style = 'seaborn-v0_8-white' if 'seaborn-v0_8-white' in plt.style.available else 'seaborn-white'
with plt.style.context(hist_style):
    plt.hist(review_list, bins=[0.5, 1.5, 2.5, 3.5, 4.5, 5.5], alpha=0.5,
             histtype='stepfilled', color='steelblue',
             edgecolor='none')
    plt.ylabel('Frequency')
    plt.xlabel('Rating')
    plt.show()

Toronto reviewers are quite generous: most ratings are above 3. Now let's look at the distribution of the restaurants' average ratings.

In [None]:
# Histogram of each restaurant's average rating (stars_restaurant).
restaurant_reviews = df_business.select('stars_restaurant').collect()
restaurant_reviews_list = [row[0] for row in restaurant_reviews]

# NOTE(review): business averages presumably come in half-star steps (1.0, 1.5,
# ..., 5.0) — verify. With integer-centred bins, x.5 values sat exactly on a bin
# edge and were counted with the next whole star, so the bins are centred on
# half-star values instead (edges 0.75, 1.25, ..., 5.25).
half_star_bins = [0.75 + 0.5 * i for i in range(10)]

# Apply the style to this figure only, and before drawing: calling
# plt.style.use after plotting does not restyle the existing plot. Newer
# Matplotlib releases renamed 'seaborn-white' to 'seaborn-v0_8-white'.
hist_style = 'seaborn-v0_8-white' if 'seaborn-v0_8-white' in plt.style.available else 'seaborn-white'
with plt.style.context(hist_style):
    plt.hist(restaurant_reviews_list, bins=half_star_bins, alpha=0.5,
             histtype='stepfilled', color='steelblue',
             edgecolor='none')
    plt.ylabel('Frequency')
    plt.xlabel('Rating')
    plt.show()

Here we see a more normally distributed curve; nevertheless, most restaurants do pretty well.
Now let's visualize the most popular types of restaurants in Toronto. What kind of food do they serve? We will create a word cloud.

In [None]:
# Bring every restaurant's comma-separated category string to the driver.
restaurant_categories = df_business.select('categories').collect()
restaurant_categories_list = [row[0] for row in restaurant_categories]

In [None]:
# Concatenate all category strings into one corpus for the word cloud.
text = " ".join(restaurant_categories_list)

In [None]:
from wordcloud import WordCloud, STOPWORDS, ImageColorGenerator

# Eliminate uninformative words. Passing them to WordCloud as stop words removes
# whole words only, and WordCloud compares stop words case-insensitively. The
# previous str.replace calls cut substrings out of other words (e.g. 'food'
# turned "Seafood" into "Sea"). Their lowercase patterns also missed the
# capitalized category names ("Bars", "Food"). STOPWORDS is WordCloud's built-in
# default list, kept so common English words stay excluded as before.
category_stopwords = set(STOPWORDS) | {"restaurants", "bars", "food"}

# Generate a word cloud image
wordcloud = WordCloud(background_color="white", stopwords=category_stopwords).generate(text)

# Display the generated image:
# the matplotlib way:
plt.figure(figsize=(10,8))
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis("off")
plt.show()

## Convert String IDs to Numeric Indices


In [None]:
# ALS needs numeric user/item IDs, so map the string IDs to numeric indices.
indexer = [StringIndexer(inputCol=column, outputCol=column + "_index") for column in ['business_id', 'user_id']]
pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(df_review).transform(df_review)
# Preview the mapping; a bare select only displays the DataFrame's schema repr.
transformed.select(['business_id', 'user_id', 'business_id_index', 'user_id_index']).show(5)

## Split the Dataset into Training and Test Sets


In [None]:
# Seeded (like the sentiment-analysis split) so the split, and hence the RMSE, is reproducible.
(training, test) = transformed.randomSplit([0.8, 0.2], seed=42)

## Create ALS model


In [None]:
# ALS on (user, business, stars) triples. coldStartStrategy="drop" discards
# predictions for users/items unseen during training, so the RMSE stays defined.
# nonnegative=True constrains the latent factors to be non-negative.
als_params = dict(
    maxIter=5,
    regParam=0.09,
    rank=25,
    userCol="user_id_index",
    itemCol="business_id_index",
    ratingCol="stars",
    coldStartStrategy="drop",
    nonnegative=True,
)
als = ALS(**als_params)

model = als.fit(training)

## Evaluate RMSE

In [None]:
# Root-mean-square error between actual and predicted stars on the held-out split.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="stars", predictionCol="prediction")
predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)
print(f"RMSE={rmse}")

## Visualize Recommendations



In [None]:
# Show the top-N recommendations for one user next to that user's own reviews.
user_index = 30
num_recommendations = 20

# Score only this user; recommendForAllUsers would compute lists for every user.
# The result gets its own name so the `test` split above is no longer overwritten.
user_df = spark.createDataFrame([(user_index,)], ["user_id_index"])
user_recs = model.recommendForUserSubset(user_df, num_recommendations)

# One row per recommended business with the model's predicted rating. This is
# empty (instead of raising IndexError) when the user was not seen in training.
restaurants = (
    user_recs
    .select(explode("recommendations").alias("rec"))
    .select(col("rec.business_id_index").alias("business_id_index"),
            col("rec.rating").alias("predicted_rating"))
)

# Restaurants this user has actually reviewed (filter before projecting).
(
    transformed
    .filter(col('user_id_index') == user_index)
    .select(['business_id', 'user_id', 'stars', 'categories'])
    .show()
)

# Recommended restaurants, best first. Rows are deduplicated by business index,
# not by name, so separate locations of a chain are kept. The restaurant's
# average rating is shown instead of one arbitrary review's stars.
business_info = (
    transformed
    .select('business_id_index', 'business_id', 'name', 'stars_restaurant', 'categories')
    .dropDuplicates(['business_id_index'])
)
(
    restaurants
    .join(business_info, on='business_id_index', how='inner')
    .orderBy(col('predicted_rating').desc())
    .select(['business_id', 'name', 'stars_restaurant', 'categories', 'predicted_rating'])
    .show()
)

