In [1]:
import os
import pyspark
conf = pyspark.SparkConf()
conf.set('spark.ui.proxyBase', '/user/' + os.environ['JUPYTERHUB_USER'] + '/proxy/4041') 
conf.set('spark.sql.repl.eagerEval.enabled', True)
conf.set('spark.driver.memory','4g')
sc = pyspark.SparkContext(conf=conf)
spark = pyspark.SQLContext.getOrCreate(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/21 17:09:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
spark

<pyspark.sql.context.SQLContext at 0x7feb188b26b0>

## Import the dataset

In [None]:
books = spark.read.json("../shared/finals/swarali/goodreads_books.json")



In [None]:
books.printSchema()

## Content Based Recommendation

 ### RECOMMEND TOP 5 FOR ANY BOOK BASED ON THE DESCRIPTION SIMILARITY 

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, Normalizer
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import BucketedRandomProjectionLSH, StopWordsRemover
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("ContentBasedRecommender").getOrCreate()

books_df = books.limit(1000)
books_df  = books_df.na.drop(subset=["description"])
books_df = books_df.filter(col("description").isNotNull() & (col("description") != ""))
# Preprocessing: Tokenize the description text
tokenizer = Tokenizer(inputCol="description", outputCol="words")
words_data = tokenizer.transform(books_df)

#remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
filtered_df = remover.transform(words_data)

# Feature Extraction: Apply HashingTF and IDF
hashing_tf = HashingTF(inputCol="words", outputCol="raw_features")
featurized_data = hashing_tf.transform(filtered_df)
idf = IDF(inputCol="raw_features", outputCol="idf_features")
idf_model = idf.fit(featurized_data)
tfidf_data = idf_model.transform(featurized_data)

# Normalize the features
normalizer = Normalizer(inputCol="idf_features", outputCol="features")
normalized_data = normalizer.transform(tfidf_data)

# Approximate Similarity Join
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0, numHashTables=3)
model = brp.fit(normalized_data)
hashed_df = model.transform(normalized_data)


In [8]:
#recommend for one book 

def recommend_books(book_id, top_n):
    # Filter for the given book_id
    query_df = hashed_df.filter(hashed_df.book_id == book_id)

    # Compute the approx similarity join
    similar_books = model.approxSimilarityJoin(query_df, hashed_df, threshold=1.5, distCol="EuclideanDistance")

    # Select the books and their distances
    similar_books_df = similar_books.select(col("datasetB.book_id").alias("book_id"), col("EuclideanDistance"))
    
    # Filter out the query book and limit to top N
    similar_books_df = similar_books_df.filter(similar_books_df.book_id != book_id).orderBy("EuclideanDistance").limit(top_n)

    return similar_books_df

# Example usage
specific_book_id = 1333909  
recommended_books = recommend_books(specific_book_id, 5)

recommended_books.show()

23/12/21 11:09:42 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/12/21 11:24:46 WARN DAGScheduler: Broadcasting large task binary with size 10.7 MiB
[Stage 7:>                                                          (0 + 1) / 1]

+--------+------------------+
| book_id| EuclideanDistance|
+--------+------------------+
|23848481|1.3151596017023932|
|30074967|1.3154304733485993|
|33036180| 1.319748119745085|
| 1296784|1.3239779300070345|
|15703099|1.3257883986798034|
+--------+------------------+



                                                                                

In [9]:
final_recommendations = recommended_books.join(books, "book_id")

In [12]:
# Selected Book features
books.filter(col('book_id')==specific_book_id).select("book_id","title", "description").show(truncate = False)



+-------+-----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [11]:
# The TOP 5 recommedation for the above book
final_recommendations.select("book_id", "EuclideanDistance", "title", "description").show(truncate = False)

23/12/21 12:01:38 WARN DAGScheduler: Broadcasting large task binary with size 10.7 MiB

+--------+------------------+-----------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

## Top 5 Recommendations for each book 

In [None]:
from pyspark.sql.functions import col, rank
from pyspark.sql.window import Window

def get_top_5_recommendations_for_all_books(df, model):
    # Perform the approx similarity join for the whole dataset
    similar_books_df = model.approxSimilarityJoin(df, df, threshold=float("inf"), distCol="EuclideanDistance") \
                            .filter("datasetA.book_id != datasetB.book_id")  # Exclude same book comparisons

    # Select required columns and rename them
    similar_books_df = similar_books_df.select(
        col("datasetA.book_id").alias("book_id"),
        col("datasetB.book_id").alias("similar_book_id"),
        col("EuclideanDistance")
    )

    # Add a rank column based on EuclideanDistance for each book
    windowSpec = Window.partitionBy("book_id").orderBy("EuclideanDistance")
    similar_books_df = similar_books_df.withColumn("rank", rank().over(windowSpec))

    # Filter to keep only top 5 similar books for each book
    top_5_similar_books_df = similar_books_df.filter(col("rank") <= 5)

    return top_5_similar_books_df

# Apply the function to get top 5 recommendations for all books
top_5_recommendations_for_all_books = get_top_5_recommendations_for_all_books(hashed_df, model)


In [None]:
top_5_recommendations_for_all_books.show(1)

## Recommedation system - First 

In [9]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, Normalizer
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import BucketedRandomProjectionLSH, StopWordsRemover
from pyspark.sql.functions import col


books_df = books.limit(10000)
books_df  = books_df.na.drop(subset=["description"])

# Preprocessing: Tokenize the description text
tokenizer = Tokenizer(inputCol="description", outputCol="words")
words_data = tokenizer.transform(books_df)

# Optionally remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
filtered_df = remover.transform(words_data)

# Feature Extraction: Apply HashingTF and IDF
hashing_tf = HashingTF(inputCol="words", outputCol="raw_features")
featurized_data = hashing_tf.transform(filtered_df)
idf = IDF(inputCol="raw_features", outputCol="idf_features")
idf_model = idf.fit(featurized_data)
tfidf_data = idf_model.transform(featurized_data)

# Normalize the features
normalizer = Normalizer(inputCol="idf_features", outputCol="features")
normalized_data = normalizer.transform(tfidf_data)

# Approximate Similarity Join
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0, numHashTables=3)
model = brp.fit(normalized_data)
hashed_df = model.transform(normalized_data)


                                                                                

In [19]:
filtered_books_df = books_df.filter(col("description").isNotNull() & (col("description") != ""))

In [None]:
books_df  = books_df.na.drop(subset=["description"])

In [None]:
# recommend top 5 for one book 

def recommend_books(book_id, top_n):
    # Filter for the given book_id
    query_df = hashed_df.filter(hashed_df.book_id == book_id)

    # Compute the approx similarity join
    similar_books = model.approxSimilarityJoin(query_df, hashed_df, threshold=1.5, distCol="EuclideanDistance")

    # Select the books and their distances
    similar_books_df = similar_books.select(col("datasetB.book_id").alias("book_id"), col("EuclideanDistance"))
    
    # Filter out the query book and limit to top N
    similar_books_df = similar_books_df.filter(similar_books_df.book_id != book_id).orderBy("EuclideanDistance").limit(top_n)

    return similar_books_df

# Example usage
specific_book_id = 5333265  # Replace with an actual book_id
recommended_books = recommend_books(specific_book_id, 5)

In [13]:
# Example usage
specific_book_id = 1333909 # Replace with an actual book_id
recommended_books = recommend_books(specific_book_id, 5)

In [14]:
recommended_books.show()

23/12/21 09:15:07 WARN DAGScheduler: Broadcasting large task binary with size 10.2 MiB
23/12/21 09:15:22 WARN DAGScheduler: Broadcasting large task binary with size 10.5 MiB
[Stage 23:>                                                         (0 + 1) / 1]

+--------+------------------+
| book_id| EuclideanDistance|
+--------+------------------+
|13579397|1.3182980253347245|
|  981695|1.3213593122781784|
|  169075|1.3350641636905347|
|16047776|1.3353816795787177|
| 8529354|1.3360743253474139|
+--------+------------------+



                                                                                

In [15]:
final_recommendations = recommended_books.join(books, "book_id")

In [16]:
final_recommendations.select("book_id", "EuclideanDistance", "title", "description").show(truncate = False)

23/12/21 09:37:34 WARN DAGScheduler: Broadcasting large task binary with size 10.2 MiB
23/12/21 09:37:48 WARN DAGScheduler: Broadcasting large task binary with size 10.5 MiB

+--------+------------------+----------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [11]:
recommended_books.show()

23/12/21 08:58:15 WARN DAGScheduler: Broadcasting large task binary with size 10.2 MiB
23/12/21 08:58:34 WARN DAGScheduler: Broadcasting large task binary with size 10.5 MiB
[Stage 14:>                                                         (0 + 1) / 1]

+--------+-----------------+
| book_id|EuclideanDistance|
+--------+-----------------+
|30123434|              0.0|
|25552070|              0.0|
|12841602|              0.0|
| 4546375|              0.0|
|13068837|              0.0|
+--------+-----------------+



                                                                                

In [12]:
books.filter(col('book_id')=='30123434').show()



+----+-------------+--------------+--------+------------+-----------+-------------------+------+--------------------+--------+----+------+-----------+-------------+--------------------+---------+--------------------+---------------+-----------------+----------------+---------+-------------+------+-------------+------------------+--------------------+--------------------+--------------------+--------+
|asin|      authors|average_rating| book_id|country_code|description|edition_information|format|           image_url|is_ebook|isbn|isbn13|kindle_asin|language_code|                link|num_pages|     popular_shelves|publication_day|publication_month|publication_year|publisher|ratings_count|series|similar_books|text_reviews_count|               title|title_without_series|                 url| work_id|
+----+-------------+--------------+--------+------------+-----------+-------------------+------+--------------------+--------+----+------+-----------+-------------+--------------------+-------

                                                                                

In [None]:

# Self-join the DataFrame to find similar books
print("Approximately joining dfA and dfA on distance smaller than 1.5:")
recos = model.approxSimilarityJoin(normalized_data, normalized_data, 1.5).select(
    col("datasetA.book_id").alias("book_id_A"),
    col("datasetB.book_id").alias("book_id_B"),
    col("distCol")
)

In [23]:


similar_books_df = recos.filter(col("distCol") > 0).orderBy("distCol")

# Show top N similar book pairs
#similar_books_df.limit(10).show()

In [24]:
from pyspark.sql.functions import col

# Assuming your books DataFrame is named 'books_df' and has 'book_id', 'title', and 'description' columns
# Join the similar_books_df DataFrame with the books DataFrame
# Join for book_id_A
joined_df_A = similar_books_df.join(books_df.withColumnRenamed("book_id", "book_id_A").withColumnRenamed("title", "title_A").withColumnRenamed("description", "description_A"), on="book_id_A")

# Join for book_id_B
final_df = joined_df_A.join(books_df.withColumnRenamed("book_id", "book_id_B").withColumnRenamed("title", "title_B").withColumnRenamed("description", "description_B"), on="book_id_B")

# Selecting relevant columns
final_df = final_df.select("book_id_A", "title_A", "description_A", "book_id_B", "title_B", "description_B", "distCol")

# Show results
final_df.show(truncate=False)


23/12/21 00:05:01 WARN DAGScheduler: Broadcasting large task binary with size 10.2 MiB
23/12/21 00:05:03 WARN DAGScheduler: Broadcasting large task binary with size 10.5 MiB
[Stage 50:>                                                         (0 + 1) / 1]

+---------+---------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [25]:
final_df.select("title_A", "title_B","distCol").show(20)

23/12/21 00:05:09 WARN DAGScheduler: Broadcasting large task binary with size 10.2 MiB
23/12/21 00:05:11 WARN DAGScheduler: Broadcasting large task binary with size 10.5 MiB
[Stage 58:>                                                         (0 + 1) / 1]

+--------------------+--------------------+------------------+
|             title_A|             title_B|           distCol|
+--------------------+--------------------+------------------+
|W.C. Fields: A Li...|The Devil's Notebook| 1.414213562373095|
|W.C. Fields: A Li...|The Aeneid for Bo...|1.4142135623730951|
|W.C. Fields: A Li...|The Unschooled Wi...|1.4142135623730951|
|W.C. Fields: A Li...|         Good Harbor|1.4142135623730956|
|W.C. Fields: A Li...|Playmaker: A Veno...|1.4142135623730943|
|W.C. Fields: A Li...|Best Friends Forever|1.4142135623730945|
|W.C. Fields: A Li...|All's Fairy in Lo...|1.4142135623730943|
|         Good Harbor|The Devil's Notebook|1.3518357458341177|
|         Good Harbor|The Wanting of Le...|1.4142135623730956|
|         Good Harbor|The Aeneid for Bo...|1.3395741752879142|
|         Good Harbor|Runic Astrology: ...|1.4142135623730956|
|         Good Harbor|The Unschooled Wi...|1.3802195384743148|
|         Good Harbor|W.C. Fields: A Li...|1.4142135623

                                                                                

In [26]:
from pyspark.sql.functions import col

# Assuming you have the books_df and recommendation_df already loaded

# Join to add image URL for title_A
df = final_df.join(
    books_df.select(col("title").alias("title_A"), col("image_url").alias("image_url_A")),
    on="title_A",
    how="left"
)


In [27]:
df.select("title_A","image_url_A","title_B","distCol").show(20)

23/12/21 00:12:06 WARN DAGScheduler: Broadcasting large task binary with size 10.2 MiB
23/12/21 00:12:11 WARN DAGScheduler: Broadcasting large task binary with size 10.5 MiB
[Stage 69:>                                                         (0 + 1) / 1]

+--------------------+--------------------+--------------------+------------------+
|             title_A|         image_url_A|             title_B|           distCol|
+--------------------+--------------------+--------------------+------------------+
|W.C. Fields: A Li...|https://images.gr...|The Devil's Notebook| 1.414213562373095|
|W.C. Fields: A Li...|https://images.gr...|The Aeneid for Bo...|1.4142135623730951|
|W.C. Fields: A Li...|https://images.gr...|The Unschooled Wi...|1.4142135623730951|
|W.C. Fields: A Li...|https://images.gr...|         Good Harbor|1.4142135623730956|
|W.C. Fields: A Li...|https://images.gr...|Playmaker: A Veno...|1.4142135623730943|
|W.C. Fields: A Li...|https://images.gr...|Best Friends Forever|1.4142135623730945|
|W.C. Fields: A Li...|https://images.gr...|All's Fairy in Lo...|1.4142135623730943|
|         Good Harbor|https://s.gr-asse...|The Devil's Notebook|1.3518357458341177|
|         Good Harbor|https://s.gr-asse...|The Wanting of Le...|1.4142135623

                                                                                

In [28]:
final_df = df.join(
    books_df.select(col("title").alias("title_B"), col("image_url").alias("image_url_B")),
    on="title_B",
    how="left"
)

In [30]:
final_df.select("title_A","image_url_A","title_B","distCol","image_url_B").show(20)

23/12/21 00:24:14 WARN DAGScheduler: Broadcasting large task binary with size 10.2 MiB
23/12/21 00:24:18 WARN DAGScheduler: Broadcasting large task binary with size 10.5 MiB
[Stage 80:>                                                         (0 + 1) / 1]

+--------------------+--------------------+--------------------+------------------+--------------------+
|             title_A|         image_url_A|             title_B|           distCol|         image_url_B|
+--------------------+--------------------+--------------------+------------------+--------------------+
|W.C. Fields: A Li...|https://images.gr...|The Devil's Notebook| 1.414213562373095|https://images.gr...|
|W.C. Fields: A Li...|https://images.gr...|The Aeneid for Bo...|1.4142135623730951|https://s.gr-asse...|
|W.C. Fields: A Li...|https://images.gr...|The Unschooled Wi...|1.4142135623730951|https://images.gr...|
|W.C. Fields: A Li...|https://images.gr...|         Good Harbor|1.4142135623730956|https://s.gr-asse...|
|W.C. Fields: A Li...|https://images.gr...|Playmaker: A Veno...|1.4142135623730943|https://images.gr...|
|W.C. Fields: A Li...|https://images.gr...|Best Friends Forever|1.4142135623730945|https://s.gr-asse...|
|W.C. Fields: A Li...|https://images.gr...|All's Fairy 

                                                                                

In [31]:
content = final_df.toPandas()

23/12/21 00:27:28 WARN DAGScheduler: Broadcasting large task binary with size 10.2 MiB
23/12/21 00:27:32 WARN DAGScheduler: Broadcasting large task binary with size 10.5 MiB
                                                                                

In [33]:
content.to_csv('content.csv')  