# Chrono24 Watch Scraper with Selenium

This notebook contains a complete Selenium-based web scraper for extracting watch data from Chrono24.com. The scraper bypasses anti-bot protection by using real Chrome browser.

In [24]:
import time
import json
import logging
import pandas as pd
from typing import Dict, List, Optional
from dataclasses import dataclass

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager

from bs4 import BeautifulSoup

print("All libraries imported successfully!")

All libraries imported successfully!


In [25]:
from dataclasses import dataclass
from typing import Dict

@dataclass
class WatchItem:
    watch_id: str = ""
    watch_title: str = ""
    watch_price: str = ""
    watch_details: Dict = None
    
    def __post_init__(self):
        if self.watch_details is None:
            self.watch_details = {}
    
    def to_dict(self) -> Dict:
        return {
            "watch_id": self.watch_id,
            "watch_title": self.watch_title,
            "watch_price": self.watch_price,
            "watch_details": self.watch_details
        }
    
    def to_flat_dict(self) -> Dict:
        flat_dict = {
            "watch_id": self.watch_id,
            "watch_title": self.watch_title,
            "watch_price": self.watch_price
        }
        
        if self.watch_details:
            for section, details in self.watch_details.items():
                if isinstance(details, dict):
                    for key, value in details.items():
                        column_name = f"{section}_{key}".replace(" ", "_").replace("/", "_")
                        flat_dict[column_name] = value
        
        return flat_dict


In [26]:
class SeleniumWebDriver:
    def __init__(self, headless: bool = True):
        self.driver = None
        self.headless = headless
        self.logger = logging.getLogger(__name__)
        
    def setup_driver(self):
        chrome_options = Options()
        
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        chrome_options.add_argument("--disable-extensions")
        chrome_options.add_argument("--disable-plugins")
        chrome_options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36")
        
        if self.headless:
            chrome_options.add_argument("--headless")
        
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option('useAutomationExtension', False)
        
        chrome_options.add_experimental_option("prefs", {
            "profile.default_content_setting_values.notifications": 2
        })

        service = Service(ChromeDriverManager().install())
        self.driver = webdriver.Chrome(service=service, options=chrome_options)
        
        self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
        
        return self.driver
    
    def close_driver(self):
        if self.driver:
            self.driver.quit()

web_driver = SeleniumWebDriver(headless=True)

In [27]:
class Chrono24Scraper:
    def __init__(self, web_driver_manager: SeleniumWebDriver):
        self.web_driver_manager = web_driver_manager
        self.driver = None
        self.base_url = "https://www.chrono24.com"
        self.scraped_items = []
        self.logger = logging.getLogger(__name__)
        self.pages_scraped = 0
        self.max_pages = None
        
    def start_scraping(self, start_url: str, max_pages: int = None):
        self.max_pages = max_pages
        self.pages_scraped = 0
        self.driver = self.web_driver_manager.setup_driver()
        print(f"Starting scraper with max_pages: {max_pages if max_pages else 'unlimited'}")
        
        self.scrape_pages_by_url_construction(start_url)
        
    def scrape_pages_by_url_construction(self, base_url: str):
        page_num = 1
        
        while True:
            if self.max_pages and page_num > self.max_pages:
                print(f"Reached maximum pages limit: {self.max_pages}")
                break
                
            if page_num == 1:
                current_url = base_url
            else:
                if "index.htm" in base_url:
                    current_url = base_url.replace("index.htm", f"index-{page_num}.htm")
                else:
                    current_url = f"{base_url.rstrip('/')}/index-{page_num}.htm"
            
            print(f"Loading page {page_num}: {current_url}")
            
            if not self.scrape_single_page(current_url, page_num):
                print(f"Page {page_num} not found or empty, stopping pagination")
                break
                
            page_num += 1
            time.sleep(3)
    
    def scrape_single_page(self, url: str, page_num: int):
        try:
            self.driver.get(url)
            time.sleep(3)
            
            try:
                WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, ".article-item-container"))
                )
            except TimeoutException:
                print(f"No watch listings found on page {page_num}")
                return False
            
            soup = BeautifulSoup(self.driver.page_source, 'html.parser')
            
            watch_links = soup.select('.article-item-container > a')
            
            if not watch_links:
                print(f"No watch links found on page {page_num}")
                return False
                
            self.pages_scraped += 1
            print(f"Found {len(watch_links)} watch listings on page {page_num}")
            
            for i, link in enumerate(watch_links, 1):
                href = link.get('href')
                if href:
                    watch_url = self.base_url + href
                    print(f"Processing watch {i}/{len(watch_links)} on page {page_num}")
                    self.scrape_watch_detail(watch_url)
                    time.sleep(1)
            
            print(f"Completed page {page_num}. Total watches so far: {len(self.scraped_items)}")
            return True
            
        except Exception as e:
            print(f"Error scraping page {page_num}: {e}")
            return False
    
    def scrape_watch_detail(self, url: str):
        try:
            self.driver.get(url)
            time.sleep(2)
            
            soup = BeautifulSoup(self.driver.page_source, 'html.parser')
            
            watch_item = WatchItem()
            
            watch_id_element = soup.select_one(".wt-share-offer")
            if watch_id_element:
                watch_item.watch_id = watch_id_element.get('data-watch-id', '')
            
            title_element = soup.select_one("h1.h3")
            if title_element:
                watch_item.watch_title = title_element.get_text(strip=True)
            
            price_element = soup.select_one(".js-price-shipping-country")
            if price_element:
                watch_item.watch_price = price_element.get_text(strip=True)
            
            watch_item.watch_details = self.extract_watch_details(soup)
            
            self.scraped_items.append(watch_item)
            
        except Exception as e:
            print(f"Error scraping watch detail: {e}")
    
    def extract_watch_details(self, soup: BeautifulSoup) -> Dict:
        watch_data = {}
        section_name = None
        
        try:
            table = soup.select_one('table')
            if not table:
                return watch_data
            
            rows = table.select('tr')
            for row in rows:
                section_header = row.select_one('td[colspan] h3')
                if section_header:
                    section_name = section_header.get_text(strip=True)
                    watch_data[section_name] = {}
                elif section_name:
                    cells = row.select('td')
                    if len(cells) >= 2:
                        key_element = cells[0].select_one('strong')
                        value_element = cells[1]
                        
                        if key_element and value_element:
                            key = key_element.get_text(strip=True)
                            value = value_element.get_text(strip=True)
                            if key and value:
                                watch_data[section_name][key] = value
                                
        except Exception as e:
            print(f"Error extracting watch details: {e}")
            
        return watch_data
    
    def save_to_json(self, filename: str = "chrono24_watches.json"):
        try:
            data = [item.to_dict() for item in self.scraped_items]
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            return filename
        except Exception as e:
            print(f"Error saving to JSON: {e}")
            return None
    
    def save_to_csv(self, filename: str = "chrono24_watches.csv"):
        try:
            flat_data = [item.to_flat_dict() for item in self.scraped_items]
            df = pd.DataFrame(flat_data)
            df.to_csv(filename, index=False, encoding='utf-8')
            return filename
        except Exception as e:
            print(f"Error saving to CSV: {e}")
            return None
    
    def get_progress_info(self):
        return {
            "pages_scraped": self.pages_scraped,
            "max_pages": self.max_pages,
            "total_watches": len(self.scraped_items),
            "avg_watches_per_page": len(self.scraped_items) / self.pages_scraped if self.pages_scraped > 0 else 0
        }
    
    def close(self):
        self.web_driver_manager.close_driver()

scraper = Chrono24Scraper(web_driver)
print("Chrono24 scraper started")

Chrono24 scraper started


## Configuration and Usage

Now you can run the scraper with different configurations. The scraper will:

1. **Load the target page** using Selenium Chrome
2. **Extract watch listings** from the category page
3. **Follow each watch link** to get detailed specifications
4. **Parse structured data** from the specifications table
5. **Handle pagination** automatically
6. **Save results** to CSV and JSON file



In [28]:
start_url = "https://www.chrono24.com/rolex/index.htm"
max_pages = 3

print(f"Starting scraper for {max_pages} pages")
print(f"Target URL: {start_url}")

try:
    scraper.start_scraping(start_url, max_pages=max_pages)
    
    progress = scraper.get_progress_info()
    print(f"\nScraping Summary:")
    print(f"Pages scraped: {progress['pages_scraped']}")
    print(f"Total watches: {progress['total_watches']}")
    print(f"Average watches per page: {progress['avg_watches_per_page']:.1f}")
    
    json_file = scraper.save_to_json("scraped_watches.json")
    csv_file = scraper.save_to_csv("scraped_watches.csv")
    
    if json_file:
        print(f"JSON saved to: {json_file}")
    if csv_file:
        print(f"CSV saved to: {csv_file}")
        
except Exception as e:
    print(f"Scraping failed: {e}")
finally:
    scraper.close()

Starting scraper for 3 pages
Target URL: https://www.chrono24.com/rolex/index.htm
Starting scraper with max_pages: 3
Loading page 1: https://www.chrono24.com/rolex/index.htm
Starting scraper with max_pages: 3
Loading page 1: https://www.chrono24.com/rolex/index.htm
Found 60 watch listings on page 1
Processing watch 1/60 on page 1
Found 60 watch listings on page 1
Processing watch 1/60 on page 1
Processing watch 2/60 on page 1
Processing watch 2/60 on page 1
Processing watch 3/60 on page 1
Processing watch 3/60 on page 1
Processing watch 4/60 on page 1
Processing watch 4/60 on page 1
Processing watch 5/60 on page 1
Processing watch 5/60 on page 1
Processing watch 6/60 on page 1
Processing watch 6/60 on page 1
Processing watch 7/60 on page 1
Processing watch 7/60 on page 1
Processing watch 8/60 on page 1
Processing watch 8/60 on page 1
Processing watch 9/60 on page 1
Processing watch 9/60 on page 1
Processing watch 10/60 on page 1
Processing watch 10/60 on page 1
Processing watch 11/60 o

## Data Analysis and Visualization

After scraping, you can analyze the collected data:

In [29]:
import pandas as pd
import matplotlib.pyplot as plt
import re

def analyze_scraped_data(scraped_items):
    if not scraped_items:
        print("No data to analyze")
        return
    
    data = []
    for item in scraped_items:
        row = {
            'watch_id': item.watch_id,
            'title': item.watch_title,
            'price': item.watch_price,
        }
        
        if item.watch_details:
            for section, details in item.watch_details.items():
                if isinstance(details, dict):
                    for key, value in details.items():
                        row[f"{section}_{key}"] = value
        
        data.append(row)
    
    df = pd.DataFrame(data)
    
    print(f"Total watches: {len(df)}")
    print(f"Unique watch IDs: {df['watch_id'].nunique()}")
    
    prices = []
    for price in df['price'].dropna():
        numbers = re.findall(r'[\d,]+', str(price).replace(',', ''))
        if numbers:
            try:
                prices.append(int(numbers[0]))
            except:
                pass
    
    if prices:
        print(f"Price statistics:")
        print(f"  Min price: ${min(prices):,}")
        print(f"  Max price: ${max(prices):,}")
        print(f"  Average price: ${sum(prices)/len(prices):,.0f}")
        print(f"  Median price: ${sorted(prices)[len(prices)//2]:,}")
    
    print("Most common case materials:")
    case_materials = df.filter(regex='.*[Cc]ase material.*').stack().value_counts().head()
    for material, count in case_materials.items():
        print(f"  {material}: {count}")
    
    print("Most common movements:")
    movements = df.filter(regex='.*[Mm]ovement.*').stack().value_counts().head()
    for movement, count in movements.items():
        print(f"  {movement}: {count}")
    
    return df

if hasattr(scraper, 'scraped_items') and scraper.scraped_items:
    df = analyze_scraped_data(scraper.scraped_items)
    print("Analysis completed")
else:
    print("No scraped data available. Run the scraper first.")

Total watches: 180
Unique watch IDs: 180
Price statistics:
  Min price: $3,123
  Max price: $132,186
  Average price: $21,511
  Median price: $14,295
Most common case materials:
  Steel: 190
  Gold/Steel: 80
  Rose gold: 26
  White gold: 24
  Yellow gold: 24
Most common movements:
  Automatic: 358
  3235: 39
  3135: 22
  3255: 11
  4130: 11
Analysis completed


## Utility Functions

Additional helper functions for data manipulation and analysis:

In [30]:
def load_scraped_data(filename: str):
    try:
        with open(filename, 'r', encoding='utf-8') as f:
            data = json.load(f)
        print(f"Loaded {len(data)} watches from {filename}")
        return data
    except FileNotFoundError:
        print(f"File {filename} not found")
        return []
    except json.JSONDecodeError:
        print(f"Invalid JSON in {filename}")
        return []

def merge_scraped_data(*filenames):
    all_data = []
    for filename in filenames:
        data = load_scraped_data(filename)
        all_data.extend(data)
    
    print(f"Merged {len(all_data)} total watches from {len(filenames)} files")
    return all_data

def filter_watches_by_price(data, min_price=None, max_price=None):
    filtered = []
    for watch in data:
        price_str = watch.get('watch_price', '')
        numbers = re.findall(r'[\d,]+', price_str.replace(',', ''))
        if numbers:
            try:
                price = int(numbers[0])
                if (min_price is None or price >= min_price) and \
                   (max_price is None or price <= max_price):
                    filtered.append(watch)
            except:
                pass
    
    print(f"Filtered to {len(filtered)} watches within price range")
    return filtered

def search_watches(data, search_term):
    search_term = search_term.lower()
    results = []
    
    for watch in data:
        if search_term in watch.get('watch_title', '').lower():
            results.append(watch)
            continue
        
        details = watch.get('watch_details', {})
        for section, values in details.items():
            if isinstance(values, dict):
                for key, value in values.items():
                    if search_term in value.lower():
                        results.append(watch)
                        break
                if watch in results:
                    break
    
    print(f"Found {len(results)} watches matching '{search_term}'")
    return results

def convert_json_to_csv(json_filename, csv_filename):
    data = load_scraped_data(json_filename)
    if data:
        flat_data = []
        for watch in data:
            flat_dict = {
                "watch_id": watch.get("watch_id", ""),
                "watch_title": watch.get("watch_title", ""),
                "watch_price": watch.get("watch_price", "")
            }
            
            watch_details = watch.get("watch_details", {})
            if watch_details:
                for section, details in watch_details.items():
                    if isinstance(details, dict):
                        for key, value in details.items():
                            column_name = f"{section}_{key}".replace(" ", "_").replace("/", "_")
                            flat_dict[column_name] = value
            
            flat_data.append(flat_dict)
        
        df = pd.DataFrame(flat_data)
        df.to_csv(csv_filename, index=False, encoding='utf-8')
        print(f"Converted {json_filename} to {csv_filename}")
        return csv_filename
    return None
