# A Recommender System Using >>Apache Spark 2.4
### Predictive Analytics
#### Licence:
You can use this code for anything you may wish only leave this page:
#### AS IS; HOW IS, WHERE IS

In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Recommender System').getOrCreate()

### Reading the Dataset

In [3]:
df = spark.read.csv('./static/petrol_stations.csv', inferSchema=True, header=True)
df.show(10, True) # Pulls the fist ten rows of the dataset

+------+---------------+------+
|userId|oil_gas_company|rating|
+------+---------------+------+
|   196|          Shell|     3|
|    63|          Shell|     3|
|   226|          Shell|     5|
|   154|          Shell|     3|
|   306|          Shell|     5|
|   296|          Shell|     4|
|    34|          Shell|     5|
|   271|          Shell|     4|
|   201|          Shell|     4|
|   209|          Shell|     4|
+------+---------------+------+
only showing top 10 rows



### Data size in terms of rows and records

In [4]:
print((df.count(), len(df.columns)))

(5386, 3)


Our Dataset contains 100,000 records with only three columns

Data Schema

In [5]:
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- oil_gas_company: string (nullable = true)
 |-- rating: integer (nullable = true)



#### Two of the columns are Numerical while title column is categorical

#### Top users by number of companies rated

In [7]:
df.groupBy('userId').count().orderBy('count', ascending=False).show(10, False)

+------+-----+
|userId|count|
+------+-----+
|276   |25   |
|378   |24   |
|416   |24   |
|13    |24   |
|94    |23   |
|655   |22   |
|222   |22   |
|450   |22   |
|92    |21   |
|417   |21   |
+------+-----+
only showing top 10 rows



#### Bottom users by number of companies rated

In [8]:
df.groupBy('userId').count().orderBy('count', ascending=True).show(10, False)

+------+-----+
|userId|count|
+------+-----+
|384   |1    |
|530   |1    |
|626   |1    |
|211   |1    |
|34    |1    |
|516   |1    |
|772   |1    |
|31    |1    |
|853   |1    |
|133   |1    |
+------+-----+
only showing top 10 rows



## Feature Engineering

In [9]:
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, IndexToString

#### So now we create the stringindexer object by mentioning the input column and output column.

In [13]:
stringIndexer = StringIndexer(inputCol='oil_gas_company',outputCol='company_no')
model = stringIndexer.fit(df)
indexed = model.transform(df)
indexed.show(10, False)

+------+---------------+------+----------+
|userId|oil_gas_company|rating|company_no|
+------+---------------+------+----------+
|196   |Shell          |3     |18.0      |
|63    |Shell          |3     |18.0      |
|226   |Shell          |5     |18.0      |
|154   |Shell          |3     |18.0      |
|306   |Shell          |5     |18.0      |
|296   |Shell          |4     |18.0      |
|34    |Shell          |5     |18.0      |
|271   |Shell          |4     |18.0      |
|201   |Shell          |4     |18.0      |
|209   |Shell          |4     |18.0      |
+------+---------------+------+----------+
only showing top 10 rows



In [14]:
print (df.groupBy('oil_gas_company').count().orderBy('count',ascending=False).show(10,False),
       indexed.groupBy('company_no').count().orderBy('count',ascending=False).show(10,False))

+------------------------+-----+
|oil_gas_company         |count|
+------------------------+-----+
|Obama Oil               |452  |
|Pride of Libya          |390  |
|Nairobi Oil             |365  |
|National Oil of Naigeria|303  |
|Total                   |297  |
|Twister Oil             |293  |
|SA gas                  |280  |
|Arrow Gas               |254  |
|Oil Ghana               |243  |
|Rubis                   |227  |
+------------------------+-----+
only showing top 10 rows

+----------+-----+
|company_no|count|
+----------+-----+
|0.0       |452  |
|1.0       |390  |
|2.0       |365  |
|3.0       |303  |
|4.0       |297  |
|5.0       |293  |
|6.0       |280  |
|7.0       |254  |
|8.0       |243  |
|9.0       |227  |
+----------+-----+
only showing top 10 rows

None None


## Spliting The Data Set
We split it into a 80 to 20 ratio to train the model and test its accuracy

In [15]:
train,test=indexed.randomSplit([0.8,0.2])
print("Training set", train.count())
print("Testing set", test.count())

Training set 4325
Testing set 1061


##  Building and Training the Model

In [16]:
from pyspark.ml.recommendation import ALS
rec=ALS(maxIter=10,regParam=0.01,userCol='userId',itemCol='company_no',ratingCol='rating',nonnegative=True)
rec_model = rec.fit(train)

### Performance evaluation on our test data
Here we will chack the performance of our model on unseen data.

In [17]:
predict_ratings = rec_model.transform(test)
predict_ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- oil_gas_company: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- company_no: double (nullable = false)
 |-- prediction: float (nullable = false)



In [18]:
predict_ratings.show(10)

+------+---------------+------+----------+----------+
|userId|oil_gas_company|rating|company_no|prediction|
+------+---------------+------+----------+----------+
|   471|   Bluevale gas|     3|      28.0| 3.2815936|
|   577|   Bluevale gas|     3|      28.0|  3.909167|
|    13|   Bluevale gas|     4|      28.0| 2.5660963|
|   314|   Bluevale gas|     5|      28.0| 4.9225645|
|   771|   Bluevale gas|     4|      28.0| 3.8473582|
|    95|   Bluevale gas|     1|      28.0| 2.4544296|
|   712|   Bluevale gas|     5|      28.0| 3.5826695|
|   892|   Bluevale gas|     4|      28.0| 4.2607064|
|   311|   Bluevale gas|     2|      28.0| 3.7062418|
|   174|   Bluevale gas|     1|      28.0|  2.545432|
+------+---------------+------+----------+----------+
only showing top 10 rows



In [19]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName = 'rmse', predictionCol = 'prediction', labelCol = 'rating')
rmse = evaluator.evaluate(predict_ratings)
print(rmse)

nan


##### The rmse is non we can equate this to zero meaning there is no error.

### Recommend Top Movies That Active User Might Like
The first step is to create a list of unique movies in the dataframe.

In [20]:
unique_movies=indexed.select('company_no').distinct()
unique_movies.count() ## Total numbe rof individual movies in our new dataframe

31

In [21]:
a = unique_movies.alias('a')

#### We can select any user within the dataset for which we need to recommend other movies. In our case, we go ahead with userId = 96.

#### We will filter the movies that this active user has already rated or seen.

In [22]:
user_id=96
watched_movies=indexed.filter(indexed['userId'] == user_id).select('company_no').distinct()
watched_movies.count() # Number of movies the user has watched

7

In [23]:
b=watched_movies.alias('b')

#### So, there are total of 56 unique movies out of 1,664 movies that this active user has already rated. So, we would want to recommend movies from the remaining 1608 movies.

In [25]:
total_movies = a.join(b, a.company_no == b.company_no,how='left')
total_movies.show()

+----------+----------+
|company_no|company_no|
+----------+----------+
|       8.0|       8.0|
|       0.0|       0.0|
|       7.0|      null|
|      29.0|      null|
|      18.0|      null|
|       1.0|       1.0|
|      25.0|      null|
|       4.0|      null|
|      23.0|      null|
|      11.0|      11.0|
|      21.0|      null|
|      14.0|      null|
|      22.0|      null|
|       3.0|      null|
|      19.0|      null|
|      28.0|      null|
|       2.0|      null|
|      17.0|      null|
|      27.0|      null|
|      10.0|      null|
+----------+----------+
only showing top 20 rows



In [27]:
remaining_movies=total_movies.where(col("b.company_no").isNull()).select(a.company_no).distinct()
remaining_movies.count()

24

In [28]:
remaining_movies=remaining_movies.withColumn("userId",lit(int(user_id)))
remaining_movies.show(10, False)

+----------+------+
|company_no|userId|
+----------+------+
|7.0       |96    |
|29.0      |96    |
|18.0      |96    |
|25.0      |96    |
|4.0       |96    |
|23.0      |96    |
|21.0      |96    |
|14.0      |96    |
|22.0      |96    |
|3.0       |96    |
+----------+------+
only showing top 10 rows



### Finally, we can now make the predictions on this remaining movie’s dataset for the active user using the recommender model that we built earlier. We filter only a few top recommendations that have the highest predicted ratings.

In [29]:
recommendations = rec_model.transform(remaining_movies).orderBy('prediction', ascending = False)
recommendations.show(100, False)

+----------+------+----------+
|company_no|userId|prediction|
+----------+------+----------+
|22.0      |96    |6.3537607 |
|18.0      |96    |6.317786  |
|3.0       |96    |6.0053926 |
|16.0      |96    |5.924051  |
|25.0      |96    |5.284143  |
|2.0       |96    |5.1653776 |
|4.0       |96    |5.0898423 |
|23.0      |96    |4.867482  |
|15.0      |96    |4.7191076 |
|12.0      |96    |4.6622024 |
|17.0      |96    |4.6519527 |
|14.0      |96    |4.1190066 |
|29.0      |96    |3.5772333 |
|21.0      |96    |3.4781663 |
|19.0      |96    |3.3091264 |
|13.0      |96    |3.2998471 |
|27.0      |96    |3.2995825 |
|30.0      |96    |3.2968261 |
|24.0      |96    |3.2274218 |
|20.0      |96    |3.0360923 |
|10.0      |96    |3.0055346 |
|7.0       |96    |2.749645  |
|28.0      |96    |2.697393  |
|5.0       |96    |2.3718653 |
+----------+------+----------+



### Let us add the movie title to the recommendations

In [30]:
movie_title = IndexToString(inputCol="company_no",outputCol="oil_gas_company",labels=model.labels)
final_recommendations=movie_title.transform(recommendations)
final_recommendations.show(100,False)

+----------+------+----------+------------------------+
|company_no|userId|prediction|oil_gas_company         |
+----------+------+----------+------------------------+
|22.0      |96    |6.3537607 |Lake Oil                |
|18.0      |96    |6.317786  |Shell                   |
|3.0       |96    |6.0053926 |National Oil of Naigeria|
|16.0      |96    |5.924051  |Vivo                    |
|25.0      |96    |5.284143  |Pro gas                 |
|2.0       |96    |5.1653776 |Nairobi Oil             |
|4.0       |96    |5.0898423 |Total                   |
|23.0      |96    |4.867482  |Nyerere gas             |
|15.0      |96    |4.7191076 |Abuja Oil               |
|12.0      |96    |4.6622024 |K gas                   |
|17.0      |96    |4.6519527 |Sahara Pride            |
|14.0      |96    |4.1190066 |Abuja gas               |
|29.0      |96    |3.5772333 |Pwani Oil               |
|21.0      |96    |3.4781663 |Oillibya                |
|19.0      |96    |3.3091264 |Vpower Mombasa    

### Thats it guys
### A simple collaborative filtering based recommender system in PySpark using the ALS method to recommend movies to the users

### Bye