<a href="https://colab.research.google.com/github/VisshnuPrethi/Pearson-correlation-coefficient-/blob/main/Prediction_of_Book_Sale_using_Pearson_Coefficient.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
from google.colab import drive
drive.mount("/content/drive", force_remount=True)

drive_dir = "/content/drive/MyDrive/ens/cnam/data/"
INPUT_CSV = drive_dir + "/Books.csv"
os.makedirs(drive_dir, exist_ok=True)
os.listdir(drive_dir)

Mounted at /content/drive


['Books.csv']

In [2]:
!pip install -q pyspark
!pip install -q findspark

In [3]:
import os
os.environ["SPARK_HOME"] = "/usr/local/lib/python3.12/dist-packages/pyspark"
os.environ["JAVA_HOME"] = "/usr"

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window
from pyspark.sql.functions import col, expr
from pyspark.sql.functions import split
from pyspark.sql import Window
from pyspark.sql.functions import coalesce, lit
# change these paths as needed
# INPUT_CSV = "Books.csv"
OUT_DIR = "./out"   # change to where you want outputs

spark = SparkSession.builder \
    .appName("BooksPearsonCF") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

In [5]:
# import os
# from urllib import request

# import os
# from urllib import request

# def load_file(file,dir):
#   if(os.path.isfile(drive_dir+file)):
#     print(file, "is already stored")
#   # else:
#   #   url = PUBLIC_DATASET + "/"+ dir + "/" + file
#   #   print("downloading from URL: ", url, "save in : " + drive_dir   + file)
#   #   request.urlretrieve(url , drive_dir + file)

# #load_file("books.csv", "bookslens/ml-latest-small")
# load_file("books.csv",".")
# # load_file("ratings.csv", ".")

# # List of downloaded files
# print("Files downloaded:")
# os.listdir(drive_dir)

In [6]:
#The folder containing the imported csv files:
DATASET_DIR="/content/drive/MyDrive/ens/cnam/data"

In [7]:
#See an excerpt of books.csv
!head $DATASET_DIR/Books.csv

,user_id,location,age,isbn,rating,book_title,book_author,year_of_publication,publisher,img_s,img_m,img_l,Summary,Language,Category,city,state,country
0,2,"stockton, california, usa",18,195153448,0,Classical Mythology,Mark P. O. Morford,2002,Oxford University Press,http://images.amazon.com/images/P/0195153448.01.THUMBZZZ.jpg,http://images.amazon.com/images/P/0195153448.01.MZZZZZZZ.jpg,http://images.amazon.com/images/P/0195153448.01.LZZZZZZZ.jpg,"Provides an introduction to classical myths placing the addressed
topics within their historical context, discussion of archaeological
evidence as support for mythical events, and how these themes have
been portrayed in literature, art, ...",en,['Social Science'],stockton,california,usa
1,8,"timmins, ontario, canada",34.74389988,2005018,5,Clara Callan,Richard Bruce Wright,2001,HarperFlamingo Canada,http://images.amazon.com/images/P/0002005018.01.THUMBZZZ.jpg,http://images.amazon.com/images/P/0002005018.01.MZZZZZZZ.jpg,http://images.amazon.com/

In [8]:
df = spark.read.csv(
    INPUT_CSV,
    header=False,
    inferSchema=True,
    multiLine=True,
    escape='"',
    quote='"'
)
df = df.toDF("_c0", "user_id", "location", "age", "isbn", "rating",
             "book_title", "book_author", "year_of_publication", "publisher",
             "img_s", "img_m", "img_l", "Summary", "Language", "Category",
             "city", "state", "country")
df = df.drop("_c0")

df = df.filter(df.user_id != "user_id")

# quick sanity
print("rows loaded:", df.count())
df.printSchema()
df.show(5, truncate=200)

rows loaded: 101863
root
 |-- user_id: string (nullable = true)
 |-- location: string (nullable = true)
 |-- age: string (nullable = true)
 |-- isbn: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- book_title: string (nullable = true)
 |-- book_author: string (nullable = true)
 |-- year_of_publication: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- img_s: string (nullable = true)
 |-- img_m: string (nullable = true)
 |-- img_l: string (nullable = true)
 |-- Summary: string (nullable = true)
 |-- Language: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)

+-------+-------------------------+-----------+---------+------+-------------------+--------------------+-------------------+-----------------------+------------------------------------------------------------+---------------------------------------------------------

In [9]:
df = df.filter(df.user_id != "user_id")

df = df.withColumn("age", expr("try_cast(age as int)"))
df = df.withColumn("rating", expr("try_cast(rating as int)"))
df = df.withColumn("year_of_publication", expr("try_cast(year_of_publication as int)"))

#Split location into city, state, country
df = df.withColumn("city", split(col("location"), ",")[0])
df = df.withColumn("state", split(col("location"), ",")[1])
df = df.withColumn("country", split(col("location"), ",")[2])

print("rows loaded:", df.count())
df.printSchema()
df.show(5, truncate=200)

# rename columns if necessary (they likely already are these names)
df = df.withColumnRenamed("book_title", "title").withColumnRenamed("book_author", "author")
# keep digits only
df = df.withColumn("isbn_clean", F.regexp_replace(F.col("isbn").cast("string"), r"\D+", ""))

# drop rows where isbn_clean empty
df = df.filter(F.length(F.col("isbn_clean")) > 0)

# drop ISBNs that occur fewer than 2 times (early step per instructions)
isbn_counts = df.groupBy("isbn_clean").count().withColumnRenamed("count","isbn_count")
valid_isbns_2 = isbn_counts.filter(F.col("isbn_count") >= 2).select("isbn_clean")
df = df.join(valid_isbns_2, on="isbn_clean", how="inner")
# cast rating to double and drop non-castable
df = df.withColumn("rating", F.col("rating").cast(T.DoubleType()))
df = df.filter(F.col("rating").isNotNull())

# keep only ratings in [0,10]
df = df.filter((F.col("rating") >= 0.0) & (F.col("rating") <= 10.0))

# drop rating == 0 (0 means no rating and must be ignored)
df = df.filter(F.col("rating") != 0.0)


rows loaded: 101863
root
 |-- user_id: string (nullable = true)
 |-- location: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- isbn: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- book_title: string (nullable = true)
 |-- book_author: string (nullable = true)
 |-- year_of_publication: integer (nullable = true)
 |-- publisher: string (nullable = true)
 |-- img_s: string (nullable = true)
 |-- img_m: string (nullable = true)
 |-- img_l: string (nullable = true)
 |-- Summary: string (nullable = true)
 |-- Language: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)

+-------+-------------------------+----+---------+------+-------------------+--------------------+-------------------+-----------------------+------------------------------------------------------------+------------------------------------------------------------+

In [10]:
# Keep columns needed: user_id, isbn_clean, rating and metadata for books.
ratings_df = df.select("user_id", F.col("isbn_clean").alias("isbn"), "rating")
books_meta = df.select(F.col("isbn_clean").alias("isbn"), "title", "author", "year_of_publication", "publisher").dropDuplicates(["isbn"])

In [11]:
# If timestamp exists you may prefer latest. Here we check.
if "timestamp" in df.columns:
    # keep latest rating per user,isbn
    w = Window.partitionBy("user_id","isbn").orderBy(F.col("timestamp").desc())
    ratings_df = ratings_df.withColumn("rn", F.row_number().over(w)).filter(F.col("rn")==1).drop("rn")
else:
    # keep highest rating per (user,isbn)
    ratings_df = ratings_df.groupBy("user_id","isbn").agg(F.max("rating").alias("rating"))


In [12]:
user_counts = ratings_df.groupBy("user_id").count().withColumnRenamed("count","n_ratings")
users_active = user_counts.filter(F.col("n_ratings") >= 5).select("user_id")
ratings_df = ratings_df.join(users_active, on="user_id", how="inner")

In [13]:
book_counts = ratings_df.groupBy("isbn").count().withColumnRenamed("count","n_ratings")
books_active = book_counts.filter(F.col("n_ratings") >= 5).select("isbn")
ratings_df = ratings_df.join(books_active, on="isbn", how="inner")

# update books_meta to include only these ISBNs
books_clean = books_meta.join(books_active, on="isbn", how="inner")

In [14]:
# count distinct users per isbn (within filtered ratings_df)
isbn_user_counts = ratings_df.groupBy("isbn").agg(F.countDistinct("user_id").alias("user_count"))
shared_isbns = isbn_user_counts.filter(F.col("user_count") >= 2).select("isbn")
ratings_df = ratings_df.join(shared_isbns, on="isbn", how="inner")

# recompute user counts and enforce >=5 again (some users may have dropped)
user_counts_final = ratings_df.groupBy("user_id").count().withColumnRenamed("count","n_ratings")
users_active_final = user_counts_final.filter(F.col("n_ratings") >= 5).select("user_id")
ratings_df = ratings_df.join(users_active_final, on="user_id", how="inner")

In [15]:
# No ratings outside [0,10]
assert ratings_df.filter((F.col("rating") < 0) | (F.col("rating") > 10)).count() == 0

# No duplicated (user,isbn)
dups = ratings_df.groupBy("user_id","isbn").count().filter(F.col("count") > 1).count()
assert dups == 0

# No users with <5 ratings
users_lt5 = ratings_df.groupBy("user_id").count().filter(F.col("count") < 5).count()
assert users_lt5 == 0

# No books with <5 ratings
books_lt5 = ratings_df.groupBy("isbn").count().filter(F.col("count") < 5)
print("Number of books with <5 ratings = ", books_lt5.count())


# Non-empty ISBNs
assert ratings_df.filter(F.col("isbn").isNull() | (F.col("isbn") == "")).count() == 0

# Titles/authors cleaned (books_clean)
assert books_clean.filter(F.col("title").isNull() | (F.col("title") == "")).count() == 0
assert books_clean.filter(F.col("author").isNull() | (F.col("author") == "")).count() == 0

# Counts summary for report
n_users = ratings_df.select("user_id").distinct().count()
n_books = books_clean.select("isbn").distinct().count()
n_ratings = ratings_df.count()
print(f"Final users: {n_users}, books: {n_books}, ratings: {n_ratings}")


Number of books with <5 ratings =  15
Final users: 1632, books: 761, ratings: 16117


In [16]:
#Compute Average Rating per User
user_avg = ratings_df.groupBy("user_id") \
    .agg(F.avg("rating").alias("avg_rating"))

In [17]:
#Join average ratings to main dataframe
ratings_centered = ratings_df.join(user_avg, on="user_id", how="inner") \
    .withColumn("rating_centered", F.col("rating") - F.col("avg_rating"))


In [18]:
#Self-Join ratings to find common items between users
pairs = ratings_centered.alias("a") \
    .join(ratings_centered.alias("b"), on="isbn") \
    .where(F.col("a.user_id") < F.col("b.user_id")) \
    .select(
        F.col("a.user_id").alias("user_u"),
        F.col("b.user_id").alias("user_v"),
        F.col("a.rating_centered").alias("r_u"),
        F.col("b.rating_centered").alias("r_v")
    )


In [19]:
#Compute Pearson correlation for each pair
similarity = pairs.groupBy("user_u", "user_v").agg(
    F.sum(F.col("r_u") * F.col("r_v")).alias("num"),
    F.sqrt(F.sum(F.col("r_u") * F.col("r_u"))).alias("den_u"),
    F.sqrt(F.sum(F.col("r_v") * F.col("r_v"))).alias("den_v")
).withColumn(
    "similarity", F.col("num") / (F.col("den_u") * F.col("den_v"))
)


In [20]:
#Compute Item–Item Similarity (Pearson CF)
# Generate Predictions
pairs = ratings_df.alias("r1").join(
    ratings_df.alias("r2"),
    (F.col("r1.user_id") == F.col("r2.user_id")) &
    (F.col("r1.isbn") < F.col("r2.isbn"))
).select(
    F.col("r1.isbn").alias("isbn1"),
    F.col("r2.isbn").alias("isbn2"),
    F.col("r1.rating").alias("rating1"),
    F.col("r2.rating").alias("rating2")
)

pair_stats = pairs.groupBy("isbn1", "isbn2").agg(
    F.count("*").alias("count"),
    F.sum(F.col("rating1") * F.col("rating2")).alias("sum_prod"),
    F.sum("rating1").alias("sum1"),
    F.sum("rating2").alias("sum2"),
    F.sum(F.col("rating1") * F.col("rating1")).alias("sum1_sq"),
    F.sum(F.col("rating2") * F.col("rating2")).alias("sum2_sq")
)

pearson_sim = pair_stats.withColumn(
    "pearson_corr",
    ((F.col("sum_prod") - (F.col("sum1") * F.col("sum2") / F.col("count"))) /
     F.sqrt((F.col("sum1_sq") - (F.col("sum1") * F.col("sum1") / F.col("count"))) *
            (F.col("sum2_sq") - (F.col("sum2") * F.col("sum2") / F.col("count")))))
).filter(F.col("pearson_corr").isNotNull())


In [21]:
# join ratings with item–item similarity
neigh = ratings_df.alias("r").join(
    pearson_sim.alias("sim"),
    F.col("r.isbn") == F.col("sim.isbn1")
)

# remove self similarity
neigh = neigh.filter(F.col("sim.isbn1") != F.col("sim.isbn2"))

predictions = neigh.groupBy("r.user_id", "sim.isbn2").agg(
    F.sum(F.col("sim.pearson_corr") * F.col("r.rating")).alias("weighted_sum"),
    F.sum(F.abs(F.col("sim.pearson_corr"))).alias("sim_sum")
).withColumn(
    "pred_rating",
    F.when(F.col("sim_sum") != 0,
           F.col("weighted_sum") / F.col("sim_sum")
    ).otherwise(None)
)

# remove items already rated
predictions = predictions.alias("p").join(
    ratings_df.alias("r2"),
    (F.col("p.user_id") == F.col("r2.user_id")) &
    (F.col("p.isbn2") == F.col("r2.isbn")),
    how="left"
).filter(F.col("r2.rating").isNull()) \
 .select(F.col("p.user_id"), F.col("p.isbn2").alias("isbn"), "pred_rating")

# Select recommendations
TOP_N = 10

window = Window.partitionBy("user_id").orderBy(F.col("pred_rating").desc())

top_books = predictions.withColumn("rank", F.row_number().over(window)) \
                       .filter(F.col("rank") <= TOP_N) \
                       .drop("rank")
top_books = top_books.withColumn(
    "pred_rating",
    coalesce("pred_rating", lit(0.0))
)

#Filter out predictions with 0.0
top_books = top_books.filter(top_books.pred_rating > 0)

#Sort by user_id and rating for clean readability
top_books = top_books.orderBy("user_id", top_books.pred_rating.desc())

#Show final output
top_books.show(50, truncate=False)



+-------+----------+------------------+
|user_id|isbn      |pred_rating       |
+-------+----------+------------------+
|100088 |385511612 |10.000000000000002|
|100088 |60934719  |10.000000000000002|
|100088 |553578693 |10.000000000000002|
|100088 |743237188 |10.000000000000002|
|100088 |670869902 |10.000000000000002|
|100088 |515122408 |10.0              |
|100088 |684874350 |10.0              |
|100088 |671793489 |10.0              |
|100088 |553582747 |10.0              |
|100088 |671461494 |10.0              |
|100227 |446612790 |10.000000000000002|
|100227 |66238501  |10.000000000000002|
|100227 |64400026  |10.000000000000002|
|100227 |440206154 |10.000000000000002|
|100227 |140298479 |10.000000000000002|
|100227 |671027344 |10.0              |
|100227 |385315279 |10.0              |
|100227 |1558744150|10.0              |
|100227 |446360589 |10.0              |
|100227 |517556278 |10.0              |
|100393 |60987103  |5.000000000000001 |
|100393 |451167317 |5.000000000000001 |


In [22]:
#Min–Max normalize predictions per user
w = Window.partitionBy("user_id")

top_books = top_books.withColumn(
    "pred_rating_scaled",
    ((F.col("pred_rating") - F.min("pred_rating").over(w)) /
     (F.max("pred_rating").over(w) - F.min("pred_rating").over(w))) * 10
)

top_books = top_books.drop("pred_rating") \
                     .withColumnRenamed("pred_rating_scaled", "pred_rating")


In [23]:
#Round rating values
top_books = top_books.withColumn("pred_rating", F.round(F.col("pred_rating"), 2))


In [24]:
# Join book metadata to show title & author(Join metadata (title & author)
top_books = top_books.join(
    books_clean.select("isbn", "title", "author"),
    on="isbn",
    how="left"
)


In [25]:
top_books = top_books.withColumnRenamed("title", "book_title")

#Sort by user_id and rating for clean readability
top_books = top_books.orderBy("user_id", top_books.pred_rating.desc())

#Show final recommendation output
top_books.show(50, truncate=False)


+----------+-------+-----------+----------------------------------------------------------------------------+--------------------+
|isbn      |user_id|pred_rating|book_title                                                                  |author              |
+----------+-------+-----------+----------------------------------------------------------------------------+--------------------+
|385511612 |100088 |10.0       |Bleachers                                                                   |John Grisham        |
|60934719  |100088 |10.0       |stardust                                                                    |Neil Gaiman         |
|553578693 |100088 |10.0       |The Next Accident                                                           |LISA GARDNER        |
|743237188 |100088 |10.0       |Fall On Your Knees (Oprah #45)                                              |Ann-Marie MacDonald |
|670869902 |100088 |10.0       |How Stella Got Her Groove Back                     

In [26]:
#Remove zero-rating (invalid) recommendations
top_books = top_books.filter(top_books.pred_rating > 0)

#Reorder
top_books = top_books.orderBy("user_id", top_books.pred_rating.asc())
top_books.show(50, truncate=False)


+----------+-------+-----------+----------------------------------------------------------------------------+---------------------+
|isbn      |user_id|pred_rating|book_title                                                                  |author               |
+----------+-------+-----------+----------------------------------------------------------------------------+---------------------+
|385511612 |100088 |10.0       |Bleachers                                                                   |John Grisham         |
|60934719  |100088 |10.0       |stardust                                                                    |Neil Gaiman          |
|553578693 |100088 |10.0       |The Next Accident                                                           |LISA GARDNER         |
|743237188 |100088 |10.0       |Fall On Your Knees (Oprah #45)                                              |Ann-Marie MacDonald  |
|670869902 |100088 |10.0       |How Stella Got Her Groove Back              

In [27]:
#Save final recommendation results as CSV
top_books.coalesce(1).write.csv("final_clean_recommendations", header=True, mode="overwrite")

