In [0]:
from pyspark.sql import functions as F

# Data Cleaning and preparation to build pipeline.

In [0]:

# Load the raw books metadata and persist it as the `books_raw` table.
# The file appears to escape quotes inside quoted fields by doubling them
# (e.g. ""Barford""). Spark's CSV reader uses a backslash as escape character
# by default, so such fields are cut at the embedded quote and the remaining
# columns shift. Setting escape='"' makes the reader treat "" as a literal quote.
# NOTE(review): if quoted fields can also contain line breaks, multiLine=True
# is needed as well — confirm against the data before enabling (it is slower).
book_data_df = spark.read.csv(
    "/Volumes/workspace/default/my_volume/books_data.csv",
    header=True,
    inferSchema=True,
    quote='"',
    escape='"',
)
book_data_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("books_raw")


In [0]:
%sql
-- Preview the first few rows of the raw books table.
SELECT *
FROM books_raw
LIMIT 10

In [0]:
# Remove the listed columns from the books frame and persist the result as
# the `books_clean` table (replacing any previous version and its schema).
book_data_df_cleaned = book_data_df.drop(
    "image",
    "infoLink",
    "previewLink",
    "ratingsCount",
    "publisher",
    "publishedDate",
)
(
    book_data_df_cleaned.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("books_clean")
)

In [0]:
%sql
-- Preview the first few rows of the trimmed books table.
SELECT *
FROM books_clean
LIMIT 10

In [0]:
# Load the raw ratings and persist them as the `ratings_raw` table.
# As with the books file, quotes inside quoted fields appear to be doubled
# ("" ... ""), while Spark's CSV reader expects a backslash escape by default;
# escape='"' keeps such fields intact instead of shifting the later columns.
ratings_df = spark.read.csv(
    "/Volumes/workspace/default/my_volume/Books_rating.csv",
    header=True,
    inferSchema=True,
    quote='"',
    escape='"',
)
# overwriteSchema matches the other saveAsTable calls in this notebook, so a
# re-run still succeeds when the inferred schema differs from the stored one.
ratings_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("ratings_raw")

In [0]:
%sql
-- Preview the raw ratings. The table holds millions of rows, so the preview is
-- limited in the same way as the books previews.
SELECT *
FROM ratings_raw
LIMIT 10;

#### There are many malformed rows that need cleaning in each dataset. Quotation marks in the Title or description caused elements to shift. These will need to be isolated and dealt with. There are 3 million instances in the ratings dataset which will be transformed into transactions and there are 212404 instances in the books dataset. 

##### Ideally, we would replace the missing data in the books dataset because the book titles reference actual books. 

In [0]:

import requests

def get_book_info(title):
    """Look up a book by title with the Google Books volumes API.

    Args:
        title: Title text to search for (sent as an ``intitle:`` query).

    Returns:
        A 3-tuple ``(title, authors, categories)`` taken from the
        ``volumeInfo`` of the first search result. Every element is ``None``
        when the request fails or the API returns no items.
    """
    url = "https://www.googleapis.com/books/v1/volumes"
    try:
        # Passing the query via ``params`` URL-encodes spaces, quotes, '&', '#'...
        # in the title; a timeout keeps a stalled connection from hanging the cell.
        response = requests.get(url, params={"q": f"intitle:{title}"}, timeout=10)
        response.raise_for_status()
        payload = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a response body that is not valid JSON.
        print(f"Error fetching data for '{title}': {e}")
        return None, None, None

    items = payload.get("items") if isinstance(payload, dict) else None
    if not items:
        # Same arity as the success path so callers can always unpack 3 values.
        return None, None, None

    volume_info = items[0].get("volumeInfo", {})
    return volume_info.get("title"), volume_info.get("authors"), volume_info.get("categories")

# Try the Google Books lookup on one sample title and print the result.
# NOTE: Python joins the adjacent string literals below into a single string,
# so the doubled quotes act as literal boundaries and the title that is sent
# contains no quote characters.
title, authors, categories = get_book_info(
    "Batman 423-425 ""The Diplomat's Son"" Complete Story"
)
print(
    "\n".join(
        [
            "Google Books API",
            f"    Title: {title},",
            f"    Authors: {authors},",
            f"    Categories: {categories}",
        ]
    )
)

In [0]:
from rapidfuzz import fuzz

def enrich_book_open_library(partial_title, similarity_threshold=80):
    """Search Open Library by title and return the best fuzzy-matching result.

    Args:
        partial_title: Title text to search for.
        similarity_threshold: Minimum ``fuzz.token_set_ratio`` score the best
            candidate must reach to be accepted.

    Returns:
        ``(title, authors, subjects)`` of the best-scoring search result, where
        ``authors`` / ``subjects`` are the result's ``author_name`` / ``subject``
        values (empty list when absent). ``(None, None, None)`` is returned
        when the request fails, no results come back, or no candidate reaches
        the threshold.
    """
    url = "https://openlibrary.org/search.json"
    headers = {"User-Agent": "BookDataEnricher/1.0 (your_email@example.com)"}

    try:
        # ``params`` URL-encodes the title (spaces, quotes, '&', '#', ...).
        response = requests.get(
            url, params={"title": partial_title}, headers=headers, timeout=10
        )
        response.raise_for_status()
        data = response.json()
    except Exception as e:
        print(f"Error fetching data for '{partial_title}': {e}")
        return None, None, None

    docs = data.get("docs") if isinstance(data, dict) else None
    if not docs:
        return None, None, None

    best_match = None
    best_score = 0
    wanted = partial_title.lower()

    for doc in docs:
        # A result may lack a title or carry an explicit null; score it as "".
        full_title = doc.get("title") or ""
        score = fuzz.token_set_ratio(wanted, full_title.lower())

        # Strict '>' keeps the earliest result among equally scored candidates.
        if score > best_score:
            best_score = score
            best_match = doc

    if best_match and best_score >= similarity_threshold:
        authors = best_match.get("author_name", [])
        subjects = best_match.get("subject", [])
        return best_match.get("title"), authors, subjects

    return None, None, None
    
# Try the Open Library lookup on the same sample title and print the result.
# NOTE: Python joins the adjacent string literals below into a single string,
# so the title that is sent contains no quote characters.
title, authors, categories = enrich_book_open_library(
    "Batman 423-425 ""The Diplomat's Son"" Complete Story"
)
print(
    "\n".join(
        [
            "Open Library API",
            f"    Title: {title},",
            f"    Authors: {authors},",
            f"    Categories: {categories}",
        ]
    )
)

##### I tried both the Google Books API and the Open Library API to query book titles. The Google Books API (GB) is very good at finding matches based on partial titles. This dataset lumps titles and subtitles together in the same Title field, and GB handles these well too; Open Library does not. The problem is that GB has a daily limit of 1000 requests, which means it isn't a viable option for a dataset of this size. I will proceed with cleaning the dataset for use.

In [0]:
%sql
-- Rebuild books_clean without the rows whose Title is null.
CREATE OR REPLACE TABLE books_clean AS
SELECT *
FROM books_clean
WHERE Title IS NOT NULL;

In [0]:
%sql
-- Build ratings_clean from the raw ratings, leaving out rows whose Title is null.
CREATE OR REPLACE TABLE ratings_clean AS
SELECT *
FROM ratings_raw
WHERE Title IS NOT NULL;

#### We'll add ratings.Id to the books table as its Id column, since Ids and Titles appear to be 1 to 1 with each other.

In [0]:
%sql
-- Compare the number of distinct Ids with the number of distinct Titles.
SELECT
  COUNT(DISTINCT Id)    AS distinct_ids,
  COUNT(DISTINCT Title) AS distinct_titles
FROM ratings_clean;

#### I guess not :(

In [0]:
%sql
-- Number of ratings for every (Title, Id) pair, ordered by Title.
-- ORDER BY is used because SORT BY only orders rows within each partition and
-- therefore does not guarantee a globally sorted result.
SELECT Id, Title, COUNT(Id) AS id_count
FROM ratings_clean
GROUP BY Title, Id
ORDER BY Title;

In [0]:
%sql
-- Titles with multiple Ids: one row per (Title, Id) with its rating count.
-- GROUP BY already yields unique (Title, Id) rows, so no DISTINCT is needed.
-- ORDER BY gives a globally sorted result (SORT BY only sorts per partition).
SELECT Title, Id, COUNT(Id) AS id_count
FROM ratings_clean
WHERE Title IN (
  SELECT Title
  FROM ratings_clean
  GROUP BY Title
  HAVING COUNT(DISTINCT Id) > 1
)
GROUP BY Title, Id
ORDER BY Title;


In [0]:
%sql
-- Inspect the Ids recorded for one specific title.
SELECT
  Id,
  Title
FROM ratings_clean
WHERE Title = '"The descendants of Capt. Thomas Carter of ""Barford""';

In [0]:
%sql
-- Ids with multiple Titles: list every distinct (Id, Title) pair for such Ids.
-- DISTINCT alone de-duplicates the pairs (the extra GROUP BY was redundant).
-- ORDER BY gives a globally sorted result (SORT BY only sorts per partition).
SELECT DISTINCT Id, Title
FROM ratings_clean
WHERE Id IN (
  SELECT Id
  FROM ratings_clean
  GROUP BY Id
  HAVING COUNT(DISTINCT Title) > 1
)
ORDER BY Id;


#### It looks like in all cases a Title can have multiple Ids, whereas each Id is associated with only a single Title. Which is great! There does not appear to be any rhyme or reason to the Ids, so I'll just replace all Ids for a Title with one of the Ids associated with it. It doesn't really matter which one.

In [0]:
# Reload the null-filtered ratings from the metastore and print a sample.
ratings_df = spark.table("ratings_clean")
ratings_df.show()

In [0]:


# Pick one canonical Id per Title.
# F.min is used instead of F.first: F.first is non-deterministic (its result
# depends on row order, which can change after a shuffle), and df_result is
# lazily re-evaluated each time it is used — once when it is saved as
# ratings_clean and again when it is joined onto the books. A different "first"
# Id per evaluation would leave the two tables with mismatching Ids.
# F.min always returns the same Id for a Title and ignores null Ids.
first_id_by_title = ratings_df.groupBy("Title").agg(F.min("Id").alias("FirstId"))

# Attach the canonical Id to every rating of the same Title.
ratings_df_enriched = ratings_df.join(first_id_by_title, on="Title", how="left")

# Overwrite Id with the canonical Id and drop the helper column.
df_result = ratings_df_enriched.withColumn("Id", F.col("FirstId")).drop("FirstId")



In [0]:
# Persist the Id-normalised ratings, replacing the ratings_clean table.
(
    df_result.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("ratings_clean")
)

In [0]:
%sql
-- Re-check for Titles with multiple Ids after the Id normalisation.
-- GROUP BY already yields unique (Title, Id) rows, so no DISTINCT is needed.
-- ORDER BY gives a globally sorted result (SORT BY only sorts per partition).
SELECT Title, Id, COUNT(Id) AS id_count
FROM ratings_clean
WHERE Title IN (
  SELECT Title
  FROM ratings_clean
  GROUP BY Title
  HAVING COUNT(DISTINCT Id) > 1
)
GROUP BY Title, Id
ORDER BY Title;


#### Looks good! Now, to join with books and drop Id col from ratings_clean.

In [0]:
book_df = spark.table("books_clean")

# Attach an Id to every book: take one (Title, Id) pair per Title from the
# ratings and left-join it onto the books by Title, so books without any
# rating keep a null Id.
books_with_id = book_df.join(
    df_result.select("Title", "Id").dropDuplicates(subset=["Title"]),
    on="Title",
    how="left",
)

display(books_with_id)

In [0]:
# Persist the books together with their Id, replacing the books_clean table.
(
    books_with_id.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("books_clean")
)

In [0]:
# Remove the Title column from the ratings and write the table back.
ratings_df = spark.read.table('ratings_clean').drop('Title')
(
    ratings_df.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("ratings_clean")
)

#### I'll work on the Price column next. This I will add to the books table and drop from ratings.

In [0]:
%sql
-- List the Ids that are associated with more than one distinct Price.
SELECT
  Id,
  COUNT(DISTINCT Price) AS price_count
FROM ratings_clean
GROUP BY Id
HAVING COUNT(DISTINCT Price) > 1

In [0]:
%sql
-- Show the conflicting prices for every Id that has more than one distinct Price.
-- ratings_clean no longer has a Title column at this point (it was dropped
-- after the Ids were added to books_clean), so Title is looked up in
-- books_clean through the shared Id.
-- ORDER BY gives a globally sorted result (SORT BY only sorts per partition).
SELECT DISTINCT r.Id, b.Title, r.Price
FROM ratings_clean AS r
LEFT JOIN books_clean AS b
  ON r.Id = b.Id
WHERE r.Id IN (
  SELECT Id
  FROM ratings_clean
  GROUP BY Id
  HAVING COUNT(DISTINCT Price) > 1
)
  AND r.Price IS NOT NULL
ORDER BY b.Title;

In [0]:
%sql
-- Count the ratings rows that have no Price.
SELECT COUNT(*)
FROM ratings_clean
WHERE Price IS NULL

#### Lots of Prices are null, or the row is malformed and the Price field contains text. I will drop the malformed rows and corresponding rows in the books table. For duplicates, I will select the highest Price associated with an Id and replicate it for all of that Id's rows. Null Prices for a title will be replaced with a random value drawn from the set of distinct Prices.

In [0]:
# Reload the current ratings_clean table for the Price clean-up.
ratings_df = spark.table("ratings_clean")

#### For Ids that have multiple Prices, I am going to select the highest Price and replace all Prices corresponding to the same Id with that value

In [0]:
# Select all (Id, Price) rows whose Price is a plain decimal number.
# The pattern is applied through the Column API with a raw string: inside a SQL
# expression string, Spark's parser (with default settings) treats the
# backslash in '\.' as a string escape and drops it, which turns the escaped
# dot into "any character" and lets values such as "12a5" through.
# Price is then cast to double so that sorting and max() on this frame compare
# numerically instead of lexicographically (as strings, "9.99" > "100.00").
# NOTE(review): assumes Price was inferred as a string column because some rows
# contain text — confirm against the table schema.
digits = (
    ratings_df
    .select("Id", "Price")
    .where(F.col("Price").rlike(r"^[0-9]+(\.[0-9]+)?$"))
    .withColumn("Price", F.col("Price").cast("double"))
)
print(f"Total rows with numerical Prices: {digits.count()}")
display(digits.sort("Price", ascending=True))


In [0]:
# Pool of distinct numeric prices; null Prices are filled from this pool later.
distinct_prices = digits.select("Price").dropDuplicates()
print(f"Total Distinct Prices: {distinct_prices.count()}")
display(distinct_prices.orderBy("Price"))

In [0]:
# Highest Price recorded for each Id.
# NOTE(review): if Price is a string column here, max() compares
# lexicographically rather than numerically — confirm the column type.
max_prices = (
    digits
    .groupBy("Id")
    .agg(F.max(F.col("Price")).alias("max_price"))
)

display(max_prices)

In [0]:
# Swap every rating's Price for the highest numeric Price of its Id. Ids that
# have no numeric Price at all find no match in the left join and get null.
ratings_digits_cleaned = (
    ratings_df
    .drop("Price")
    .join(max_prices, on="Id", how="left")
    .withColumnRenamed("max_price", "Price")
)

print(ratings_digits_cleaned.count())

display(ratings_digits_cleaned.limit(10))

In [0]:
# Distinct Ids that still have no Price after the replacement above.
null_prices = (
    ratings_digits_cleaned
    .where(F.col("Price").isNull())
    .select("Id")
    .distinct()
)
print(null_prices.count())
display(null_prices.limit(10))

# HERE I have a frame of Ids with null prices after replacing non numeric prices with null. Need to append values from distinct prices.

In [0]:
from pyspark.sql import Window as W

# Number the distinct prices 1..N in ascending Price order.
prices_indexed = distinct_prices.withColumn(
    "idx", F.row_number().over(W.orderBy("Price"))
)

N = prices_indexed.count()

# Seeded random index in [1, N] for each Id that lacks a Price; the price
# stored at that index becomes the Id's replacement price (Price_new).
seed = 69
id_price_map = (
    null_prices
    .withColumn(
        "rand_idx",
        (F.floor(F.rand(seed) * F.lit(N)) + 1).cast("int"),
    )
    .join(
        F.broadcast(prices_indexed),
        F.col("rand_idx") == F.col("idx"),
        "left",
    )
    .select("Id", F.col("Price").alias("Price_new"))
)

display(id_price_map)


In [0]:
# Fill the remaining null Prices: keep the existing Price where there is one,
# otherwise use the randomly assigned Price_new for that Id.
ratings_cleaned = (
    ratings_digits_cleaned.alias("rc")
    .join(
        F.broadcast(id_price_map).alias("map"),
        on="Id",
        how="left",
    )
    .withColumn(
        "Price",
        F.coalesce(F.col("rc.Price"), F.col("map.Price_new")),
    )
    .drop("Price_new")
)

display(ratings_cleaned)

# Here, I have a ratings frame with cleaned Prices. Now I need to add the prices to books and drop them from ratings. I'll then drop unnecessary columns from ratings and turn it into transactions.

# Preparing the cleaned datasets for the dashboard.

We'll check the distribution of the number of ratings per book in the cleaned dataset.