In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Goodreads Book Recommendation").getOrCreate()

25/06/19 23:32:43 WARN Utils: Your hostname, Zwanes-MacBook.local resolves to a loopback address: 127.0.0.1; using 192.168.110.72 instead (on interface en0)
25/06/19 23:32:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/19 23:32:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
df = spark.read.csv("data/raw.csv", header=True, inferSchema=True)

                                                                                

In [3]:
from pyspark.sql import functions as F

import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

In [4]:
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Authors: string (nullable = true)
 |-- ISBN: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- PublishYear: string (nullable = true)
 |-- PublishMonth: string (nullable = true)
 |-- PublishDay: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- RatingDist5: string (nullable = true)
 |-- RatingDist4: string (nullable = true)
 |-- RatingDist3: string (nullable = true)
 |-- RatingDist2: string (nullable = true)
 |-- RatingDist1: string (nullable = true)
 |-- RatingDistTotal: string (nullable = true)
 |-- CountsOfReview: string (nullable = true)
 |-- Language: string (nullable = true)
 |-- PagesNumber: string (nullable = true)
 |-- Description: string (nullable = true)



### Missing Values

In [5]:
total_rows = df.count()

missing_counts = df.select([
    F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df.columns
]).collect()[0].asDict()

summary_rows = [
    (col, cnt, cnt / total_rows * 100)
    for col, cnt in missing_counts.items()
    if cnt > 0
]
missing_summary_df = spark.createDataFrame(
    summary_rows,
    schema=["Column", "Missing Values", "Percentage"]
).orderBy(F.desc("Missing Values"))

print("Missing Values Summary:")
missing_summary_df.show(truncate=False)

                                                                                

Missing Values Summary:




+---------------+--------------+------------------+
|Column         |Missing Values|Percentage        |
+---------------+--------------+------------------+
|Language       |268678        |94.47319934176289 |
|Description    |99710         |35.060268076906844|
|Publisher      |5219          |1.835117230903388 |
|PagesNumber    |4150          |1.459232900603384 |
|CountsOfReview |4020          |1.413521990464001 |
|RatingDistTotal|3997          |1.405434675593187 |
|RatingDist1    |3973          |1.3969957383366856|
|RatingDist2    |3945          |1.3871503115374337|
|RatingDist3    |3918          |1.3776565071238696|
|ISBN           |3914          |1.3762500175811192|
|RatingDist4    |3865          |1.3590205206824288|
|RatingDist5    |3815          |1.3414394013980506|
|PublishDay     |3677          |1.292915512173167 |
|PublishMonth   |3583          |1.259863007918536 |
|PublishYear    |3485          |1.225404014121155 |
|Rating         |3339          |1.174067145810771 |
|Authors    

                                                                                

### Dropping unrelevant columns

In [6]:
cols_to_drop = [
    "ISBN", "PublishMonth", "PublishDay",
    "RatingDist5", "RatingDist4", "RatingDist3",
    "RatingDist2", "RatingDist1",
    "CountsOfReview", "language"
]
df_clean = df.drop(*cols_to_drop)

df_clean = df_clean.withColumn(
    "RatingDistTotal",
    F.regexp_replace(F.col("RatingDistTotal"), "^total:", "").cast("int")
)

df_clean = df_clean.dropna(
    how="any",
    subset=["Description", "PagesNumber", "Publisher", "Rating", "RatingDistTotal"]
)

df_clean.printSchema()
df_clean.show(5, truncate=False)

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Authors: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- PublishYear: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- RatingDistTotal: integer (nullable = true)
 |-- PagesNumber: string (nullable = true)
 |-- Description: string (nullable = true)

+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+------+-----------+----------------------------------------------------------+---------------+-----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [7]:
null_counts = df_clean.select([
    F.sum(
        F.when(F.col(c).isNull() | F.isnan(c), 1).otherwise(0)
    ).alias(c)
    for c in df_clean.columns
])

null_counts_row = null_counts.collect()[0].asDict()
remaining_nulls = {col: cnt for col, cnt in null_counts_row.items() if cnt > 0}

if remaining_nulls:
    print("Still missing values in:")
    for col, cnt in remaining_nulls.items():
        print(f"  • {col}: {cnt} null(s)")
else:
    print("✅ No more missing values in any column.")

null_counts.show(truncate=False)


                                                                                

✅ No more missing values in any column.


[Stage 13:>                                                         (0 + 8) / 8]

+---+----+-------+------+-----------+---------+---------------+-----------+-----------+
|Id |Name|Authors|Rating|PublishYear|Publisher|RatingDistTotal|PagesNumber|Description|
+---+----+-------+------+-----------+---------+---------------+-----------+-----------+
|0  |0   |0      |0     |0          |0        |0              |0          |0          |
+---+----+-------+------+-----------+---------+---------------+-----------+-----------+



                                                                                

### Creating a new CSV for the cleaned data

In [8]:
pandas_df = df_clean.toPandas()
pandas_df.to_csv("data/goodreads.csv", index=False)

                                                                                