In [1]:
!pip install beautifulsoup4 requests pandas lxml fake_useragent
!pip install mysql-connector-python



In [9]:
import requests # sends http requests
from bs4 import BeautifulSoup # used  for parsing html data
import pandas as pd # handles data frames and data manipulation
import time # used for sleep functions to delay scraping requests
import random # generates random delays between requests
import re # used for pattern matching
import csv # writes data to .csv file
import mysql.connector # used to connect to mysql database

# TASK 1
# AMAZON URL FOR DATA ENGINEERING BOOKS SEARCH RESULTS
URL = "https://www.amazon.com/s?k=data+engineering+books"

# USER-AGENT LIST TO AVOID DETECTION BY AMAZON
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.83 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36"
]

# HEADERS TO MIMIC A REAL BROWSER REQUEST FOR SCRAPING
HEADERS = {
    "User-Agent": random.choice(USER_AGENTS),  # randomly choose a user-agent to avoid detection
    "Accept-Language": "en-US,en;q=0.5"  # define acceptable languages in the request
}

# FUNCTION TO GET PUBLICATION DATE FROM BOOK'S PRODUCT PAGE
def get_publication_date_from_product_page(book_url):
    """Fetch the publication date from a book's product page."""
    try:
        # SEND REQUEST TO BOOK'S PRODUCT PAGE
        product_response = requests.get(book_url, headers=HEADERS)
        
        # CHECK IF THE PAGE LOADS SUCCESSFULLY
        if product_response.status_code == 200:
            product_soup = BeautifulSoup(product_response.text, "html.parser")  # parses html response
            # POSSIBLE LOCATIONS TO LOOK FOR PUBLICATION DATE
            possible_locations = [
                "#detailBullets_feature_div", 
                "#productDetailsTable", 
                "#prodDetails"
            ]
            for location in possible_locations:  # checks all locations
                details = product_soup.select_one(location)  # gets the element containing details
                if details:
                    detail_text = details.get_text(strip=True)  # cleans up the text
                    # SEARCH FOR A FULL DATE PATTERN
                    date_match = re.search(r"(January|February|March|April|May|June|July|August|September|October|November|December) \d{1,2}, \d{4}", detail_text)
                    if date_match:
                        return date_match.group()  # returns the found date

            # IF NO FULL DATE FOUND, SEARCH FOR A YEAR IN THE TEXT
            year_match = re.search(r"(19|20)\d{2}", product_soup.text)
            if year_match:
                return year_match.group()  # returns the year
    except Exception as e:
        print(f"Error fetching product page: {e}")  # prints error if exception error occurs
    return None  # returns none if no date is found

# FETCH MAIN SEARCH RESULTS PAGE FROM AMAZON
response = requests.get(URL, headers=HEADERS)

# CHECK IF PAGE RETRIEVAL WAS SUCCESSFUL
if response.status_code == 200:
    soup = BeautifulSoup(response.text, "html.parser")  # parses html of main page
    titles, authors, pub_dates, ratings, prices = [], [], [], [], []  # lists to store scraped data
    books = soup.find_all("div", {"data-component-type": "s-search-result"})  # finds all book entries

    # LOOP THROUGH FIRST 25 BOOKS
    for book in books[:25]:
        title_tag = book.find("h2", class_="a-size-base-plus a-spacing-none a-color-base a-text-normal")  # finds title
        title = f'"{title_tag.text.strip()}"' if title_tag else None  # encloses title in quotes

        # EXTRACT AUTHOR INFORMATION
        author_tag = book.find("div", class_="a-row a-size-base a-color-secondary")
        if author_tag:
            author_links = author_tag.find_all("a")
            author = ", ".join([a.text.strip() for a in author_links]) if author_links else None  # combines author names
        else:
            author = None

        # EXTRACT PUBLICATION DATE
        pub_date = None
        date_spans = book.find_all("span", class_="a-size-base a-color-secondary")  # finds all date spans
        for span in date_spans:
            span_text = span.get_text(strip=True)
            # SEARCH FOR A FULL DATE FORMAT
            date_match = re.search(r"(January|February|March|April|May|June|July|August|September|October|November|December) \d{1,2}, \d{4}", span_text)
            if date_match:
                pub_date = date_match.group()  # sets publication date
                break
        
        # IF NO FULL DATE, SEARCH FOR A YEAR ONLY
        if pub_date is None:
            year_match = re.search(r"(19|20)\d{2}", book.text)
            if year_match:
                pub_date = year_match.group()  # sets publication year

        # EXTRACT RATING
        rating_tag = book.find("span", class_="a-icon-alt")
        rating = rating_tag.text.strip().split()[0] if rating_tag else None  # extracts rating value

        # EXTRACT PRICE
        price_tag = book.find("span", class_="a-price-whole")
        if not price_tag:
            price_tag = book.find("span", class_="a-offscreen")
        
        if price_tag:
            price = price_tag.text.strip()
            price = f"{float(price):.2f}" if price else None # displays 2 decimal places
        else:
            price = None

        # EXTRACT BOOK URL FOR FURTHER DETAILS
        book_url_tag = book.find("a", class_="a-link-normal s-no-outline")
        book_url = "https://www.amazon.com" + book_url_tag["href"] if book_url_tag else None  # creates full url

        # FETCH PUBLICATION DATE FROM PRODUCT PAGE IF MISSING
        if pub_date is None and book_url:
            pub_date = get_publication_date_from_product_page(book_url)

        # APPEND EXTRACTED DATA TO LISTS
        titles.append(title)
        authors.append(author)
        pub_dates.append(pub_date)
        ratings.append(rating)
        prices.append(price)

        # RESPECT AMAZON SCRAPING GUIDELINES BY ADDING RANDOM DELAY
        time.sleep(random.uniform(1, 3))

    # CREATE PANDAS DATAFRAME WITH SCRAPED DATA
    df = pd.DataFrame({
        "Title": titles,
        "Author": authors,
        "Publication Date": pub_dates,
        "Rating": ratings,
        "Price": prices
    })

    
    # SAVE DATA TO CSV WITH PROPER QUOTING
    df.to_csv("amazon_data_engineering_books.csv", index=False, quoting=csv.QUOTE_ALL)

    # DISPLAY THE SCRAPED DATA
    print("\nScraped Data:\n")
    print(df.to_string(index=False))
    print("\n「✔」Data has been successfully saved to 'amazon_data_engineering_books.csv'.")
else:
    print("  ⓘ Failed to retrieve webpage. Amazon may have blocked the request.")  # error handling failure in retrieval





# TASK 2
# ENSURE DATABASE CONNECTION AND CREATION
def create_database():
    """Ensures that the MySQL database exists."""
    # CREATE CONNECTION TO MYSQL SERVER
    conn = mysql.connector.connect(
        host="localhost",
        user="root",
        password="353txRQ8"
    )
    # CREATE CURSOR OBJECT TO INTERACT WITH THE DATABASE
    cursor = conn.cursor()
    # CREATE DATABASE IF IT DOESN'T EXIST
    cursor.execute("CREATE DATABASE IF NOT EXISTS data_engineering_books")
    # COMMIT THE TRANSACTION
    conn.commit()
    # CLOSE CURSOR AND CONNECTION
    cursor.close()
    conn.close()

# ENSURE TABLE CREATION
def create_table():
    """Ensures that the 'books' table exists with the correct schema."""
    # CREATE CONNECTION TO MYSQL SERVER WITH DATABASE SELECTION
    conn = mysql.connector.connect(
        host="localhost",
        user="root",
        password="353txRQ8",
        database="data_engineering_books"
    )
    # CREATE CURSOR OBJECT TO INTERACT WITH THE DATABASE
    cursor = conn.cursor()
    
    # CREATE TABLE IF IT DOESN'T EXIST
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS books (
            id INT AUTO_INCREMENT PRIMARY KEY,  # PRIMARY KEY AND AUTO INCREMENT FOR ID COLUMN
            title VARCHAR(255),  # BOOK TITLE COLUMN
            author VARCHAR(255),  # BOOK AUTHOR COLUMN
            publication_date VARCHAR(50),  # PUBLICATION DATE COLUMN
            rating VARCHAR(10),  # BOOK RATING COLUMN
            price VARCHAR(50)  # BOOK PRICE COLUMN
        )
    """)
    
    # COMMIT THE TRANSACTION TO SAVE CHANGES
    conn.commit()
    # CLOSE CURSOR AND CONNECTION
    cursor.close()
    conn.close()

# LOAD DATA FROM THE CSV FILE AND HANDLE NaN VALUES
def load_data():
    """Loads scraped data from CSV, handles missing values, and inserts into MySQL."""
    # READ DATA FROM CSV FILE INTO A PANDAS DATAFRAME
    df = pd.read_csv("amazon_data_engineering_books.csv")

    # FILL NaN VALUES WITH DEFAULT TEXT FOR TEXT FIELDS AND "NOT AVAILABLE" FOR NUMERIC FIELDS
    df.fillna({
        'Title': 'Unknown',  # fills empty values in 'Title' with 'Unknown'
        'Author': 'Unknown', 
        'Publication Date': 'Not Available',
        'Rating': '0', 
        'Price': 'Not Available'  
    }, inplace=True)

    conn = None
    cursor = None

    try:
        # CREATE CONNECTION TO MYSQL DATABASE
        conn = mysql.connector.connect(
            host="localhost",
            user="root",
            password="353txRQ8",
            database="data_engineering_books"
        )
        # CREATE CURSOR OBJECT TO INTERACT WITH THE DATABASE
        cursor = conn.cursor()

        # INSERT DATA INTO THE 'books' TABLE
        for _, row in df.iterrows():
            cursor.execute("""
                INSERT INTO books (title, author, publication_date, rating, price)
                VALUES (%s, %s, %s, %s, %s)
            """, (row['Title'], row['Author'], row['Publication Date'], row['Rating'], row['Price']))
        
        # COMMIT THE TRANSACTION TO SAVE CHANGES
        conn.commit()
        print("「✔」Data has been successfully inserted into MySQL!")

    except mysql.connector.Error as db_error:
        # PRINT ERROR MESSAGE IN CASE OF A DATABASE ERROR
        print(f"Database error: {db_error}")

    finally:
        # CLOSE CURSOR AND CONNECTION AFTER USE
        if cursor:
            cursor.close()
        if conn:
            conn.close()

# EXTRACT AND SORT DATA
def query_books():
    """Executes the SQL query to extract and sort books based on rating."""
    
    # CREATE SEPARATE CONNECTION FOR QUERY1
    conn1 = mysql.connector.connect(
        host="localhost",
        user="root",
        password="353txRQ8",
        database="data_engineering_books"
    )
    cursor1 = conn1.cursor()

    # MEASURE SQL QUERY1 EXECUTION TIME - START TIME
    start_time1 = time.time()

    # SELECT TITLE, AUTHOR, AND RATING COLUMNS, ORDERING BY RATING IN DESCENDING ORDER
    query1 = """
        SELECT title, author, rating
        FROM books
        ORDER BY 
            CASE 
                WHEN rating = 'No rating' THEN 0
                ELSE CAST(rating AS DECIMAL)
            END DESC;

    """
    cursor1.execute(query1)
    results1 = cursor1.fetchall()

    # END TIME
    end_time1 = time.time()
    execution_time1 = end_time1 - start_time1  # calculates query execution time
    print(f"\nQuery 1 executed in {execution_time1:.4f} seconds.") # displays execution time

    # CREATE A PANDAS DATAFRAME FROM THE FIRST QUERY RESULT
    df_results1 = pd.DataFrame(results1, columns=["Title", "Author", "Rating"])
    print("\nTOP-RATED DATA ENGINEERING BOOKS:\n")
    print(df_results1)

    # CLOSE CURSOR AND CONNECTION FOR THE FIRST QUERY
    cursor1.close()
    conn1.close()

    
    # CREATE SEPARATE CONNECTION FOR QUERY2
    conn2 = mysql.connector.connect(
        host="localhost",
        user="root",
        password="353txRQ8",
        database="data_engineering_books"
    )
    cursor2 = conn2.cursor()

    # MEASURE SQL QUERY2 EXECUTION TIME - START TIME
    start_time2 = time.time()
    
    # SELECT TITLE, PUBLICATION DATE, AND PRICE COLUMNS, ORDERING BY PUBLICATION DATE IN ASCENDING ORDER
    query2 = """
        SELECT title, publication_date, price
        FROM books
        WHERE price > 0
        ORDER BY price ASC;
    """
    cursor2.execute(query2)
    results2 = cursor2.fetchall()

    # END TIME
    end_time2 = time.time()
    execution_time2 = end_time2 - start_time2  ## calculates query execution time
    print(f"\nQuery 2 executed in {execution_time2:.4f} seconds.") # displays execution time

    # CREATE A PANDAS DATAFRAME FROM THE SECOND QUERY RESULT
    df_results2 = pd.DataFrame(results2, columns=["Title", "Publication Date", "Price"])
    print("\nBOOKS SORTED BY PRICE (Cheapest to Most Expensive):\n")
    print(df_results2)

    # CLOSE CURSOR AND CONNECTION FOR THE SECOND QUERY
    cursor2.close()
    conn2.close()

# EXECUTE FUNCTIONS
create_database()  # creates database if it doesn't exist
create_table()  # creates'books' table if it doesn't exist
load_data()  # loads data into database from .csv file
query_books()  # calls queries and displays results


Scraped Data:

                                                                                                                                                Title                                            Author  Publication Date Rating Price
                                                                                           "Mastering the Data Paradox: Key to Winning in the AI Age"                                        Nitin Seth    April 10, 2024    4.8 17.00
                               "Data Engineering with AWS: Acquire the skills to design and build AWS-based data transformation pipelines like a pro"                                      Gareth Eagar  October 31, 2023    4.3 24.00
"Cracking the Data Engineering Interview: Land your dream job with the help of resume-building tips, over 100 mock questions, and a unique portfolio"                                    Kedeisha Bryan  November 7, 2023    4.0 26.00
                                                            