<a href="https://colab.research.google.com/github/SalahSharaf/A_B_test_project/blob/master/recommendation_Engine_using_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# we download data and all spark stuff

In [35]:
! chmod 600 /content/kaggle.json

In [38]:
! cp kaggle.json ~/.kaggle/

In [39]:
! chmod 600 ~/.kaggle/kaggle.json

In [40]:
!kaggle datasets download -d jirakst/bookcrossing

Downloading bookcrossing.zip to /content
 69% 17.0M/24.6M [00:00<00:00, 52.9MB/s]
100% 24.6M/24.6M [00:00<00:00, 62.0MB/s]


In [41]:
!unzip /content/bookcrossing.zip -d books_data/

Archive:  /content/bookcrossing.zip
replace books_data/BX-Book-Ratings.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: n
replace books_data/BX-Books.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: n
replace books_data/BX-Users.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: n


In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!java -version

openjdk version "11.0.13" 2021-10-19
OpenJDK Runtime Environment (build 11.0.13+8-Ubuntu-0ubuntu1.18.04)
OpenJDK 64-Bit Server VM (build 11.0.13+8-Ubuntu-0ubuntu1.18.04, mixed mode, sharing)


In [None]:
!ls /usr/lib/jvm/

default-java		   java-11-openjdk-amd64     java-8-openjdk-amd64
java-1.11.0-openjdk-amd64  java-1.8.0-openjdk-amd64


In [None]:
!wget -q https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

In [None]:
!tar xf spark-3.1.2-bin-hadoop3.2.tgz

In [None]:
!pip install -q findspark

In [17]:
# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2/bin"

In [None]:
# import findspark
# findspark.init()
# findspark.find()

'/content/spark-3.1.2-bin-hadoop3.2/python/pyspark'

## another way to use pyspark

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 24 kB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=fba41dea29cea599f920b2b1355822851c318eb44c88b2d7ca10153894b85c67
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.2.1


## first we connect to pyspark

In [2]:
import pyspark

In [3]:
conf=pyspark.conf.SparkConf().setAppName('recommender').setMaster('local')
sc=pyspark.SparkContext(conf=conf)
spark=pyspark.sql.SQLContext(sc)



In [4]:
sc.setCheckpointDir('my_dir')

## secondely we have to clean data

In [5]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql.types import StructType,StructField,LongType,StringType,IntegerType
from pyspark.sql.functions import isnan, when, count, col,isnull,monotonically_increasing_id

In [6]:
users_schema=StructType([
    StructField('user_id',StringType(),nullable=False),
    StructField('location',StringType(),nullable=True),
    StructField('Age',IntegerType(),nullable=True)
]
    )
users_df=spark.read.csv('/content/books_data/BX-Users.csv',sep=';',schema=users_schema)
users_df=users_df.withColumn('user_index',monotonically_increasing_id().cast('integer')).persist()
new_users_df=users_df.dropna(subset='user_id')
new_users_df.filter(isnull(col('user_id'))).count()
new_users_df.printSchema()


root
 |-- user_id: string (nullable = true)
 |-- location: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- user_index: integer (nullable = false)



In [7]:
books_df=spark.read.csv('/content/books_data/BX-Books.csv',sep=';',header=True)\
.select(col('ISBN').cast('long')
        ,col('Book-Title').cast('string').alias('book_title')
        ,col('Book-Author').cast('string').alias('book_author')
        ,col('Publisher').cast('string').alias('publisher'))

books_df.filter(isnull(col('ISBN'))).count()
new_books_df=books_df.dropna(subset=['ISBN'])
new_books_df=new_books_df.withColumn('book_index',monotonically_increasing_id().cast('integer')).persist()
new_books_df.printSchema()

root
 |-- ISBN: long (nullable = true)
 |-- book_title: string (nullable = true)
 |-- book_author: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- book_index: integer (nullable = false)



In [8]:
ratings_schema=StructType([
    StructField('user_id', StringType(),nullable=False),
    StructField('ISBN', StringType(),nullable=False),
    StructField('book_rating', IntegerType(),nullable=(False))
    ])
ratings_df=spark.read.csv('/content/books_data/BX-Book-Ratings.csv',sep=';',schema=ratings_schema)
ratings_df.filter(isnull(col('ISBN'))).count()
new_ratings_df=ratings_df.dropna()

In [9]:
full_df=new_ratings_df.join(new_books_df,'ISBN','inner').join(new_users_df,'user_id','inner')

In [10]:
print(full_df.count())
new_full_df=full_df.dropna()
new_full_df.count()


946306


691437

In [11]:
new_full_df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- ISBN: string (nullable = true)
 |-- book_rating: integer (nullable = true)
 |-- book_title: string (nullable = true)
 |-- book_author: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- book_index: integer (nullable = false)
 |-- location: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- user_index: integer (nullable = false)



##  here we will use our model

In [12]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator

In [19]:
als=ALS(userCol="user_index", itemCol="book_index", ratingCol="book_rating",
         nonnegative=True,
         coldStartStrategy="drop",
         implicitPrefs=False,
        rank=50,
        maxIter=20,
        regParam=0.01)
        
# param_grid = ParamGridBuilder()\
# .addGrid(als.rank, [25,50,60])\
# .addGrid(als.maxIter, [20,40])\
# .addGrid(als.regParam, [0.1,1, 1.5])\
# .build()

evaluator = RegressionEvaluator(metricName="rmse", labelCol="book_rating",predictionCol="prediction")

# cv = CrossValidator(estimator = als,
#                     estimatorParamMaps = param_grid,
#                     evaluator = evaluator,
#                     numFolds = 5)


In [14]:
new_ratings_df_sample=new_full_df.select('book_rating','book_index','user_index').sample(0.1,seed=42)

In [15]:
training_df,testing_df=new_ratings_df_sample.randomSplit([0.8,0.2],seed=42)
training_df.show(5)
training_df.cache()
training_df = training_df.checkpoint(eager=True)

+-----------+----------+----------+
|book_rating|book_index|user_index|
+-----------+----------+----------+
|          0|         5|    200674|
|          0|        16|     20435|
|          0|        16|     76942|
|          0|        16|    102967|
|          0|        16|    114544|
+-----------+----------+----------+
only showing top 5 rows



In [16]:
models=als.fit(training_df)

In [22]:
prediction=models.transform(testing_df)
prediction.show()

+-----------+----------+----------+-----------+
|book_rating|book_index|user_index| prediction|
+-----------+----------+----------+-----------+
|          0|        16|     23768|0.020858666|
|          0|        16|    198412|        0.0|
|          0|        16|    269566|   0.855924|
|          0|        24|      4285|0.041541554|
|          0|        24|     27650|        0.0|
|          0|        24|     33943| 0.38161463|
|          0|        24|     56856| 0.02384191|
|          0|        24|     63714| 0.16817255|
|          0|        24|     68256| 0.09720203|
|          0|        24|     76942|  0.1683183|
|          0|        24|     88729|        0.0|
|          0|        24|     99347|        0.0|
|          0|        24|    195374|  0.4825582|
|          0|        24|    196493|  0.7891096|
|          0|        24|    197505|        0.0|
|          0|        24|    232945|  0.1918664|
|          0|        25|    273086|        0.0|
|          0|        26|     76942| 0.41

In [21]:
evaluator.evaluate(prediction)

3.9361736740928426

In [28]:
models.recommendForAllUsers(1).show()



+----------+--------------------+
|user_index|     recommendations|
+----------+--------------------+
|        10|         [{40, 0.0}]|
|        75|         [{40, 0.0}]|
|        99|[{107260, 10.6820...|
|       114|[{232138, 14.24163}]|
|       193|         [{40, 0.0}]|
|       199|         [{40, 0.0}]|
|       215|[{200037, 7.822133}]|
|       254| [{14288, 8.674279}]|
|       289| [{69633, 9.149336}]|
|       361| [{90161, 5.624949}]|
|       388| [{57546, 9.171476}]|
|       392|         [{40, 0.0}]|
|       408|         [{40, 0.0}]|
|       424|         [{40, 0.0}]|
|       486|  [{18694, 8.62049}]|
|       503|[{90516, 10.161483}]|
|       505|    [{232921, 9.04}]|
|       538|[{104415, 9.223965}]|
|       569|         [{40, 0.0}]|
|       625|[{34224, 10.112593}]|
+----------+--------------------+
only showing top 20 rows

