In [None]:
import os
import json
import time
from bs4 import BeautifulSoup as bs
from bs4.element import Tag
from typing import Tuple

In [None]:
html_output_path_root = "data/scraped/camel/search"  # root directory that is walked recursively for scraped HTML files
json_output_path = "data/scraped/camel/parsed-products.json"  # single JSON file the parsed products are written to

In [None]:
def get_file_list(path: str) -> list:
	"""
	Recursively collect the paths of all files below `path`.

	Args:
		path: Directory to walk.

	Returns:
		Sorted list of file paths, each joined with the directory it was found in.
		An empty list is returned when no files are found.
	"""
	file_list = [
		os.path.join(root, name)
		for root, _dirs, names in os.walk(path)
		for name in names
	]
	# os.walk yields entries in an arbitrary, filesystem-dependent order. Sorting keeps
	# the choice of sample file and any order-dependent processing reproducible.
	return sorted(file_list)

In [None]:
files = get_file_list(html_output_path_root)
print(f"Total files: {len(files)}")
# Fail with an explanatory message instead of a bare IndexError when nothing was found.
if not files:
	raise FileNotFoundError(f"No files found under '{html_output_path_root}'")
sample_filepath = files[0]
print(f"Sample file: {sample_filepath}")

In [None]:
def get_bs_obj(file):
	"""
	Load an HTML file from disk and parse it with BeautifulSoup's "html.parser".

	The file is read as raw bytes so that BeautifulSoup determines the document
	encoding itself, instead of decoding with the platform default text encoding
	(which differs between systems and can fail or garble non-ASCII pages).

	Args:
		file: Path of the HTML file to parse.

	Returns:
		The parsed BeautifulSoup document.
	"""
	with open(file, "rb") as f:
		raw_html = f.read()
	return bs(raw_html, "html.parser")

In [None]:
bs_example = get_bs_obj(sample_filepath)
# Show only the beginning of the document; printing the whole page floods the cell output.
print(bs_example.prettify()[:500])

In [None]:
def get_page_type(page: bs) -> str:
	"""
	Classify a parsed page by probing it for marker elements.

	The CSS selectors below are tried in order and the first one that matches
	decides the result. The first four selectors are the same ones used by the
	scraper's Selenium locators (search results, "include not in stock" button,
	high-volume-of-searches alert, and the title link shown when a search lands
	directly on a single product page).

	Returns:
		One of "search_results", "not_in_stock", "high_volume_of_searches",
		"single_product", "main_page" or "unknown".
	"""
	markers = (
		("#content > div.grid-x.grid-margin-x.search_results", "search_results"),
		("#content > form:nth-child(9) > input.button", "not_in_stock"),
		(".alert-callout-border", "high_volume_of_searches"),
		(
			"div.grid-x:nth-child(12) > div:nth-child(1) > h3:nth-child(1) > a:nth-child(1)",
			"single_product",
		),
		# <h2 class="notopmargin" id="signup_head"> only appears on the main page
		("#signup_head", "main_page"),
	)
	for selector, label in markers:
		if page.select_one(selector):
			return label
	return "unknown"

In [None]:
# Classify the sample page and report the detected type.
page_type = get_page_type(bs_example)
print("Page type: {}".format(page_type))

In [None]:
def parse_search_results(page: bs) -> list:
	"""
	Parse a search results page into a list of products.

	Every direct <div> child of the search results grid is treated as one product.
	The product's title link is the <a class="camels"> element whose
	x-camel-place attribute equals "Search - Title"; grid items without such a
	link are skipped.

	Args:
		page: Parsed search results page.

	Returns:
		List of dicts with the keys "title" (stripped text of the title link) and
		"asin" (stripped x-camel-asin attribute of the title link).

	Raises:
		KeyError: If a title link has no x-camel-asin attribute.
	"""
	products = []
	product_elements = page.select(
		"#content > div.grid-x.grid-margin-x.search_results > div")
	for product_element in product_elements:
		# Search with a dedicated variable: reusing the result name as the loop
		# variable would leave the last non-matching link selected whenever the
		# item has links but none of them is the title link.
		title_link = next(
			(
				link
				for link in product_element.select("a.camels")
				if link.attrs.get("x-camel-place") == "Search - Title"
			),
			None,
		)
		if title_link is None:
			continue
		products.append({
			"title": title_link.text.strip(),
			"asin": str(title_link.attrs["x-camel-asin"]).strip(),
		})
	return products


In [None]:
products_list = parse_search_results(bs_example)
print(f"Total products: {len(products_list)}")
# The sample file is not guaranteed to be a search results page; in that case
# nothing is parsed and indexing the first product would raise an IndexError.
if products_list:
	print(f"Example product: {json.dumps(products_list[0], indent=2)}")
	print("Other products:")
	for product in products_list:
		print(f"{product['asin']}: {product['title']}")
else:
	print("No products parsed from the sample page")

In [None]:
def parse_single_product(page: bs) -> dict:
	'''
	Parse a single product page and return a dictionary of product attributes.

	Args:
		page: Parsed single product page.

	Returns:
		Dict with the keys:
		"title" - text of the <title> tag up to the first "|", stripped, or None
			when the page has no <title> tag;
		"asin" - stripped x-camel-asin attribute of the element with
			id="buybtn_price_amazon", or None when the element or the
			attribute is missing.
	'''
	title_element = page.select_one("title")
	title = None
	if title_element is not None:
		title = title_element.text.split("|")[0].strip()

	asin_element = page.select_one("#buybtn_price_amazon")
	asin = None
	if asin_element is not None:
		# A missing attribute is reported as None, like a missing element,
		# instead of aborting the whole run with a KeyError.
		raw_asin = asin_element.attrs.get("x-camel-asin")
		if raw_asin is not None:
			asin = str(raw_asin).strip()

	return {"title": title, "asin": asin}
	

In [None]:
def process_pages(files: list) -> dict:
	"""
	Classify and parse every HTML file, producing one result record per file.

	Each record (keyed by the file path) holds "page_type" and "from_file".
	Search result pages additionally get "products" (the parsed product list)
	and single product pages get "product" (the parsed product). Progress and
	the elapsed time are printed while running.
	"""
	started_at = time.time()
	print(f"Processing {len(files)} files...")
	results = {}
	for position, file in enumerate(files, start=1):
		print(f"\rProcessing file {position} / {len(files)}", end="")
		soup = get_bs_obj(file)
		kind = get_page_type(soup)
		# Every page type shares the same base record; only two carry a payload.
		record = {"page_type": kind, "from_file": file}
		if kind == "search_results":
			record["products"] = parse_search_results(soup)
		elif kind == "single_product":
			record["product"] = parse_single_product(soup)
		results[file] = record
	finished_at = time.time()
	print(f"\nProcessing took {finished_at - started_at} seconds")
	return results

def get_processed_pages_statistics(processed_pages: dict) -> dict:
	"""
	Count the processed pages in total and per page type.

	Returns a dict with "total_pages" plus one counter for each known page type;
	types that never occur stay at 0. A record whose "page_type" is not one of
	the known keys raises a KeyError.
	"""
	known_types = (
		"search_results",
		"single_product",
		"not_in_stock",
		"high_volume_of_searches",
		"main_page",
		"unknown",
	)
	statistics = {"total_pages": len(processed_pages)}
	statistics.update(dict.fromkeys(known_types, 0))
	for record in processed_pages.values():
		statistics[record["page_type"]] += 1
	return statistics



In [None]:
# Classify and parse every collected HTML file; the result holds one record per file path.
processed_pages = process_pages(files)

In [None]:
# Summarise how many pages of each type were processed.
processed_pages_statistics = get_processed_pages_statistics(processed_pages)
statistics_json = json.dumps(processed_pages_statistics, indent=2)
print("Processed pages statistics: " + statistics_json)

In [None]:
def get_products(processed_pages: dict) -> Tuple[dict, dict]:
	"""
	Collect the unique products from all processed pages, keyed by ASIN.

	Products come from "search_results" records (their "products" list) and from
	"single_product" records (their "product"); other page types are ignored.
	The first product seen for an ASIN is kept, later ones only increase the
	duplicate counter. Each kept product is a copy of the parsed product extended
	with the "page_type" and "from_file" of the page it came from, so the records
	in `processed_pages` are left unmodified.

	Args:
		processed_pages: Mapping of file path to result record, as returned by
			process_pages.

	Returns:
		Tuple of (products, duplicate_asins): `products` maps ASIN to product
		dict, `duplicate_asins` maps ASIN to the number of additional
		occurrences beyond the first.
	"""
	products = {}
	duplicate_asins = {}
	for page in processed_pages.values():
		page_type = page["page_type"]
		if page_type == "search_results":
			page_products = page["products"]
		elif page_type == "single_product":
			page_products = [page["product"]]
		else:
			continue
		for parsed in page_products:
			# TODO: strip the trailing "(ASIN)" from the title in an idempotent way.
			# Copy instead of annotating in place so the caller's data is not mutated.
			product = {
				**parsed,
				"page_type": page_type,
				"from_file": page["from_file"],
			}
			asin = product["asin"]
			if asin in products:
				duplicate_asins[asin] = duplicate_asins.get(asin, 0) + 1
			else:
				products[asin] = product
	return products, duplicate_asins

products, duplicate_asins = get_products(processed_pages)
duplicates_instances = len(duplicate_asins)  # number of distinct ASINs seen more than once
duplicates_count = sum(duplicate_asins.values())  # total number of repeated occurrences
print(f"Total unique products: {len(products)}")
print(f"Total duplicate ASINs: {duplicates_instances}")
print(f"Total duplicate ASINs count: {duplicates_count}")
# Take the first product without indexing, so an empty result prints "null"
# instead of raising an IndexError.
example_product = next(iter(products.values()), None)
print(f"Product example: {json.dumps(example_product, indent=2)}")

In [None]:
# Write to JSON file
with open(json_output_path, "w") as f:
	json.dump(products, f, indent=2)
print(f"Written to '{json_output_path}'")