In [8]:
import json
import re
from pathlib import Path
from bs4 import BeautifulSoup
from pydantic import BaseModel
from urllib.parse import urljoin
from typing import Optional

class ScrapedCatalogueItem(BaseModel):
    name: str
    price: float
    image_url: str
    product_url: str
    retailer: str

non_sgd_prices = []
error_log = []

def log_error(retailer: str, message: str, html_snippet: Optional[str] = None):
    error_entry = {
        'retailer': retailer.upper(),
        'message': message,
        'html_snippet': html_snippet[:500] + '...' if html_snippet else None
    }
    error_log.append(error_entry)

def parse_price(price_str: str) -> Optional[float]:
    """Parse price only if it contains SGD currency markers"""
    try:
        if not re.search(r'S\$|SGD', price_str, re.IGNORECASE):
            non_sgd_prices.append(price_str)
            return None
        
        # Extract numeric value
        match = re.search(r'[\d,\.]+', price_str.replace(',', ''))
        if not match:
            return None
        
        return float(match.group())
    except Exception as e:
        log_error('GLOBAL', f'Price parsing error: {str(e)}', price_str)
        return None

# ------------------------- ON.com Scraper -------------------------
def scrape_on(html_path: Path) -> list:
    products = []
    retailer = 'on'
    
    try:
        with open(html_path, 'r', encoding='utf-8') as f:
            soup = BeautifulSoup(f, 'html.parser')
        
        product_cards = soup.select('article._productCard_1nudf_59')
        if not product_cards:
            log_error(retailer, 'No product cards found')
            return products
            
        for idx, card in enumerate(product_cards):
            card_html = str(card)[:200] + '...'
            try:
                # Name
                name_elem = card.select_one('._title_hkzgy_73')
                if not name_elem:
                    log_error(retailer, f'Card {idx} missing name', card_html)
                    continue
                name = name_elem.get_text(strip=True)
                
                # Price
                price_elem = card.select_one('._price_hkzgy_99')
                if not price_elem:
                    log_error(retailer, f'Card {idx} missing price', card_html)
                    continue
                price = parse_price(price_elem.get_text(strip=True))
                if price is None:
                    continue
                
                # Image
                img_elem = card.select_one('._productMedia_1nudf_76 img')
                image_url = img_elem.get('src', '') if img_elem else ''
                
                # Product URL
                link_elem = card.find('a', class_='_imageLink_1nudf_92')
                if not link_elem or not link_elem.get('href'):
                    log_error(retailer, f'Card {idx} missing product link', card_html)
                    continue
                product_url = urljoin("https://www.on.com", link_elem['href'])
                
                products.append(ScrapedCatalogueItem(
                    name=name,
                    price=price,
                    image_url=image_url,
                    product_url=product_url,
                    retailer=retailer
                ).dict())
                
            except Exception as e:
                log_error(retailer, f'Card {idx} error: {str(e)}', card_html)
                
    except Exception as e:
        log_error(retailer, f'File processing failed: {str(e)}')
        
    return products

# ------------------------- Dr. Martens Scraper -------------------------
def scrape_drmartens(html_path: Path) -> list:
    products = []
    retailer = 'drmartens'
    
    try:
        with open(html_path, 'r', encoding='utf-8') as f:
            soup = BeautifulSoup(f, 'html.parser')
        
        product_cards = soup.select('div.grid-product')
        if not product_cards:
            log_error(retailer, 'No product cards found')
            return products
            
        for idx, card in enumerate(product_cards):
            card_html = str(card)[:200] + '...'
            try:
                # Name
                name_elem = card.select_one('.grid-product__title')
                if not name_elem:
                    log_error(retailer, f'Card {idx} missing name', card_html)
                    continue
                name = name_elem.get_text(strip=True)
                
                # Price - Updated selector
                price_container = card.select_one('.grid-product__price')
                if not price_container:
                    log_error(retailer, f'Card {idx} missing price container', card_html)
                    continue
                
                # Check multiple price locations
                price_elem = (price_container.select_one('.grid-product__price--current-sale') or 
                             price_container.select_one('.grid-product__price--current') or 
                             price_container.select_one('.grid-product__price--original'))
                
                if not price_elem:
                    log_error(retailer, f'Card {idx} missing price element', card_html)
                    continue
                
                price_text = price_elem.get_text(strip=True)
                price = parse_price(price_text)
                if price is None:
                    continue
                
                # Image
                img_elem = card.select_one('.grid-product__image-wrap img')
                image_url = img_elem.get('src', '') if img_elem else ''
                
                # Product URL
                link_elem = card.find('a', class_='grid-item__link')
                if not link_elem or not link_elem.get('href'):
                    log_error(retailer, f'Card {idx} missing product link', card_html)
                    continue
                product_url = urljoin("https://drmartens.com.sg", link_elem['href'])
                
                products.append(ScrapedCatalogueItem(
                    name=name,
                    price=price,
                    image_url=image_url,
                    product_url=product_url,
                    retailer=retailer
                ).dict())
                
            except Exception as e:
                log_error(retailer, f'Card {idx} error: {str(e)}', card_html)
                
    except Exception as e:
        log_error(retailer, f'File processing failed: {str(e)}')
        
    return products

# ------------------------- Birkenstock Scraper -------------------------
def scrape_birkenstock(html_path: Path) -> list:
    products = []
    retailer = 'birkenstock'
    
    try:
        with open(html_path, 'r', encoding='utf-8') as f:
            soup = BeautifulSoup(f, 'html.parser')
        
        product_cards = soup.select('li.xlt-producttile')
        if not product_cards:
            log_error(retailer, 'No product cards found')
            return products
            
        for idx, card in enumerate(product_cards):
            card_html = str(card)[:200] + '...'
            try:
                # Name components
                model_elem = card.select_one('.product-modelname')
                short_elem = card.select_one('.product-shortname')
                color_elem = card.select_one('.product-colorname')
                
                if not all([model_elem, short_elem, color_elem]):
                    log_error(retailer, f'Card {idx} missing name components', card_html)
                    continue
                
                name = ' '.join([
                    model_elem.get_text(strip=True),
                    short_elem.get_text(strip=True),
                    color_elem.get_text(strip=True)
                ])
                
                # Price
                price_elem = card.select_one('.price-standard')
                if not price_elem:
                    log_error(retailer, f'Card {idx} missing price', card_html)
                    continue
                price = parse_price(price_elem.get_text(strip=True))
                if price is None:
                    continue
                
                # Image
                img_elem = card.select_one('.standard-tileimage')
                image_url = img_elem.get('src', '') if img_elem else ''
                
                # Product URL
                link_elem = card.find('a', class_='product-tile')
                if not link_elem or not link_elem.get('href'):
                    log_error(retailer, f'Card {idx} missing product link', card_html)
                    continue
                product_url = urljoin("https://www.birkenstock.com", link_elem['href'])
                
                products.append(ScrapedCatalogueItem(
                    name=name,
                    price=price,
                    image_url=image_url,
                    product_url=product_url,
                    retailer=retailer
                ).dict())
                
            except Exception as e:
                log_error(retailer, f'Card {idx} error: {str(e)}', card_html)
                
    except Exception as e:
        log_error(retailer, f'File processing failed: {str(e)}')
        
    return products

# ------------------------- Zara Scraper -------------------------
def scrape_zara(html_path: Path) -> list:
    products = []
    retailer = 'zara'
    
    try:
        with open(html_path, 'r', encoding='utf-8') as f:
            soup = BeautifulSoup(f, 'html.parser')
        
        product_cards = soup.select('li.product-grid-product')
        if not product_cards:
            log_error(retailer, 'No product cards found')
            return products
            
        for idx, card in enumerate(product_cards):
            card_html = str(card)[:200] + '...'
            try:
                # Name
                name_elem = card.select_one('a.product-link h2')
                if not name_elem:
                    log_error(retailer, f'Card {idx} missing name', card_html)
                    continue
                name = name_elem.get_text(strip=True)
                
                # Price
                price_elem = card.select_one('.price-current__amount')
                if not price_elem:
                    log_error(retailer, f'Card {idx} missing price', card_html)
                    continue
                price = parse_price(price_elem.get_text(strip=True))
                if price is None:
                    continue
                
                # Image
                img_elem = card.select_one('.media-image__image')
                image_url = img_elem.get('src', '') if img_elem else ''
                
                # Product URL
                link_elem = card.find('a', class_='product-link')
                if not link_elem or not link_elem.get('href'):
                    log_error(retailer, f'Card {idx} missing product link', card_html)
                    continue
                product_url = urljoin("https://www.zara.com/sg/en/", link_elem['href'])
                
                products.append(ScrapedCatalogueItem(
                    name=name,
                    price=price,
                    image_url=image_url,
                    product_url=product_url,
                    retailer=retailer
                ).dict())
                
            except Exception as e:
                log_error(retailer, f'Card {idx} error: {str(e)}', card_html)
                
    except Exception as e:
        log_error(retailer, f'File processing failed: {str(e)}')
        
    return products

# ------------------------- ANMAKO Scraper -------------------------
def scrape_anmako(html_path: Path) -> list:
    products = []
    retailer = 'anmako'
    
    try:
        with open(html_path, 'r', encoding='utf-8') as f:
            soup = BeautifulSoup(f, 'html.parser')
        
        product_cards = soup.select('product-item')
        if not product_cards:
            log_error(retailer, 'No product cards found')
            return products
            
        for idx, card in enumerate(product_cards):
            card_html = str(card)[:200] + '...'
            try:
                # Name
                name_elem = card.select_one('.product-item-meta__title')
                if not name_elem:
                    log_error(retailer, f'Card {idx} missing name', card_html)
                    continue
                name = name_elem.get_text(strip=True)
                
                # Price
                price_elem = card.select_one('.price')
                if not price_elem:
                    log_error(retailer, f'Card {idx} missing price', card_html)
                    continue
                price = parse_price(price_elem.get_text(strip=True))
                if price is None:
                    continue
                
                # Image
                img_elem = card.select_one('.product-item__primary-image')
                image_url = img_elem.get('src', '') if img_elem else ''
                
                # Product URL
                link_elem = card.find('a', class_='product-item__aspect-ratio')
                if not link_elem or not link_elem.get('href'):
                    log_error(retailer, f'Card {idx} missing product link', card_html)
                    continue
                product_url = urljoin("https://www.anmako.sg", link_elem['href'])
                
                products.append(ScrapedCatalogueItem(
                    name=name,
                    price=price,
                    image_url=image_url,
                    product_url=product_url,
                    retailer=retailer
                ).dict())
                
            except Exception as e:
                log_error(retailer, f'Card {idx} error: {str(e)}', card_html)
                
    except Exception as e:
        log_error(retailer, f'File processing failed: {str(e)}')
        
    return products

# ------------------------- LXV Supply Scraper -------------------------
def scrape_lxvsupply(html_path: Path) -> list:
    products = []
    retailer = 'lxvsupply'
    
    try:
        with open(html_path, 'r', encoding='utf-8') as f:
            soup = BeautifulSoup(f, 'html.parser')
        
        product_cards = soup.select('li.grid__item')
        if not product_cards:
            log_error(retailer, 'No product cards found')
            return products
            
        for idx, card in enumerate(product_cards):
            card_html = str(card)[:200] + '...'
            try:
                # Name
                name_elem = card.select_one('.card__heading')
                if not name_elem:
                    log_error(retailer, f'Card {idx} missing name', card_html)
                    continue
                name = name_elem.get_text(strip=True)
                
                # Price
                price_elem = (card.select_one('.price-item--sale') or 
                             card.select_one('.price-item--regular'))
                if not price_elem:
                    log_error(retailer, f'Card {idx} missing price', card_html)
                    continue
                price = parse_price(price_elem.get_text(strip=True))
                if price is None:
                    continue
                
                # Image
                img_elem = card.select_one('img.motion-reduce')
                image_url = ''
                if img_elem:
                    if img_elem.get('srcset'):
                        srcset = img_elem['srcset'].split(',')
                        image_url = srcset[0].strip().split()[0]
                    else:
                        image_url = img_elem.get('src', '')
                
                # Product URL
                link_elem = card.select_one('a.full-unstyled-link')
                if not link_elem or not link_elem.get('href'):
                    log_error(retailer, f'Card {idx} missing product link', card_html)
                    continue
                product_url = urljoin("https://lxvsupply.co", link_elem['href'])
                
                products.append(ScrapedCatalogueItem(
                    name=name,
                    price=price,
                    image_url=image_url,
                    product_url=product_url,
                    retailer=retailer
                ).dict())
                
            except Exception as e:
                log_error(retailer, f'Card {idx} error: {str(e)}', card_html)
                
    except Exception as e:
        log_error(retailer, f'File processing failed: {str(e)}')
        
    return products

def main():
    retailers = {
        'on': scrape_on,
        'drmartens': scrape_drmartens,
        'birkenstock': scrape_birkenstock,
        'zara': scrape_zara,
        'anmako': scrape_anmako,
        'lxvsupply': scrape_lxvsupply
    }

    all_products = []
    
    # Process HTML files
    html_files = list(Path('html_files').glob('*.html'))
    print(f"Found {len(html_files)} HTML files to process")
    
    for html_file in html_files:
        file_stem = html_file.stem
        parts = file_stem.split('_')
        if not parts:
            continue
            
        retailer = parts[0]
        if retailer not in retailers:
            continue
            
        print(f"\nProcessing {html_file.name}...")
        try:
            products = retailers[retailer](html_file)
            all_products.extend(products)
            print(f"Found {len(products)} valid products")
        except Exception as e:
            log_error(retailer, f'File processing failed: {str(e)}')
            print(f"Error processing file: {str(e)}")

    # Deduplicate products
    unique_products = {}
    for prod in all_products:
        if prod['image_url'] and prod['image_url'] not in unique_products:
            unique_products[prod['image_url']] = prod
            
    # Save results
    output_file = 'products.json'
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(list(unique_products.values()), f, indent=2, ensure_ascii=False)
    print(f"\nSaved {len(unique_products)} products to {output_file}")

    # Save error log
    if error_log:
        error_file = 'scraper_errors.json'
        with open(error_file, 'w', encoding='utf-8') as f:
            json.dump(error_log, f, indent=2, ensure_ascii=False)
        print(f"Saved {len(error_log)} errors to {error_file}")
    else:
        print("\nNo errors encountered")

    # Non-SGD price report
    if non_sgd_prices:
        unique_non_sgd = list(set(non_sgd_prices))
        print(f"\nFound {len(unique_non_sgd)} unique non-SGD prices:")
        for price in unique_non_sgd:
            print(f" - {price}")

if __name__ == '__main__':
    main()

Found 13 HTML files to process

Processing drmartens_page7.html...
Found 20 valid products

Processing drmartens_page6.html...
Found 20 valid products

Processing birkenstock_page1.html...
Found 602 valid products

Processing drmartens_page1.html...
Found 20 valid products

Processing lxvsupply_page1.html...
Found 5 valid products

Processing drmartens_page3.html...
Found 20 valid products

Processing on_page1.html...
Found 96 valid products

Processing drmartens_page2.html...
Found 13 valid products

Processing drmartens_page5.html...
Found 20 valid products

Processing drmartens_page4.html...
Found 20 valid products

Processing zara_page1.html...
Found 3000 valid products

Processing anmako_page1.html...
Found 40 valid products

Processing drmartens_page8.html...
Found 20 valid products

Saved 3742 products to products.json
Saved 12 errors to scraper_errors.json

Found 8 unique non-SGD prices:
 - ₱ 30,900.00
 - ₱ 12,990.00
 - ₱ 9,990.00
 - ₱ 10,290.00
 - ₱ 1,099.00
 - ₱ 6,490.00
 - ₱

In [18]:
from pydantic import BaseModel
import json
from secretstuff.secret import OPENAI_API_KEY, OPENAI_ORG_ID, OPENAI_PROJ_ID
from services.mongodb import catalogue
from services.metadata import get_catalogue_metadata
from openai import OpenAI
from typing import Literal, List, Optional
import random
import sys
import os
import threading
from bson import ObjectId

parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
sys.path.append(parent_dir)


client = OpenAI(api_key=OPENAI_API_KEY, organization=OPENAI_ORG_ID, project=OPENAI_PROJ_ID)
bucket_count = get_catalogue_metadata().bucket_count

class AutoLabelledTags(BaseModel):
    category: str
    gender: str
    clothing_type: str
    color: str
    material: str
    other: list[str]


class ScrapedCatalogueItem(BaseModel):
    name: str
    price: float
    image_url: str
    product_url: str
    retailer: str


class CatalogueItem(BaseModel):
    name: str
    category: Literal['Tops', 'Bottoms', 'Shoes', 'Dresses', 'Outerwear', 'Accessories']
    clothing_type: str
    clothing_type_embed: List[float]
    color: str
    color_embed: List[float]
    material: str
    material_embed: List[float]
    other_tags: List[str]
    other_tags_embed: List[List[float]]
    price: float
    image_url: str
    product_url: str
    retailer: str
    gender: Literal['F', 'M', 'U']
    bucket_num: int = 1

def get_openai_tags(name: str, image_url: str) -> AutoLabelledTags:
    user_prompt = f"""Label clothing item with category amongst the categories. these categories are as such. PLEASE FOLLOW THE CASING AS WELL FOR CATEGORY. 'Tops', 'Bottoms', 'Shoes', 'Dresses', 'Outerwear', 'Accessories'.
    Also, label each item with with one of these genders ['M', 'F', 'U'] (btw U stands for Unisex). 
    Then, generate clothing type, color, material tags for this {name}.
    Then, under other_tags, label the item with 10 or so useful adjectives (eg. what occasion you would wear this with, what kind of fit it is). for other_tags, should all be lower case"""
    output = client.beta.chat.completions.parse(model="gpt-4o",
                                                messages=[
                                                    {"role": "user", "content": [
                                                        {"type": "image_url", "image_url": {"url": image_url, "detail": "low"}}]},
                                                    {"role": "user", "content": user_prompt},
                                                ],
                                                response_format=AutoLabelledTags
                                                )
    # Since default n = 1, we'll only always need to first element
    return json.loads(output.choices[0].message.content)


def get_openai_embedding(tags: json):
    clothing_type_embed = client.embeddings.create(
        input=tags['clothing_type'], model="text-embedding-3-large").data[0].embedding
    color_embed = client.embeddings.create(input=tags['color'], model="text-embedding-3-large").data[0].embedding
    material_embed = client.embeddings.create(input=tags['material'], model="text-embedding-3-large").data[0].embedding
    other_embed = []
    for o in tags['other']:
        other_embed.append(client.embeddings.create(input=o, model="text-embedding-3-large").data[0].embedding)
    return (clothing_type_embed, color_embed, material_embed, other_embed)


def add_catalogue_item(name: str, price: float, image_url: str, product_url: str, retailer: str):
    if catalogue.find_one({"image_url": image_url}) is not None:
        print(f"Image URL exist in DB {image_url}")
        return

    tags = get_openai_tags(name, image_url)
    embedding = get_openai_embedding(tags)
    catalogue_item = CatalogueItem(name=name, category=tags['category'], clothing_type=tags['clothing_type'], clothing_type_embed=embedding[0],
                                   color=tags['color'], color_embed=embedding[1], material=tags['material'], material_embed=embedding[2],
                                   other_tags=tags['other'], other_tags_embed=embedding[3],
                                   price=price, image_url=image_url, product_url=product_url, retailer=retailer, gender="U", bucket_num=random.randint(1, bucket_count))
    catalogue.insert_one(dict(catalogue_item))

Error with 100% LINEN BERMUDA SHORTS: E11000 duplicate key error collection: kagame.catalogue index: image_url_1 dup key: { image_url: "https://static.zara.net/assets/public/4260/2fea/d41e4775a4f3/534e390d0075/05070121800-e1/05070121800-e1.jpg?ts=1737112472776&w=418" }, full error: {'index': 0, 'code': 11000, 'errmsg': 'E11000 duplicate key error collection: kagame.catalogue index: image_url_1 dup key: { image_url: "https://static.zara.net/assets/public/4260/2fea/d41e4775a4f3/534e390d0075/05070121800-e1/05070121800-e1.jpg?ts=1737112472776&w=418" }', 'keyPattern': {'image_url': 1}, 'keyValue': {'image_url': 'https://static.zara.net/assets/public/4260/2fea/d41e4775a4f3/534e390d0075/05070121800-e1/05070121800-e1.jpg?ts=1737112472776&w=418'}}
Error with TEXTURED TERRY CARDIGAN: 1 validation error for CatalogueItem
category
  Input should be 'Tops', 'Bottoms', 'Shoes', 'Dresses', 'Outerwear' or 'Accessories' [type=literal_error, input_value='outerwear', input_type=str]
    For further infor

In [None]:
# Load product data from json
with open('products.json', 'r') as file:
    data = json.load(file)

# Remove duplicates based on 'image_url' key
unique_data = {item["image_url"]: item for item in data}.values()
data = list(unique_data)


def insert_data(data):
    for item in data:
        try:
            add_catalogue_item(name=item['name'], price=item['price'],
                               image_url=item['image_url'], product_url=item['product_url'], retailer=item['retailer'])
        except Exception as e:
            print(f"Error with {item['name']}: {e}")


number_of_threads = 20  # Having too many threads will exceed rate limit
for i in range(number_of_threads):
    thread = threading.Thread(target=insert_data, args=[
                              data[(i * len(data) // number_of_threads):((i+1)*len(data) // number_of_threads)]])
    thread.start()

Image URL exist in DB https://drmartens.com.sg/cdn/shop/files/31809001.80.jpg?v=1723673425
Image URL exist in DB https://www.birkenstock.com/dw/image/v2/BBBF_PRD/on/demandware.static/-/Sites-master-catalog/default/dwc3198a0e/128161/128161.jpg?sw=300
Image URL exist in DB https://www.birkenstock.com/dw/image/v2/BBBF_PRD/on/demandware.static/-/Sites-master-catalog/default/dwb451b726/1029134/1029134.jpg?sw=300
Image URL exist in DB https://static.zara.net/assets/public/2479/f56d/6a9147d9a31a/c31989deb391/08435417808-e1/08435417808-e1.jpg?ts=1741771537432&w=418
Image URL exist in DB https://images.ctfassets.net/hnk2vsx53n6l/4lB71t3Z45KwKdZTvVWX7B/e7255e9b37dc1e9c1a12639c3820ae11/7616e38d7c6a0102c6a76f9790cfb279028cac6d.png?fm=webp
Image URL exist in DB https://www.birkenstock.com/dw/image/v2/BBBF_PRD/on/demandware.static/-/Sites-master-catalog/default/dw5b1d4a3e/1026121/1026121.jpg?sw=300
Image URL exist in DB https://drmartens.com.sg/cdn/shop/files/31696357.80.jpg?v=1723673425
Image URL e