In [1]:
import requests
from bs4 import BeautifulSoup
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, when
import time

def scrape_data():
    """Scrape title, price and rating for every book on the catalogue page.

    Returns:
        A list of ``(title, price, rating)`` tuples of strings. ``price`` has
        the currency symbol stripped (e.g. ``"51.77"``); ``rating`` is the
        second CSS class of the first ``<p>`` inside each book entry.

    Raises:
        RuntimeError: If the server answers with a non-200 status code.
        requests.RequestException: If the request fails or times out.
    """
    url = "http://books.toscrape.com/catalogue/category/books_1/index.html"
    # Without a timeout, requests can block forever on an unresponsive server.
    response = requests.get(url, timeout=10)
    if response.status_code != 200:
        raise RuntimeError(f"Failed to retrieve data from {url}")

    # Parse the raw bytes so BeautifulSoup detects the encoding from the
    # document itself. ``response.text`` falls back to ISO-8859-1 when the
    # Content-Type header names no charset, which turns "£" into "Â£".
    soup = BeautifulSoup(response.content, "html.parser")

    data = []
    for book in soup.find_all("article", class_="product_pod"):
        title = book.h3.a["title"]
        price_text = book.find("p", class_="price_color").text
        # Drop the currency symbol; "Â" is removed as well in case the page
        # was still mis-decoded upstream.
        price = price_text.replace("Â", "").replace("£", "").strip()
        # NOTE(review): presumably the first <p> is the star-rating element
        # with classes like ["star-rating", "Three"] - confirm against markup.
        rating = book.p["class"][1]
        data.append((title, price, rating))
    return data

def run_spark_job():
    """Scrape the book catalogue, categorise books by price and save a CSV.

    The scraped rows are loaded into a Spark DataFrame, the price is cast to a
    float, and a ``price_category`` column is derived: ``cheap`` below 20,
    ``moderate`` from 20 up to (but not including) 35, ``expensive``
    otherwise. The result is sorted by ascending price, printed, and written
    with a header to ``../data/output_books_<timestamp>.csv``.

    The Spark session is always stopped, even when scraping or writing fails.
    """
    spark = (
        SparkSession.builder
        .appName("Spark Web Scraper - Books")
        .config("spark.driver.extraJavaOptions", "--add-opens java.base/java.nio=ALL-UNNAMED")
        .config("spark.driver.bindAddress", "127.0.0.1")
        .getOrCreate()
    )

    try:
        spark.sparkContext.setLogLevel("ERROR")

        scraped_data = scrape_data()
        # An explicit schema is used because Spark cannot infer column types
        # from an empty list, which would make an empty scrape result crash.
        spark_df = spark.createDataFrame(
            scraped_data,
            schema="title string, price string, rating string",
        )

        # Strip anything that is not a digit or a dot before the numeric cast.
        spark_df = spark_df.withColumn(
            "price",
            regexp_replace(col("price"), "[^0-9.]", "").cast("float"),
        )

        spark_df = spark_df.withColumn(
            "price_category",
            when(col("price") < 20, "cheap")
            .when((col("price") >= 20) & (col("price") < 35), "moderate")
            .otherwise("expensive"),
        )

        sorted_df = spark_df.orderBy(col("price").asc())
        sorted_df.show(truncate=False)

        # The timestamp gives each run its own output path (to the second).
        timestamp = time.strftime("%Y%m%d-%H%M%S")
        output_path = f"../data/output_books_{timestamp}.csv"
        sorted_df.write.csv(output_path, header=True)
    finally:
        # Release the session even if any step above raised.
        spark.stop()

# Run the Spark job only when this file is executed as a script, not on import.
if __name__ == "__main__":
    run_spark_job()


24/10/09 18:20:48 WARN Utils: Your hostname, Brackens-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.168 instead (on interface en0)
24/10/09 18:20:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/10/09 18:20:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


+----------------------------------------------------------------------------------------------+-----+------+--------------+
|title                                                                                         |price|rating|price_category|
+----------------------------------------------------------------------------------------------+-----+------+--------------+
|Starving Hearts (Triangular Trade Trilogy, #1)                                                |13.99|Two   |cheap         |
|Set Me Free                                                                                   |17.46|Five  |cheap         |
|The Coming Woman: A Novel Based on the Life of the Infamous Feminist, Victoria Woodhull       |17.93|Three |cheap         |
|Shakespeare's Sonnets                                                                         |20.66|Four  |moderate      |
|The Boys in the Boat: Nine Americans and Their Epic Quest for Gold at the 1936 Berlin Olympics|22.6 |Four  |moderate      |
