In [71]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.sql import Row
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import IntegerType
from pyspark.ml.regression import LinearRegression

from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
spark = SparkSession.builder.appName('tmp').getOrCreate()

In [53]:
df=spark.read.csv("../final_val.csv")
df=df.withColumnRenamed("_c0","user_id")\
    .withColumnRenamed("_c1","book_id")\
    .withColumnRenamed("_c2","is_read")\
    .withColumnRenamed("_c3","rating")\
    .withColumnRenamed("_c4","is_reviewed")
df=df.select("user_id","book_id","rating")\
    .withColumn("user_id", df["user_id"].cast(IntegerType()))\
    .withColumn("book_id", df["book_id"].cast(IntegerType()))\
    .withColumn("rating", df["rating"].cast(IntegerType()))
df=df.filter(df.rating!=0)
df,test=df.randomSplit([0.8,0.2],seed=100)
df.show(5)

+-------+-------+------+
|user_id|book_id|rating|
+-------+-------+------+
|   3027|  56510|     5|
|   3027|  57413|     3|
|   3027|  58353|     5|
|   3027|  60050|     4|
|   3027|  68880|     3|
+-------+-------+------+
only showing top 5 rows



In [54]:
book_id = df.select('book_id').distinct()
training_books,holdout_books=book_id.select("book_id").distinct().randomSplit([0.8,0.2],seed=100)
train = training_books.alias('df1').join(df.alias("df2"), training_books.book_id == df.book_id).select("df2.*")
holdout= holdout_books.alias('df1').join(df.alias("df2"), holdout_books.book_id == df.book_id).select("df2.*")

In [34]:
k=spark.read.json("../goodreads_book_genres_initial.json")
books_id_map=spark.read.parquet("../books_id_mapping.parquet")

In [35]:
k.take(5)

[Row(book_id='5333265', genres=Row(children=None, comics, graphic=None, fantasy, paranormal=None, fiction=None, history, historical fiction, biography=1, mystery, thriller, crime=None, non-fiction=None, poetry=None, romance=None, young-adult=None)),
 Row(book_id='1333909', genres=Row(children=None, comics, graphic=None, fantasy, paranormal=None, fiction=219, history, historical fiction, biography=5, mystery, thriller, crime=None, non-fiction=None, poetry=None, romance=None, young-adult=None)),
 Row(book_id='7327624', genres=Row(children=None, comics, graphic=None, fantasy, paranormal=31, fiction=8, history, historical fiction, biography=None, mystery, thriller, crime=1, non-fiction=None, poetry=1, romance=None, young-adult=None)),
 Row(book_id='6066819', genres=Row(children=None, comics, graphic=None, fantasy, paranormal=None, fiction=555, history, historical fiction, biography=None, mystery, thriller, crime=10, non-fiction=None, poetry=None, romance=23, young-adult=None)),
 Row(book_i

In [36]:
df=k.rdd.map(lambda x:[int(x.book_id)]+[int(i) if i else 0 for i in x.genres])

In [37]:
df=df.toDF(["book_id","children", "comics_graphic", "fantasy_paranormal", 
           "fiction", "history_historical_fiction_biography", "mystery_thriller_crime", 
            "non_fiction", "poetry", "romance", "young_adult"])
df=df.join(books_id_map,books_id_map.real_book_id==df.book_id).drop("real_book_id").drop("book_id").withColumnRenamed("book_id_csv","book_id")

In [38]:
df.printSchema()

root
 |-- children: long (nullable = true)
 |-- comics_graphic: long (nullable = true)
 |-- fantasy_paranormal: long (nullable = true)
 |-- fiction: long (nullable = true)
 |-- history_historical_fiction_biography: long (nullable = true)
 |-- mystery_thriller_crime: long (nullable = true)
 |-- non_fiction: long (nullable = true)
 |-- poetry: long (nullable = true)
 |-- romance: long (nullable = true)
 |-- young_adult: long (nullable = true)
 |-- book_id: string (nullable = true)



In [40]:
df.show(5)

+--------+--------------+------------------+-------+------------------------------------+----------------------+-----------+------+-------+-----------+-------+
|children|comics_graphic|fantasy_paranormal|fiction|history_historical_fiction_biography|mystery_thriller_crime|non_fiction|poetry|romance|young_adult|book_id|
+--------+--------------+------------------+-------+------------------------------------+----------------------+-----------+------+-------+-----------+-------+
|       0|             0|                 0|     22|                                 136|                     0|       1577|     0|      0|          0|  17588|
|       0|             0|                 0|      0|                                 198|                     0|        742|     0|      0|          0|   1189|
|       0|             0|                 0|     58|                                   0|                     0|          0|     0|      0|          0|  18828|
|       0|             0|               

# KNN model

In [41]:
features=["children", "comics_graphic", "fantasy_paranormal", 
           "fiction", "history_historical_fiction_biography", "mystery_thriller_crime", 
            "non_fiction", "poetry", "romance", "young_adult"]
vecAssembler = VectorAssembler(inputCols=features, outputCol="features")
df_transform=vecAssembler.transform(df)
train_knn=df_transform.join(train,"book_id").select("book_id","features")
holdout_knn=df_transform.join(holdout,"book_id").select("book_id","features")
#train_knn.take(5)

In [43]:
train_knn.count(),holdout_knn.count()

(61011, 15111)

In [49]:
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
                                  numHashTables=5)
brp_model = brp.fit(train_knn)
print("Approximately joining dfA and dfB on Euclidean distance smaller than 0.8:")
j=brp_model.approxSimilarityJoin(holdout_knn, train_knn, 0.8, distCol="EuclideanDistance")\
    .select(col("datasetA.book_id").alias("holdout_idA"),\
    col("datasetB.book_id").alias("train_idB"),\
    col("EuclideanDistance"))
j.show(10)

Approximately joining dfA and dfB on Euclidean distance smaller than0.8:
+-----------+---------+-----------------+
|holdout_idA|train_idB|EuclideanDistance|
+-----------+---------+-----------------+
|      42140|   678856|              0.0|
|     137847|   389374|              0.0|
|     197954|   488467|              0.0|
|     492684|   602517|              0.0|
|     314547|  1911958|              0.0|
|     936241|   473934|              0.0|
|     863755|   444643|              0.0|
|      75533|   224412|              0.0|
|     202733|  1146063|              0.0|
|     462994|   920473|              0.0|
+-----------+---------+-----------------+
only showing top 10 rows



In [50]:
tmp1=j.groupBy("holdout_idA").count()
tmp1.agg({"count":"min"}).collect(),tmp1.agg({"count":"max"}).collect()

([Row(min(count)=1)], [Row(max(count)=1080)])

# Latent Represent training set

In [55]:
als = ALS(userCol='user_id',
            itemCol='book_id',
            ratingCol='rating',
          coldStartStrategy='drop',
              nonnegative=True
              )
model = als.fit(train)

In [56]:
ItemMatrix=model.itemFactors
UserMatrix=model.userFactors
productFeatures=model.itemFactors.rdd.map(lambda x: [x.id]+[i for i in x.features])\
    .toDF(["book_id"]+["features"+str(i) for i in range(10)])

In [58]:
#productFeatures.show(5)

# Latent Represent holdout set

In [59]:
train_df=j.join(productFeatures,j.train_idB==productFeatures.book_id).drop("book_id")

In [60]:
gdf=train_df.groupBy("holdout_idA")
df_all=gdf.agg({"features0": "avg","features1":"avg","features2":"avg","features3":"avg",
                "features4": "avg","features5":"avg","features6":"avg","features7":"avg",
               "features8": "avg","features9":"avg"})\
        .withColumnRenamed("avg(features0)","features0").withColumnRenamed("avg(features1)","features1")\
        .withColumnRenamed("avg(features2)","features2").withColumnRenamed("avg(features3)","features3")\
        .withColumnRenamed("avg(features4)","features4").withColumnRenamed("avg(features5)","features5")\
        .withColumnRenamed("avg(features6)","features6").withColumnRenamed("avg(features7)","features7")\
        .withColumnRenamed("avg(features8)","features8").withColumnRenamed("avg(features9)","features9")

In [62]:
extra_items=df_all.rdd.map(lambda x:[x.holdout_idA]+[[x.features0,x.features1,x.features2,x.features3,x.features4,x.features5,
                                                     x.features6,x.features7,x.features8,x.features9]]).toDF(["book_id","features"])

In [16]:
k1=holdout.select("book_id").distinct()
extra_items=extra_items.join(k1,"book_id")

# New model

In [63]:
extra_items=extra_items.withColumn("book_id", extra_items["book_id"].cast(IntegerType()))

In [64]:
extra_items=extra_items.withColumnRenamed("features","book_features")
UserMatrix=UserMatrix.withColumnRenamed("id","user_id").withColumnRenamed("features","user_features")
k2=holdout.select("user_id").distinct()
UserMatrix=UserMatrix.join(k2,"user_id")
l=extra_items.crossJoin(UserMatrix)
l=l.rdd.map(lambda x: [x.book_id,x.user_id]+[sum([i*j for i,j in zip(x.book_features,x.user_features)])])
l=l.take(2000000)

In [65]:
l=spark.createDataFrame(l,["book_id","user_id","new_rating"])
l.take(5)

[Row(book_id=2338369, user_id=16339, new_rating=1.951860844324294),
 Row(book_id=2338369, user_id=100986, new_rating=2.5821718500905435),
 Row(book_id=2338369, user_id=264147, new_rating=2.899381590239869),
 Row(book_id=2338369, user_id=352633, new_rating=2.4593578578747004),
 Row(book_id=2338369, user_id=512952, new_rating=2.8397713132373634)]

In [20]:
l=extra_items.crossJoin(UserMatrix)
l2=l.rdd.map(lambda x: [x.book_id,x.user_id]+[sum([i*j for i,j in zip(x.book_features,x.user_features)])])
l2=l2.take(3000000)
l2=spark.createDataFrame(l2,["book_id","user_id","new_rating"])

In [67]:
# ===========================================================================================
l.createOrReplaceTempView("l")
holdout.createOrReplaceTempView("holdout")

In [68]:
res=spark.sql("SELECT holdout.book_id, holdout.user_id, l.new_rating FROM l JOIN holdout ON l.user_id = holdout.user_id AND l.book_id = holdout.book_id")

In [73]:
beauty=res.take(20)

In [91]:
pd.set_option("display.column_space",20)

In [92]:
pd.options.display.max_rows
pd.DataFrame(beauty,columns=["book_id","user_id","rating"])

Unnamed: 0,book_id,user_id,rating
0,1174955,262554,2.456925
1,173455,583601,4.063086
2,140678,264845,1.76556
3,93681,412372,1.930299
4,857871,646726,1.758787
5,13429,754276,2.998549
6,69621,214423,3.283679
7,6361,517672,1.539619
8,29105,173458,3.574426
9,8043,801487,4.260011


# Examples

In [93]:
idd=7048
print(df.filter(df.book_id==idd).collect())
print("\n")
print("Predicted rating:",3.14)
print(holdout.filter(holdout.book_id==idd).select("rating").collect())

[Row(user_id=399688, book_id=7048, rating=5), Row(user_id=134586, book_id=7048, rating=5), Row(user_id=507928, book_id=7048, rating=5), Row(user_id=756425, book_id=7048, rating=4), Row(user_id=398089, book_id=7048, rating=4), Row(user_id=140406, book_id=7048, rating=4), Row(user_id=357754, book_id=7048, rating=3)]


Predicted rating: 3.14
[Row(rating=5), Row(rating=5), Row(rating=5), Row(rating=4), Row(rating=4), Row(rating=4), Row(rating=3)]


In [62]:
idd=46743
print(df.filter(df.book_id==idd).collect())
print("\n")
print("Predicted rating:",3.01)
print(holdout.filter(holdout.book_id==idd).select("rating").collect())

[Row(children=0, comics_graphic=0, fantasy_paranormal=969, fiction=2317, history_historical_fiction_biography=0, mystery_thriller_crime=249, non_fiction=0, poetry=0, romance=81, young_adult=0, book_id='46743')]


Predicted rating: 3.01
[Row(rating='2'), Row(rating='3')]


In [63]:
idd=71792
print(df.filter(df.book_id==idd).collect())
print("\n")
print("Predicted rating:",3.14)
print(holdout.filter(holdout.book_id==idd).select("rating").collect())

[Row(children=1228, comics_graphic=0, fantasy_paranormal=0, fiction=546, history_historical_fiction_biography=29, mystery_thriller_crime=1936, non_fiction=0, poetry=0, romance=0, young_adult=562, book_id='71792')]


Predicted rating: 3.14
[Row(rating='4')]


In [64]:
idd=173455
print(df.filter(df.book_id==idd).collect())
print("\n")
print("Predicted rating:",4.25)
print(holdout.filter(holdout.book_id==idd).select("rating").collect())

[Row(children=0, comics_graphic=0, fantasy_paranormal=0, fiction=3953, history_historical_fiction_biography=136, mystery_thriller_crime=14462, non_fiction=0, poetry=0, romance=0, young_adult=70, book_id='173455')]


Predicted rating: 4.25
[Row(rating='4')]


In [65]:
idd=1588090
print(df.filter(df.book_id==idd).collect())
print("\n")
print("Predicted rating:",1.87)
print(holdout.filter(holdout.book_id==idd).select("rating").collect())

[Row(children=0, comics_graphic=0, fantasy_paranormal=0, fiction=1, history_historical_fiction_biography=0, mystery_thriller_crime=0, non_fiction=0, poetry=0, romance=0, young_adult=0, book_id='1588090')]


True rating: 1.87
[Row(rating='2')]


In [68]:
idd=788525
print(df.filter(df.book_id==idd).collect())
print("\n")
print("Predicted rating:",1.58)
print(holdout.filter(holdout.book_id==idd).select("rating").collect())

[Row(children=0, comics_graphic=0, fantasy_paranormal=0, fiction=0, history_historical_fiction_biography=0, mystery_thriller_crime=0, non_fiction=3, poetry=0, romance=0, young_adult=0, book_id='788525')]


Predicted rating: 1.58
[Row(rating='5')]


# predictions

In [94]:
train=train.select("user_id","book_id","rating")
res=res.union(train)
res=res.withColumn("user_id", res["user_id"].cast(IntegerType()))\
    .withColumn("book_id", res["book_id"].cast(IntegerType()))\
    .withColumn("rating", res["new_rating"].cast(IntegerType()))
als = ALS(userCol='user_id',
            itemCol='book_id',
            ratingCol='rating',
           coldStartStrategy='drop',
              nonnegative=True
              )
model1 = als.fit(res)

In [95]:
test=test.withColumn("user_id", test["user_id"].cast(IntegerType()))\
    .withColumn("book_id", test["book_id"].cast(IntegerType()))\
    .withColumn("rating", test["rating"].cast(IntegerType()))
predictions = model1.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 2.4694402748293944


In [96]:
df,test=df.randomSplit([0.8,0.2],seed=100)
df=df.withColumn("user_id", df["user_id"].cast(IntegerType()))\
    .withColumn("book_id", df["book_id"].cast(IntegerType()))\
    .withColumn("rating", df["rating"].cast(IntegerType()))
als = ALS(userCol='user_id',
            itemCol='book_id',
            ratingCol='rating',
          coldStartStrategy='drop',
              nonnegative=True
              )
model = als.fit(df)

In [97]:
test=test.withColumn("user_id", test["user_id"].cast(IntegerType()))\
    .withColumn("book_id", test["book_id"].cast(IntegerType()))\
    .withColumn("rating", test["rating"].cast(IntegerType()))
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.4767669838797601
