In [1]:
import os
import random
import time
from urllib.parse import quote_plus

import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By

### Parameter Setting

In [2]:
# Input/output locations: the query list is looked up per student ID and
# every scraped result is written below the results folder.
student_id = "M11207321"
query_path = "./queries/" + student_id + "_queries.txt"
results_path = "./results"

# Search endpoint; the query text is appended directly after "q=".
search_url = "https://www.tw.coupang.com/search?q="

# Pause lengths (in seconds) used between browser actions.
short_time_sleep, medium_time_sleep, long_time_sleep = 1, 3, 5

### Helper Functions

In [3]:
# Read queries from file
def read_queries(file_path):
    """Load search queries from a text file, one query per line.

    Surrounding whitespace is stripped from every line and lines that are
    empty after stripping are dropped, so blank lines in the file never turn
    into empty-string queries.

    Args:
        file_path: Path of the UTF-8 encoded text file to read.

    Returns:
        list[str]: The non-empty queries, in file order.

    Raises:
        OSError: If the file cannot be opened.
    """
    # 'utf-8-sig' reads plain UTF-8 unchanged but also discards a leading
    # byte-order mark, which would otherwise stay attached to the first query.
    with open(file_path, 'r', encoding='utf-8-sig') as file:
        stripped_lines = (line.strip() for line in file)
        return [query for query in stripped_lines if query]

# Check if the webpage is accessible
def check_access(driver):
    """Report whether the currently loaded page is accessible.

    The page counts as blocked only when an element at '/html/body/h1'
    exists and its text is exactly 'Access Denied'.

    Args:
        driver: Object exposing Selenium's ``find_element(by, value)``.

    Returns:
        bool: False when the 'Access Denied' heading is present, True
        otherwise (including when the heading lookup fails).
    """
    try:
        heading = driver.find_element(By.XPATH, '/html/body/h1')
        return heading.text != 'Access Denied'
    except Exception:
        # A failed lookup is treated as "no block heading present". Catching
        # Exception instead of using a bare except lets KeyboardInterrupt and
        # SystemExit propagate, so a running scrape can still be stopped.
        return True

# Scroll the webpage to the bottom
def scroll_to_bottom(driver, pause_time=3):
    """Scroll once to the bottom of the page and report whether it grew.

    Args:
        driver: Object exposing ``execute_script(script)``.
        pause_time: Seconds to wait after scrolling before re-measuring.

    Returns:
        bool: True when the document height increased after the scroll,
        False otherwise (in which case the view is nudged up by 100px).
    """
    height_script = "return document.body.scrollHeight"

    # Measure, jump to the bottom, wait, then measure again.
    height_before = driver.execute_script(height_script)
    driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
    time.sleep(pause_time)
    height_after = driver.execute_script(height_script)

    if height_after > height_before:
        return True

    # Height did not increase: step back up slightly and report no growth.
    driver.execute_script("window.scrollBy(0, -100);")
    return False

# Get all items from pages   
def get_all_items(driver):
    """Collect the product link elements from the loaded results page.

    Args:
        driver: Object exposing Selenium's ``find_elements(by, value)``.

    Returns:
        list: The matched elements. An empty list is returned when nothing
        matches or when the lookup itself fails, so callers can always
        iterate over the result.
    """
    try:
        items = driver.find_elements(
            By.XPATH, '/html/body/div[1]/div[2]/main/div/div/div[3]/div/div/a'
        )
    except Exception as error:
        # Return an empty list rather than None so the caller's loop over the
        # result does not crash.
        print(f"No items found. ({type(error).__name__})")
        return []

    # find_elements returns an empty list instead of raising when nothing
    # matches, so the empty case is reported here.
    if not items:
        print("No items found.")
    return items

# Extract all item informations to a dataframe   
def extract_item_info(items):
    """Build a DataFrame of name, price and URL from product elements.

    Args:
        items: Iterable of elements exposing ``find_element(by, value)`` and
            ``get_attribute(name)``. ``None`` is treated as no items.

    Returns:
        pandas.DataFrame: One row per successfully parsed item with columns
        'product_name', 'product_price' and 'product_url'. The columns are
        present even when no rows were extracted. A price that cannot be
        located is stored as the string "null".
    """
    print("Extracting item information...")
    columns = ['product_name', 'product_price', 'product_url']
    # The price is at one of two relative XPaths; the on-sale location is
    # tried first, then the regular (not on sale) one.
    price_xpaths = ('div[3]/div[2]/span/span', 'div[3]/div[1]/span/span')

    data = []
    for i, item in enumerate(items or []):
        try:
            item_name = item.find_element(By.XPATH, 'div[2]').text
            item_url = item.get_attribute('href')

            item_price = "null"
            for xpath in price_xpaths:
                try:
                    item_price = item.find_element(By.XPATH, xpath).text
                    break
                except Exception:
                    # Not at this location; try the next candidate.
                    continue

            data.append({
                'product_name': item_name,
                'product_price': item_price,
                'product_url': item_url
            })
        except Exception:
            # Best effort: skip an item that cannot be parsed. Exception (not
            # a bare except) keeps Ctrl+C / kernel interrupt working.
            print(f"Error extracting item {i}.")

    # Fixing the columns keeps the header stable even for an empty result.
    return pd.DataFrame(data, columns=columns)

### Launch the Chrome browser

In [4]:
# Start a Chrome session through Selenium; `driver` is the handle used by the
# scraping cell for navigation and element lookups.
driver = webdriver.Chrome()
# Short pause after starting the browser, before any page is requested.
time.sleep(short_time_sleep)

### Start web scraping.
#### (During scraping, you may open other windows, but do not close or minimize the Chrome window that is performing the scraping.)
#### (Make sure the screen remains on while the scraper is running)

In [5]:
# Main scraping process
# For every query: skip it when its CSV already exists; otherwise open the
# search page, keep scrolling until no more content loads, extract the listed
# items and save them to "<results_path>/<student_id>_<query>.csv".

# The results folder must exist before checking for or writing CSV files.
os.makedirs(results_path, exist_ok=True)

queries = read_queries(query_path)
try:
    for query in queries:
        # An empty query has nothing to search for.
        if not query:
            continue

        # Skip only when this exact query's output file exists. A substring
        # match over file names would wrongly skip e.g. "衣" once "衣服" is done.
        file_path = os.path.join(results_path, f"{student_id}_{query}.csv")
        if os.path.exists(file_path):
            print(f"Results for {query} have already been scraped. Skipping...\n")
            continue

        # Search for the query (URL-encoded so spaces, '&', '#' stay in q=)
        driver.get(search_url + quote_plus(query))
        time.sleep(medium_time_sleep)
        if not check_access(driver):
            print(f"Some error occurred while scraping {query}.")
            continue
        print(f"Start scraping {query}...")

        # Scroll until the page stops growing; stop early if access is lost.
        access_ok = True
        while True:
            access_ok = check_access(driver)
            if not access_ok:
                break
            # add time noise
            if not scroll_to_bottom(driver, medium_time_sleep + random.random()):
                break

        if not access_ok:
            # Do not save a CSV for a blocked page: the file would mark the
            # query as done and it would be skipped on the next run.
            print(f"Some error occurred while scraping {query}.")
            continue

        # Process the items (fall back to an empty list if none were returned)
        items = get_all_items(driver) or []
        items_df = extract_item_info(items)

        # Save the results to a CSV file
        items_df.to_csv(file_path, index=False, encoding='utf-8-sig')
        print(f"Results for {query} have been saved to {file_path}")

        # Announce the pause before it starts, then wait a randomized time.
        print("Sleeping for a while...")
        time.sleep(long_time_sleep + random.random() * 10)
        print("-"*80)
finally:
    # Close the browser even when scraping stops with an error or interrupt.
    driver.quit()

Start scraping 筆電...
Extracting item information...
Results for 筆電 have been saved to ./results\M11207321_筆電.csv
Sleeping for a while...
--------------------------------------------------------------------------------
Start scraping 衣服...
Extracting item information...
Results for 衣服 have been saved to ./results\M11207321_衣服.csv
Sleeping for a while...
--------------------------------------------------------------------------------
Start scraping 餅乾...
Extracting item information...
Results for 餅乾 have been saved to ./results\M11207321_餅乾.csv
Sleeping for a while...
--------------------------------------------------------------------------------
Start scraping 洗衣精...
Extracting item information...
Results for 洗衣精 have been saved to ./results\M11207321_洗衣精.csv
Sleeping for a while...
--------------------------------------------------------------------------------
Start scraping 衛生紙...
Extracting item information...
Results for 衛生紙 have been saved to ./results\M11207321_衛生紙.csv
Sleeping 