In [0]:
## Write your config here

In [0]:
# List the contents of the bronze container root to confirm the source file is present.
bronze_root = "abfss://booksbronze@amazonbooks0912.dfs.core.windows.net/"
dbutils.fs.ls(bronze_root)

[FileInfo(path='abfss://booksbronze@amazonbooks0912.dfs.core.windows.net/amazon_books (1).csv', name='amazon_books (1).csv', size=52627, modificationTime=1752388496000)]

In [0]:
# Read the bronze-layer CSV into a Spark DataFrame named `df`.
file_path = "abfss://booksbronze@amazonbooks0912.dfs.core.windows.net/amazon_books (1).csv"

# Reader options:
#   header      - the first row holds the column names
#   inferSchema - let Spark derive column types from the data
#   escape      - use '"' as the escape character for quotes inside values
csv_reader = spark.read.options(header=True, inferSchema=True, escape='"')
df = csv_reader.csv(file_path)

# Report the inferred schema and the overall size of the frame.
print("DataFrame Schema:")
df.printSchema()

row_total = df.count()
column_total = len(df.columns)
print(f"\nTotal rows: {row_total}")
print(f"Total columns: {column_total}")

# Preview the first few rows.
print("\nFirst 5 rows:")
df.show(5)

DataFrame Schema:
root
 |-- author: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- price: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- reviews_count: string (nullable = true)
 |-- title: string (nullable = true)


Total rows: 305
Total columns: 7

First 5 rows:
+----------------+---------+--------------------+-----+------+-------------+--------------------+
|          author|    genre|           image_url|price|rating|reviews_count|               title|
+----------------+---------+--------------------+-----+------+-------------+--------------------+
|   Joseph Nguyen|Self Help|https://m.media-a...|  163|   4.5|       11,964|Don't Believe Eve...|
|     Cal Newport|Self Help|https://m.media-a...|  232|   4.5|       30,704|DEEP WORK: RULES ...|
|Thibaut Meurisse|Self Help|https://m.media-a...|  181|   4.4|       14,851|Dopamine Detox: A...|
|   Brianna Wiest|Self Help|https://m.media-a...|  259|   4.4|    

In [0]:
# Book Dataset Cleaning Script for Databricks
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import re

In [0]:
# 1. Clean author names
#
# The normalisation steps run unconditionally and in sequence, so a value that
# needs more than one fix (for example "by Jane Doe,") receives all of them:
#   1. trim surrounding whitespace
#   2. drop a leading "by" when it is the whole value or is followed by whitespace
#      (so a name that merely starts with the letters "by" is left alone)
#   3. drop a trailing comma, together with any whitespace around it
#   4. trim again, then fall back to "Unknown" for null or empty results
author_trimmed = trim(col("author"))
author_no_prefix = regexp_replace(author_trimmed, "^by(\\s+|$)", "")
author_clean = trim(regexp_replace(author_no_prefix, "\\s*,\\s*$", ""))

books_df = df.withColumn(
    "author",
    # The empty check runs on the fully cleaned value, so inputs such as
    # "by", "by ," or whitespace-only strings all end up as "Unknown".
    when(author_clean.isNull() | (author_clean == ""), lit("Unknown"))
    .otherwise(author_clean)
)

In [0]:
# 2. Clean genres
#
# Trim first and test the trimmed value, so whitespace-only genres are treated
# as missing ("Unknown") instead of becoming an empty string. Non-empty values
# are title-cased with initcap.
genre_trimmed = trim(col("genre"))

books_df = books_df.withColumn(
    "genre",
    when(genre_trimmed.isNull() | (genre_trimmed == ""), lit("Unknown"))
    .otherwise(initcap(genre_trimmed))
)

In [0]:
# 3. Clean price
#
# Strip everything except digits and the decimal point, then convert to double.
# The cast is only attempted when the stripped text is a well-formed decimal
# number; null, empty and unparseable values (e.g. "N/A" or "1.2.3") all fall
# back to 0.0 instead of yielding null.
price_digits = regexp_replace(col("price"), "[^0-9.]", "")

books_df = books_df.withColumn(
    "price_numeric",
    when(
        price_digits.rlike("^([0-9]+\\.?[0-9]*|\\.[0-9]+)$"),
        price_digits.cast("double"),
    ).otherwise(lit(0.0))
# Build under a temporary name, then swap it in for the original string column.
).drop("price").withColumnRenamed("price_numeric", "price")

In [0]:
# 4. Clean reviews_count
#
# Remove every non-digit character (for example the thousands separator in
# "11,964") and convert to integer. The cast is only attempted when at least
# one digit remains; null, empty and digit-free values fall back to 0 instead
# of yielding null.
reviews_digits = regexp_replace(col("reviews_count"), "[^0-9]", "")

books_df = books_df.withColumn(
    "reviews_count_numeric",
    when(reviews_digits.rlike("^[0-9]+$"), reviews_digits.cast("integer"))
    .otherwise(lit(0))
# Build under a temporary name, then swap it in for the original string column.
).drop("reviews_count").withColumnRenamed("reviews_count_numeric", "reviews_count")

In [0]:
# 5. Rating column: keep values inside the valid 0-5 range and replace
#    everything else (missing, negative, above 5) with 0.0
rating_in_range = col("rating").between(0, 5)

books_df = books_df.withColumn(
    "rating",
    when(rating_in_range, col("rating")).otherwise(lit(0.0))
)

In [0]:
# 6. Clean title column
#
# Repair two mis-decoded character sequences, trim, and only then decide
# whether the title is missing. Checking after the clean-up means titles that
# are whitespace-only, or that consist solely of a removed sequence, become
# "Unknown Title" rather than an empty string.
title_clean = trim(
    regexp_replace(
        # "â€™" is replaced with a plain apostrophe (presumably a mis-decoded
        # right single quotation mark - TODO confirm against the source data)
        regexp_replace(col("title"), "â€™", "'"),
        # "Ç€" is dropped outright; its intended character is not evident here
        "Ç€", ""
    )
)

books_df = books_df.withColumn(
    "title",
    when(title_clean.isNull() | (title_clean == ""), lit("Unknown Title"))
    .otherwise(title_clean)
)

In [0]:
# 8. Drop rows that are identical across every column.
print("8. Removing duplicates...")
books_df = books_df.distinct()

8. Removing duplicates...


In [0]:

# 9. Derived columns for analysis
#
# Each expression is built on its own and then attached to the frame.
price_col = col("price")
rating_col = col("rating")
reviews_col = col("reviews_count")

# Price bands: exactly 0 -> Free, up to 150 -> Low, up to 300 -> Medium, else High.
price_category_expr = (
    when(price_col == 0, "Free")
    .when(price_col <= 150, "Low")
    .when(price_col <= 300, "Medium")
    .otherwise("High")
)

# Rating bands: a rating of 0 is labelled "No Rating"; the rest are bucketed from the top down.
rating_category_expr = (
    when(rating_col == 0, "No Rating")
    .when(rating_col >= 4.5, "Excellent")
    .when(rating_col >= 4.0, "Good")
    .when(rating_col >= 3.5, "Average")
    .otherwise("Below Average")
)

# Popularity: rating scaled by log10(reviews + 1); rows with zero reviews score 0.
popularity_score_expr = (
    when(reviews_col == 0, 0)
    .otherwise(rating_col * log10(reviews_col + 1))
)

books_df = (
    books_df
    .withColumn("price_category", price_category_expr)
    .withColumn("rating_category", rating_category_expr)
    .withColumn("popularity_score", popularity_score_expr)
)


In [0]:
# Check for null values
#
# All per-column null counts are computed in a single aggregation instead of
# running one filter + count job per column. count() only counts non-null
# inputs, and when(...) without otherwise() yields null for non-matching rows,
# so each aggregate is the number of nulls in that column.
print("\nNull value counts after cleaning:")
null_counts = books_df.select(
    [count(when(col(c).isNull(), 1)).alias(c) for c in books_df.columns]
).first()

for column in books_df.columns:
    print(f"{column}: {null_counts[column]}")


Null value counts after cleaning:
author: 0
genre: 0
image_url: 0
rating: 0
title: 0
price: 0
reviews_count: 0
price_category: 0
rating_category: 0
popularity_score: 0


In [0]:
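# TODO: save the cleaned data to the silver layer here.
# NOTE(review): this cell contains no write code, yet the recorded output below reports Parquet and CSV saves - the save logic appears to have been removed; restore it before re-running.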


Saving to: abfss://bookssilver@amazonbooks0912.dfs.core.windows.net/cleaned_books_data
✅ Successfully saved cleaned data as Parquet format
Saving CSV to: abfss://bookssilver@amazonbooks0912.dfs.core.windows.net/cleaned_books_data_csv
✅ Successfully saved cleaned data as CSV format
