In [20]:
import csv
import timeit
import datetime
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import matplotlib.pyplot as plt
import pandas as pd
import os
from pyspark.sql import Window, Column
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
folder = "book/"

In [2]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("somnambwl/bookcrossing-dataset")

print("Path to dataset files:", path)

Downloading from https://www.kaggle.com/api/v1/datasets/download/somnambwl/bookcrossing-dataset?dataset_version_number=1...


100%|██████████| 16.8M/16.8M [00:01<00:00, 10.9MB/s]

Extracting files...





Path to dataset files: /home/zhijie/.cache/kagglehub/datasets/somnambwl/bookcrossing-dataset/versions/1


In [None]:
def filter_data(line):
    if 'User' in line:
        return False
    return True

def split_data(data):
    datas = data.split(';')
    ISBN = datas[1].strip().strip("'").strip("\\").strip('"').strip("#").strip("(").strip(")")
    return int(datas[0]), ISBN, int(datas[2])

book_fields = [
    StructField("ISBN", StringType(), False), StructField("Title", StringType(), False), StructField("Author", StringType(), False),
    StructField("Year", IntegerType(), False), StructField("Publisher", StringType(), False)
]
book_schema = StructType(book_fields)

rating_fields = [
    StructField("UserId", IntegerType(), False), StructField("ISBN", StringType(), False), StructField("Rating", IntegerType(), True)
]
rating_schema = StructType(rating_fields)

user_fields = [
    StructField("UserId", IntegerType(), False), StructField("Age", IntegerType(), True)
]
user_schema = StructType(user_fields)

if not os.path.exists(folder + "Books.parquet"):
    data = spark.read.csv(folder+"Books.csv", header=True, schema=book_schema, sep=";")
    data.write.save(folder+"Books.parquet")

if not os.path.exists(folder + "Ratings.parquet"):
    from pyspark import SparkContext
    sc = SparkContext.getOrCreate()
    data = spark.createDataFrame(sc.textFile(folder+'Ratings.csv', 8).filter(filter_data).map(split_data), rating_schema)
    data.write.save(folder+"Ratings.parquet")

if not os.path.exists(folder + "Users.parquet"):
    data = spark.read.csv(folder+"Users.csv", header=True, schema=user_schema, sep=";")
    data.write.save(folder+"Users.parquet")


In [3]:
books = spark.read.load(folder+"Books.parquet").distinct()
ratings = spark.read.load(folder+"Ratings.parquet").distinct()
users = spark.read.load(folder+"Users.parquet").distinct().fillna({"age": 0})

join = users.join(ratings, 'UserId', 'inner')
data = join.join(books, 'ISBN', 'inner')

print("Books:", books.count())
print("Ratings:", ratings.count())
print("Users:", users.count())
print("Joined:", join.count())
print("Joined:", data.count())

                                                                                

Books: 271378


                                                                                

Ratings: 1149780
Users: 278859
Joined: 1149780


                                                                                

Joined: 1031178


24/12/23 17:13:47 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [58]:
data.show(10)

                                                                                

+----------+------+---+------+--------------------+-----------+----+--------------------+
|      ISBN|UserId|Age|Rating|               Title|     Author|Year|           Publisher|
+----------+------+---+------+--------------------+-----------+----+--------------------+
|0440201101|  3177| 44|     0|More Die of Heart...|Saul Bellow|1988|Dell Publishing C...|
|      NULL|  1173| 22|  NULL|                NULL|       NULL|NULL|                NULL|
|      NULL|  1657| 59|  NULL|                NULL|       NULL|NULL|                NULL|
|      NULL|  1662| 25|  NULL|                NULL|       NULL|NULL|                NULL|
|      NULL|  2342| 60|  NULL|                NULL|       NULL|NULL|                NULL|
|      NULL|  2408| 33|  NULL|                NULL|       NULL|NULL|                NULL|
|      NULL|  2874| 19|  NULL|                NULL|       NULL|NULL|                NULL|
|      NULL|  3182| 19|  NULL|                NULL|       NULL|NULL|                NULL|
|      NUL

In [56]:
data.filter("author IS NULL").show(100)

24/12/22 23:27:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                                

+-------------+------+---+------+-----+------+----+---------+
|         ISBN|UserId|Age|Rating|Title|Author|Year|Publisher|
+-------------+------+---+------+-----+------+----+---------+
|0 440 20615 4| 71726| 18|     9| NULL|  NULL|NULL|     NULL|
|0 7525 1962 x|266641|  0|     0| NULL|  NULL|NULL|     NULL|
|0.380.44099.7| 20919|  0|    10| NULL|  NULL|NULL|     NULL|
| 000000046565|159014|  0|     1| NULL|  NULL|NULL|     NULL|
| 000001246971|213324| 48|     0| NULL|  NULL|NULL|     NULL|
| 000001246971| 11776| 31|     8| NULL|  NULL|NULL|     NULL|
| 000001246971|268423| 42|     8| NULL|  NULL|NULL|     NULL|
|  0000225820X|242060| 50|     5| NULL|  NULL|NULL|     NULL|
|  00006715885| 33759|  0|     0| NULL|  NULL|NULL|     NULL|
|  00006715885|158856|  0|     0| NULL|  NULL|NULL|     NULL|
|   0001604465|168387| 32|     0| NULL|  NULL|NULL|     NULL|
|   000185741X|125774| 36|     8| NULL|  NULL|NULL|     NULL|
|   0001911619| 94853| 27|    10| NULL|  NULL|NULL|     NULL|
|   0002

In [None]:
data = users.join(ratings, 'UserId', 'left')
data.groupBy("UserId").agg(F.count(F.when(data.ISBN.isNotNull(), 1)).alias("BookCount")).orderBy("BookCount", ascending=False).collect()

278859

In [63]:
data = books.join(ratings, 'ISBN', 'left')
data.groupBy("publisher").agg(F.max(data.Rating)).collect()

                                                                                

[Row(publisher='Harper Mass Market Paperbacks (Mm)', max(Rating)=10),
 Row(publisher='LPC Group', max(Rating)=9),
 Row(publisher='Chicago Review Press', max(Rating)=10),
 Row(publisher='Adams Media Corp', max(Rating)=10),
 Row(publisher='Celestial Arts', max(Rating)=10),
 Row(publisher='Cleis Press', max(Rating)=10),
 Row(publisher='Chosen Books', max(Rating)=10),
 Row(publisher='Carroll &amp; Graf Publishers', max(Rating)=10),
 Row(publisher='Aqua Explorers', max(Rating)=9),
 Row(publisher='Houghton Mifflin Co (Jp)', max(Rating)=10),
 Row(publisher='Ramana Pubn', max(Rating)=0),
 Row(publisher='Regina Press Malhame & Company', max(Rating)=9),
 Row(publisher='Joshua Odell Editions Capra Press', max(Rating)=10),
 Row(publisher='No Exit Press', max(Rating)=7),
 Row(publisher='Lorenz Books', max(Rating)=10),
 Row(publisher='High Country Publishers', max(Rating)=10),
 Row(publisher='Ediciones Alfaguara, S.A.', max(Rating)=10),
 Row(publisher='Ullstein-Taschenbuch-Verlag, Zweigniederlassung

In [72]:
data = books.join(ratings, 'ISBN', 'left')
data.groupBy("Author").agg(F.count(data.Rating).alias("NumRatings")).orderBy("NumRatings", ascending=False).select("Author").first()

Row(Author='Stephen King')

In [9]:
data = books.join(ratings, 'ISBN', 'left')
overRating = Window.partitionBy(data.Publisher).orderBy(data.Rating.desc())
data.withColumn("rank", F.dense_rank().over(overRating)).where("rank <= 1").select("Publisher", "Title").collect()

                                                                                

[Row(Publisher='"""Otokar Kersovani\\"""', Title='"Gdje pijevac ne pjeva (Biblioteka ""Zlatni paun\\"")"'),
 Row(Publisher='"Flemish-Netherlands Foundation ""Stichting Ons Erfdeel,\\"""', Title='Contemporary poetry of the low countries'),
 Row(Publisher='101 Productions; [distributed by Scribner, New York]', Title='Manna: foods of the frontier'),
 Row(Publisher='22nd. Century, New York', Title='Portraits of Cities'),
 Row(Publisher='3', Title="Restoring Intimacy: The Patient's Guide to Maintaining Relationships During Depression"),
 Row(Publisher='3D Press', Title='Denver Hiking Guide:  45 Hikes within 45 Minutes of Denver'),
 Row(Publisher='3H Productions, Inc.', Title="Haley's Cleaning Hints"),
 Row(Publisher='3rd Woman Press', Title='Dictee'),
 Row(Publisher='A & W Pub', Title='The Adventures of Sherlock Holmes'),
 Row(Publisher='A & W Visual Library', Title="The left-handers' handbook"),
 Row(Publisher='A Golden Book', Title="What's In Oscar's Trash Can? (Sesame Street Good-Night S

In [16]:
data = books.join(ratings, 'ISBN', 'left')
overRating = Window.partitionBy(data.Publisher).orderBy(F.desc("num_ratings"))
# data.groupBy("Publisher", "Title").agg(F.count(data.Rating).alias("num_ratings")).select("Publisher", "Title", "num_ratings").collect()
data.groupBy("Publisher", "Title").agg(F.count(data.Rating).alias("num_ratings")).withColumn("rank", F.dense_rank().over(overRating)).where("rank <= 1").select("Publisher", "Title").collect()


                                                                                

[Row(Publisher='"""Otokar Kersovani\\"""', Title='"Gdje pijevac ne pjeva (Biblioteka ""Zlatni paun\\"")"'),
 Row(Publisher='"Flemish-Netherlands Foundation ""Stichting Ons Erfdeel,\\"""', Title='Contemporary poetry of the low countries'),
 Row(Publisher='(3 Queen Sq., WC1N 3AU), Faber and Faber Ltd', Title='The big chapel'),
 Row(Publisher='101 Productions; [distributed by Scribner, New York]', Title='Manna: foods of the frontier'),
 Row(Publisher='22nd. Century, New York', Title='Portraits of Cities'),
 Row(Publisher='3', Title="Restoring Intimacy: The Patient's Guide to Maintaining Relationships During Depression"),
 Row(Publisher='300Incredible.com', Title='300 Incredible Things for Sports Fans on the Internet'),
 Row(Publisher='3D Press', Title='Denver Hiking Guide:  45 Hikes within 45 Minutes of Denver'),
 Row(Publisher='3H Productions, Inc.', Title="Haley's Cleaning Hints"),
 Row(Publisher='3rd Bed', Title='Stories in the Worst Way'),
 Row(Publisher='3rd Woman Press', Title='Dict

In [23]:
data = books.join(ratings, 'ISBN', 'left')
overRating = Window.partitionBy(data.Publisher).orderBy(F.desc("num_ratings"))
data.groupBy("Publisher", "Title").agg(F.count(data.Rating).alias("num_ratings")).withColumn("diff", F.max("num_ratings").over(overRating) - F.col("num_ratings")).select("Publisher", "Title", "diff").collect()


                                                                                

[Row(Publisher='"""Otokar Kersovani\\"""', Title='"Gdje pijevac ne pjeva (Biblioteka ""Zlatni paun\\"")"', diff=0),
 Row(Publisher='"Flemish-Netherlands Foundation ""Stichting Ons Erfdeel,\\"""', Title='Contemporary poetry of the low countries', diff=0),
 Row(Publisher='3D Press', Title='Denver Hiking Guide:  45 Hikes within 45 Minutes of Denver', diff=0),
 Row(Publisher='3H Productions, Inc.', Title="Haley's Cleaning Hints", diff=0),
 Row(Publisher='A H M Publications', Title='The Subjection of Women (Crofts Classics)', diff=0),
 Row(Publisher='A. & C. Black', Title="Writing for television in the 70's", diff=0),
 Row(Publisher='A. & M. Muchnik', Title='Comuna Verdad (Analectas)', diff=0),
 Row(Publisher='A. Deutsch', Title='Moon Tiger', diff=0),
 Row(Publisher='A. Deutsch', Title='Russia in revolution, 1900-1930', diff=1),
 Row(Publisher='A. Deutsch', Title='Apple of my eye', diff=1),
 Row(Publisher='A. Deutsch', Title='Pictures from the water trade: An Englishman in Japan', diff=1),


In [4]:
join.filter("Rating > 0").groupBy("UserId").agg(F.count("ISBN").alias("BookCount")).filter("BookCount >= 1").count()

77805

In [10]:
data.groupBy("UserId").agg(F.count("ISBN").alias("BookCount")).filter("BookCount >= 1").count()

105283

In [None]:
data.filter("Rating > 0").groupBy("UserId").agg(F.count("ISBN").alias("BookCount")).orderBy("BookCount", ascending=False).collect()

[Row(UserId=463, BookCount=1),
 Row(UserId=15790, BookCount=1),
 Row(UserId=29228, BookCount=1),
 Row(UserId=211126, BookCount=1),
 Row(UserId=227501, BookCount=1),
 Row(UserId=237504, BookCount=1),
 Row(UserId=218049, BookCount=1),
 Row(UserId=82672, BookCount=1),
 Row(UserId=90461, BookCount=1),
 Row(UserId=45307, BookCount=1),
 Row(UserId=37251, BookCount=1),
 Row(UserId=36355, BookCount=1),
 Row(UserId=199976, BookCount=1),
 Row(UserId=203504, BookCount=1),
 Row(UserId=252036, BookCount=1),
 Row(UserId=265539, BookCount=1),
 Row(UserId=246944, BookCount=1),
 Row(UserId=253457, BookCount=1),
 Row(UserId=245857, BookCount=1),
 Row(UserId=243392, BookCount=1),
 Row(UserId=276652, BookCount=1),
 Row(UserId=144414, BookCount=1),
 Row(UserId=33855, BookCount=1),
 Row(UserId=14876, BookCount=1),
 Row(UserId=212128, BookCount=1),
 Row(UserId=230550, BookCount=1),
 Row(UserId=210285, BookCount=1),
 Row(UserId=209992, BookCount=1),
 Row(UserId=78766, BookCount=1),
 Row(UserId=99899, BookCoun