<a href="https://colab.research.google.com/github/LakshmanYES/My-School-Projects/blob/main/Sai_Lakshman_Ethakatla_HW_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Spark init
!wget -q https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz 
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/default-java"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"
!pip install -q findspark
import findspark
findspark.init()

In [None]:
from pyspark.sql.types import *
from pyspark.sql import SparkSession

# Start (or reuse) a local Spark session for this notebook; the UI is served
# on port 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)

In [None]:
# Explicit schema for the ratings file: (userId, ISBN, rating), all nullable.
custom_schema = (
    StructType()
    .add("userId", IntegerType(), True)
    .add("ISBN", StringType(), True)
    .add("rating", IntegerType(), True)
)

In [None]:
# Load the ';'-separated ratings file with the explicit schema; the first
# line of the file is a header row.
# NOTE(review): assumes Google Drive is already mounted at /content/drive --
# no mount call is visible in this notebook; confirm before Run All.
df_book_rating = (
    spark.read
    .schema(custom_schema)
    .options(sep=';', header=True)
    .csv('/content/drive/MyDrive/Colab_notebook/BX-Book-Ratings.csv')
)

In [None]:
# Preview the first 20 rows of the raw ratings.
df_book_rating.show(n=20)

+------+----------+------+
|userId|      ISBN|rating|
+------+----------+------+
|276725|034545104X|     0|
|276726|0155061224|     5|
|276727|0446520802|     0|
|276729|052165615X|     3|
|276729|0521795028|     6|
|276733|2080674722|     0|
|276736|3257224281|     8|
|276737|0600570967|     6|
|276744|038550120X|     7|
|276745| 342310538|    10|
|276746|0425115801|     0|
|276746|0449006522|     0|
|276746|0553561618|     0|
|276746|055356451X|     0|
|276746|0786013990|     0|
|276746|0786014512|     0|
|276747|0060517794|     9|
|276747|0451192001|     0|
|276747|0609801279|     0|
|276747|0671537458|     9|
+------+----------+------+
only showing top 20 rows



In [None]:
# Keep only rows with a strictly positive rating.
# NOTE(review): presumably a rating of 0 marks "no explicit rating" in this
# dataset -- confirm against the dataset description.
df_books_filtered = df_book_rating.filter(df_book_rating['rating'] > 0)
# Print the row count: as a bare mid-cell expression its value was computed
# by a full Spark job and then silently thrown away.
print(df_books_filtered.count())
df_books_filtered.show()

+------+----------+------+
|userId|      ISBN|rating|
+------+----------+------+
|276726|0155061224|     5|
|276729|052165615X|     3|
|276729|0521795028|     6|
|276736|3257224281|     8|
|276737|0600570967|     6|
|276744|038550120X|     7|
|276745| 342310538|    10|
|276747|0060517794|     9|
|276747|0671537458|     9|
|276747|0679776818|     8|
|276747|0943066433|     7|
|276747|1885408226|     7|
|276748|0747558167|     6|
|276751|3596218098|     8|
|276754|0684867621|     8|
|276755|0451166892|     5|
|276760|8440682697|    10|
|276762|0380711524|     5|
|276762|3453092007|     8|
|276762|3453213025|     3|
+------+----------+------+
only showing top 20 rows



In [None]:
from pyspark.ml.feature import StringIndexer
# ALS needs numeric item ids, so map every ISBN string to a numeric index.
indexer = StringIndexer(inputCol="ISBN", outputCol="isbn_index")
# The mapping is learned from ALL ratings, then applied to the filtered
# (rating > 0) rows only.
isbn_indexer_model = indexer.fit(df_book_rating)
df_book_rating_indexed = isbn_indexer_model.transform(df_books_filtered)
df_book_rating_indexed.show(n=20)
df_book_rating_indexed.count()

+------+----------+------+----------+
|userId|      ISBN|rating|isbn_index|
+------+----------+------+----------+
|276726|0155061224|     5|   89067.0|
|276729|052165615X|     3|  205984.0|
|276729|0521795028|     6|  206014.0|
|276736|3257224281|     8|   43132.0|
|276737|0600570967|     6|  216574.0|
|276744|038550120X|     7|     232.0|
|276745| 342310538|    10|  135627.0|
|276747|0060517794|     9|    1413.0|
|276747|0671537458|     9|     914.0|
|276747|0679776818|     8|    2367.0|
|276747|0943066433|     7|  273158.0|
|276747|1885408226|     7|  296143.0|
|276748|0747558167|     6|   53332.0|
|276751|3596218098|     8|   28525.0|
|276754|0684867621|     8|     562.0|
|276755|0451166892|     5|     148.0|
|276760|8440682697|    10|  325376.0|
|276762|0380711524|     5|    2387.0|
|276762|3453092007|     8|   43258.0|
|276762|3453213025|     3|  310147.0|
+------+----------+------+----------+
only showing top 20 rows



433671

In [None]:
# Import the required functions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [None]:
# 80/20 train/test split with a fixed seed, then peek at one training row.
split_weights = [0.8, 0.2]
training, test = df_book_rating_indexed.randomSplit(split_weights, seed=100)
print(training.first())

Row(userId=8, ISBN='0002005018', rating=5, isbn_index=10774.0)


In [None]:
# Create ALS model
# Explicit-feedback ALS on (userId, isbn_index, rating).
# coldStartStrategy="drop" removes rows whose prediction is NaN, so the
# evaluation metric stays defined.
# seed is fixed (same value as the train/test split) so that the factors,
# predictions and RMSE are reproducible across runs.
# NOTE(review): the recorded run shows predictions well outside the rating
# range (negative and above 10) and an RMSE of about 9.09 -- rank / regParam /
# maxIter likely need tuning.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="userId",
    itemCol="isbn_index",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=100,
)
model = als.fit(training)

In [None]:
# Number of users / books for which the fitted model holds a factor vector.
n_user_factors = model.userFactors.count()
print("Total  user in training data ", n_user_factors)
n_item_factors = model.itemFactors.count()
print("Total books in the training data ", n_item_factors)

Total  user in training data  68403
Total books in the training data  159035


In [None]:
# RMSE evaluator comparing the true "rating" column with the model's
# "prediction" column.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
# Score the held-out rows with the fitted ALS model.
predictions = model.transform(test)

In [None]:
# Preview the first 20 scored test rows.
predictions.show(n=20)

+------+----------+------+----------+----------+
|userId|      ISBN|rating|isbn_index|prediction|
+------+----------+------+----------+----------+
|114089|0451166892|     8|     148.0|  5.596349|
|217740|0451166892|     8|     148.0|-0.7508993|
| 94985|0451166892|     8|     148.0|   3.97499|
| 28204|0451166892|    10|     148.0|  10.02032|
|246507|0451166892|    10|     148.0|  6.158074|
| 48025|0451166892|     9|     148.0| 3.1237543|
|216010|0451166892|     9|     148.0| 6.6962337|
|204790|0451166892|     8|     148.0| 14.576223|
| 33179|0451166892|    10|     148.0| 6.3083963|
| 11629|0451166892|     8|     148.0| -1.060991|
|  5268|0451166892|    10|     148.0| 1.7691737|
|114988|0451166892|     7|     148.0| 4.5640116|
| 79942|0451166892|    10|     148.0|  2.366581|
| 17003|0142004235|     9|     463.0|  -6.56421|
|171055|0142004235|     7|     463.0| -2.832954|
|  9226|0142004235|     9|     463.0|-5.0983434|
| 68721|0142004235|     8|     463.0|  5.040035|
| 64436|0142004235| 

In [None]:
# Root-mean-square error of the predictions on the test split.
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 9.086747966844355


In [None]:
# Pick three users and ask the model for their top-5 book recommendations.
# The distinct ids are sorted before limit(3): without an ordering Spark may
# return any three rows, so the selected users would vary between runs.
user_col = als.getUserCol()
users = (
    df_book_rating_indexed
    .select(user_col)
    .distinct()
    .orderBy(user_col)
    .limit(3)
)
userSubsetRecs = model.recommendForUserSubset(users, 5)

userSubsetRecs.show(truncate=False)

+------+--------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                   |
+------+--------------------------------------------------------------------------------------------------+
|1591  |[{10950, 11.055745}, {1970, 9.999588}, {6272, 9.928529}, {5245, 9.74199}, {2394, 9.6393175}]      |
|463   |[{15987, 16.31985}, {3188, 15.8176365}, {5805, 15.776455}, {3881, 15.704398}, {4630, 15.640852}]  |
|496   |[{24548, 14.186534}, {4029, 13.705001}, {7447, 13.683207}, {10967, 13.605435}, {13473, 13.327509}]|
+------+--------------------------------------------------------------------------------------------------+



In [None]:
from pyspark.sql.functions import *

In [None]:
# One output row per recommended item: unnest the `recommendations` array
# into a new `recomm_pair` column, keeping the original columns.
recomm_pair = explode('recommendations').alias('recomm_pair')
df_explode = userSubsetRecs.select('*', recomm_pair)
df_explode.show(n=20)

+------+--------------------+------------------+
|userId|     recommendations|       recomm_pair|
+------+--------------------+------------------+
|  1591|[{10950, 11.05574...|{10950, 11.055745}|
|  1591|[{10950, 11.05574...|  {1970, 9.999588}|
|  1591|[{10950, 11.05574...|  {6272, 9.928529}|
|  1591|[{10950, 11.05574...|   {5245, 9.74199}|
|  1591|[{10950, 11.05574...| {2394, 9.6393175}|
|   463|[{15987, 16.31985...| {15987, 16.31985}|
|   463|[{15987, 16.31985...|{3188, 15.8176365}|
|   463|[{15987, 16.31985...| {5805, 15.776455}|
|   463|[{15987, 16.31985...| {3881, 15.704398}|
|   463|[{15987, 16.31985...| {4630, 15.640852}|
|   496|[{24548, 14.18653...|{24548, 14.186534}|
|   496|[{24548, 14.18653...| {4029, 13.705001}|
|   496|[{24548, 14.18653...| {7447, 13.683207}|
|   496|[{24548, 14.18653...|{10967, 13.605435}|
|   496|[{24548, 14.18653...|{13473, 13.327509}|
+------+--------------------+------------------+



In [None]:

# Flatten the recommendations to one (userId, isbn_index, rating) row per
# recommended item.
# The frame is derived from `userSubsetRecs` rather than from `df_explode`
# itself, so this cell is idempotent: re-running it no longer fails because
# `recomm_pair` was already dropped by a previous run.
df_explode = (
    userSubsetRecs
    .select('userId', explode('recommendations').alias('recomm_pair'))
    .select('userId', col('recomm_pair.isbn_index'), col('recomm_pair.rating'))
)
df_explode.show()

+------+----------+----------+
|userId|isbn_index|    rating|
+------+----------+----------+
|  1591|     10950| 11.055745|
|  1591|      1970|  9.999588|
|  1591|      6272|  9.928529|
|  1591|      5245|   9.74199|
|  1591|      2394| 9.6393175|
|   463|     15987|  16.31985|
|   463|      3188|15.8176365|
|   463|      5805| 15.776455|
|   463|      3881| 15.704398|
|   463|      4630| 15.640852|
|   496|     24548| 14.186534|
|   496|      4029| 13.705001|
|   496|      7447| 13.683207|
|   496|     10967| 13.605435|
|   496|     13473| 13.327509|
+------+----------+----------+



In [None]:

# Schema for the books catalogue: eight nullable string columns, in file order.
book_columns = [
    "ISBN",
    "BookTitle",
    "BookAuthor",
    "YearOfPublication",
    "Publisher",
    "ImageURLS",
    "ImageURLM",
    "ImageURLL",
]
book_schema = StructType(
    [StructField(column_name, StringType(), True) for column_name in book_columns]
)

In [None]:

# Load the ';'-separated books catalogue with the explicit schema; the first
# line of the file is a header row.
# NOTE(review): assumes Google Drive is already mounted at /content/drive --
# no mount call is visible in this notebook; confirm before Run All.
df_books_data = (
    spark.read
    .schema(book_schema)
    .options(sep=';', header=True)
    .csv('/content/drive/MyDrive/Colab_notebook/BX-Books.csv')
)

In [None]:
# Preview the catalogue, then display its row count as the cell result.
df_books_data.show(n=20)
df_books_data.count()

+----------+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+
|      ISBN|           BookTitle|          BookAuthor|YearOfPublication|           Publisher|           ImageURLS|           ImageURLM|           ImageURLL|
+----------+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+
|0195153448| Classical Mythology|  Mark P. O. Morford|             2002|Oxford University...|http://images.ama...|http://images.ama...|http://images.ama...|
|0002005018|        Clara Callan|Richard Bruce Wright|             2001|HarperFlamingo Ca...|http://images.ama...|http://images.ama...|http://images.ama...|
|0060973129|Decision in Normandy|        Carlo D'Este|             1991|     HarperPerennial|http://images.ama...|http://images.ama...|http://images.ama...|
|0374157065|Flu: The Story of...|    Gina Bari Kolata|    

271379

In [None]:

# Attach a numeric `isbn_index` to every catalogue row.
# NOTE(review): this indexer is fitted on the books catalogue, not on the
# ratings, so its index values are unrelated to the `isbn_index` the ALS
# model was trained on (e.g. ISBN 0002005018 is 73.0 here but 10774.0 in the
# ratings frame). Do not use this column to look up ALS item ids.
book_isbn_indexer = StringIndexer(inputCol="ISBN", outputCol="isbn_index")
book_isbn_indexer_model = book_isbn_indexer.fit(df_books_data)
df_book_isbn_indexed = book_isbn_indexer_model.transform(df_books_data)
df_book_isbn_indexed.show(n=20)
df_book_isbn_indexed.count()

+----------+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+----------+
|      ISBN|           BookTitle|          BookAuthor|YearOfPublication|           Publisher|           ImageURLS|           ImageURLM|           ImageURLL|isbn_index|
+----------+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+----------+
|0195153448| Classical Mythology|  Mark P. O. Morford|             2002|Oxford University...|http://images.ama...|http://images.ama...|http://images.ama...|   25030.0|
|0002005018|        Clara Callan|Richard Bruce Wright|             2001|HarperFlamingo Ca...|http://images.ama...|http://images.ama...|http://images.ama...|      73.0|
|0060973129|Decision in Normandy|        Carlo D'Este|             1991|     HarperPerennial|http://images.ama...|http://images.ama...|http://images.ama...|    

271379

In [None]:

# Resolve each recommended item id to its ISBN and title.
#
# The ALS item ids come from the StringIndexer fitted on the RATINGS data.
# The catalogue frame `df_book_isbn_indexed` was indexed by a second,
# independently fitted StringIndexer, so the same ISBN carries a different
# `isbn_index` there; joining on `isbn_index` against it attaches the wrong
# titles. Instead, translate ids back to ISBNs through the ratings mapping
# and join the catalogue on ISBN.
isbn_lookup = (
    df_book_rating_indexed
    .select(
        # Recommendation ids are integers; the indexer output is a double.
        df_book_rating_indexed['isbn_index'].cast('int').alias('isbn_index'),
        'ISBN',
    )
    .distinct()
)
df_recom_book = (
    df_explode
    .join(isbn_lookup, 'isbn_index')
    # Left join: a recommended ISBN missing from the catalogue is kept with a
    # null title instead of silently disappearing from the result.
    .join(df_books_data.select('ISBN', 'BookTitle'), 'ISBN', 'left')
    .select('userId', 'isbn_index', 'ISBN', 'BookTitle')
)

In [None]:
# Show the recommended books with full, untruncated titles.
df_recom_book.show(n=20, truncate=False)

+------+----------+----------+----------------------------------------------------------------------------------------------------+
|userId|isbn_index|ISBN      |BookTitle                                                                                           |
+------+----------+----------+----------------------------------------------------------------------------------------------------+
|496   |24548     |0194216373|Cry, Freedom (Oxford Bookworms S.)                                                                  |
|463   |15987     |0140092323|Lake Wobegon days                                                                                   |
|463   |3881      |006016221X|Fairyland: A Novel                                                                                  |
|463   |5805      |0060523867|The Confusion (The Baroque Cycle, Vol. 2)                                                           |
|1591  |6272      |0060589914|Knocked Out by My Nunga-Nungas (rack) (Confess