In [3]:
import os
import sys
import re
import time
import signal
import logging
import subprocess
from datetime import datetime
from dotenv import load_dotenv
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock

import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup
from supabase import create_client, Client
from tqdm import tqdm

In [6]:
# =====================
# CONFIGURATION
# =====================
# Shell command for the VPN step; only referenced from the commented-out VPN
# section of main(), so it is currently unused at runtime.
VPN_CONNECT_COMMAND = r'cd "C:\Program Files\NordVPN" && nordvpn -c -g "Netherlands"'
# Timeout for the VPN command (also only used by the commented-out VPN step).
VPN_TIMEOUT = 30
# Passed as `timeout` to requests.get() in safe_request().
REQUEST_TIMEOUT = 10
# Number of attempts safe_request() makes before giving up on a URL.
RETRY_LIMIT = 3
# Number of result pages main() requests for each price/mileage bucket.
PAGE_LIMIT_AUTOSCOUT = 20
# Number of queued rows that triggers an upsert in main().
BATCH_SIZE = 500
# NOTE(review): not referenced anywhere in the visible code.
REFRESH_RATE_DB = 20
MAX_RUNTIME_SECONDS = 12 * 3600  # 12 hours safety limit, compared to elapsed time in main()
MAX_WORKERS = 50  # concurrent threads for fetching (ThreadPoolExecutor max_workers)
MAX_REQUESTS_PER_MIN = 100  # rate limiter: max requests per 60-second window in rate_limit()
# URL that fetch_page() requests; search filters are sent as query parameters.
BASE_URL = "https://www.autoscout24.nl/lst"
# Four digits followed by two uppercase letters; extract_car_data() validates
# the normalised postcode against this via is_valid_format().
POSTCODE_PATTERN = r'^\d{4}[A-Z]{2}$'


# =====================
# GLOBALS
# =====================
# Set to True by handle_sigint(); polled by safe_request(), fetch_page() and main().
stop_requested = False
# time.time() stamps of recent requests, maintained by rate_limit().
last_request_times = []
# Guards last_request_times inside rate_limit().
rate_lock = Lock()


# =====================
# GRACEFUL SHUTDOWN
# =====================
def handle_sigint(sig, frame):
    """Signal handler that asks the scraper to stop.

    Raises the module-level ``stop_requested`` flag and logs a warning; the
    ``sig`` and ``frame`` arguments supplied by the signal machinery are unused.
    """
    global stop_requested
    stop_requested = True
    logging.warning("Shutdown signal received. Finishing current batch and exiting...")


# Install handle_sigint as the SIGINT handler as soon as this cell/module runs.
# NOTE(review): this replaces whatever SIGINT handler was active before and it
# is never restored, so in a notebook kernel an interrupt will presumably only
# set stop_requested instead of raising KeyboardInterrupt -- confirm intended.
signal.signal(signal.SIGINT, handle_sigint)


# =====================
# HELPERS
# =====================
def is_valid_format(s, pattern):
    """Return True when ``pattern`` matches the whole of string ``s``, else False."""
    whole_match = re.fullmatch(pattern, s)
    return whole_match is not None


def rate_limit():
    """Block until a request may be sent without exceeding MAX_REQUESTS_PER_MIN.

    Keeps a sliding 60-second window of request timestamps in the module-level
    ``last_request_times`` list. When the window is full, the caller sleeps
    until the oldest entry has aged out, then records its own timestamp.

    The sleep happens while ``rate_lock`` is held on purpose: other threads
    queue up behind the sleeper instead of all sleeping and waking together.
    """
    global last_request_times
    with rate_lock:
        while True:
            # Re-read the clock on every pass; pruning with a timestamp taken
            # before sleeping would never expire anything.
            now = time.time()
            last_request_times = [t for t in last_request_times if now - t < 60]

            if len(last_request_times) < MAX_REQUESTS_PER_MIN:
                break

            # Wait until the oldest request in the window is 60 seconds old.
            sleep_time = max(0.0, 60 - (now - last_request_times[0]))
            logging.info(f"Rate limit reached. Sleeping for {sleep_time:.2f} seconds...")
            time.sleep(sleep_time)

        last_request_times.append(now)


def safe_request(url, params=None):
    """Perform a rate-limited HTTP GET with timeout and a bounded number of retries.

    Args:
        url: Address to fetch.
        params: Optional query parameters forwarded to ``requests.get``.

    Returns:
        The response body as text, or ``None`` when a shutdown was requested
        or every one of the RETRY_LIMIT attempts failed.
    """
    for attempt in range(RETRY_LIMIT):
        if stop_requested:
            return None
        try:
            rate_limit()
            response = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
            response.raise_for_status()
            return response.text
        # RequestException is the base class of requests' own errors
        # (Timeout, ConnectionError, HTTPError, ...), so one clause suffices.
        except requests.exceptions.RequestException as e:
            logging.warning(f"Request attempt {attempt + 1} failed: {e}")
            # Pause only when another attempt will follow; sleeping after the
            # final failure just delays the caller.
            if attempt + 1 < RETRY_LIMIT:
                time.sleep(1)
    logging.error(f"Failed to fetch data after {RETRY_LIMIT} attempts. Skipping...")
    return None


def run_command_with_timeout(command, timeout):
    """Run ``command`` through the shell, capturing its output as text.

    Returns the ``subprocess.CompletedProcess`` on completion, or ``None``
    (after logging an error) when the command exceeds ``timeout`` seconds.
    """
    try:
        completed = subprocess.run(
            command,
            timeout=timeout,
            shell=True,
            capture_output=True,
            text=True,
        )
    except subprocess.TimeoutExpired:
        logging.error(f"Command timed out after {timeout} seconds.")
        return None
    else:
        return completed


# =====================
# DATA EXTRACTION
# =====================
def extract_car_data(car, postcode_pattern):
    """Turn one listing element into a flat dict of advert fields.

    Args:
        car: Listing element exposing ``get(attribute)`` and
            ``find(tag, attrs)`` (here: the ``<article>`` tags found by
            fetch_page).
        postcode_pattern: Regex the normalised postcode must fully match.

    Returns:
        A dict with the advert fields, or ``None`` when the element has no
        ``id`` or any error occurs while extracting (the error is logged).
        Missing mileage/price are reported as ``-1.0``; a postcode that does
        not match ``postcode_pattern`` is reported as ``None``.
    """

    def _detail_text(test_id):
        # Stripped text of the <span data-testid=...> detail, or None if absent.
        node = car.find("span", {"data-testid": test_id})
        return node.get_text(strip=True) if node else None

    try:
        listing_id = car.get("id")
        if not listing_id:
            return None

        mileage = float(car.get("data-mileage") or -1)
        price = float(car.get("data-price") or -1)

        # Normalise the postcode to first four characters + last two, upper-cased.
        zip_raw = car.get("data-listing-zip-code")
        zip_clean = None
        if zip_raw:
            candidate = zip_raw[0:4] + zip_raw[-2:].upper()
            if is_valid_format(candidate, postcode_pattern):
                zip_clean = candidate

        transmission = _detail_text("VehicleDetails-transmission")
        fuel = _detail_text("VehicleDetails-gas_pump")
        power = _detail_text("VehicleDetails-speedometer")

        # Power text looks like "<n> kW (<m> PK)"; pull out both numbers.
        kw = pk = None
        power_match = re.search(r"(\d+)\s*kW.*\((\d+)\s*PK\)", power) if power else None
        if power_match:
            kw, pk = (float(number) for number in power_match.groups())

        record = {
            "car_id": listing_id,
            "make": car.get("data-make"),
            "model": car.get("data-model"),
            "first_registration": car.get("data-first-registration"),
            "fuel_type": car.get("data-fuel-type"),
            "mileage": mileage,
            "post_code_raw": zip_raw,
            "post_code": zip_clean,
            "listing_price": price,
            "transmission": transmission,
            "fuel_text": fuel,
            "power_text": power,
            "power_kw": kw,
            "power_pk": pk,
        }
        return record
    except Exception as e:
        logging.error(f"Error extracting car data: {e}")
        return None


def fetch_page(params):
    """Download one AutoScout result page and return its parsed listings.

    Returns a list of dicts as produced by extract_car_data(); the list is
    empty when a shutdown was requested, the download failed, or no listing
    on the page could be parsed.
    """
    if stop_requested:
        return []

    html = safe_request(BASE_URL, params=params)
    if not html:
        return []

    articles = BeautifulSoup(html, "html.parser").find_all(
        "article", class_="cldt-summary-full-item"
    )
    # Parse lazily and keep only listings that yielded a record.
    parsed = (extract_car_data(article, POSTCODE_PATTERN) for article in articles)
    return [record for record in parsed if record]


# =====================
# MAIN SCRIPT
# =====================
def _build_search_params(price_from, price_to, km_from, km_to, page):
    """Build the query parameters for one result page of one price/mileage bucket."""
    return {
        "atype": "C",
        "cy": "NL",
        "damaged_listing": "exclude",
        "desc": "1",
        "powertype": "kw",
        "sort": "age",
        "source": "homepage_search-mask",
        "ustate": "N,U",
        "pricefrom": int(price_from),
        "priceto": int(price_to),
        "kmfrom": int(km_from),
        "kmto": int(km_to),
        "page": page,
    }


def main():
    """Scrape AutoScout24 listings concurrently and upsert new ones into Supabase.

    Steps:
      1. Configure file logging and read Supabase credentials from the environment.
      2. Load the car IDs already stored in the target table.
      3. Submit one fetch_page() task per (price bucket, mileage bucket, page).
      4. As pages complete, queue listings whose ID has not been seen yet and
         upsert them in batches of BATCH_SIZE.
      5. Flush whatever is left in the queue.

    Stops early when a shutdown is requested (SIGINT) or MAX_RUNTIME_SECONDS
    has elapsed. Exits the process via ``sys.exit(1)`` when the credentials are
    missing or the initial ID query fails.
    """
    global stop_requested
    # The flag survives in the kernel after an interrupted run; clear it so a
    # later call to main() does not return immediately without doing anything.
    stop_requested = False

    # Initialize logging (create the target directory if it is missing,
    # otherwise basicConfig fails when it opens the log file).
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    log_path = f"../logging/script_log_{timestamp}.log"
    os.makedirs(os.path.dirname(log_path), exist_ok=True)
    logging.basicConfig(filename=log_path, level=logging.INFO,
                        format='%(asctime)s - %(levelname)s - %(message)s')
    logging.info('Script started with concurrency and rate limiting.')

    start_time = time.time()

    # Enable VPN
    # logging.info("Connecting to VPN...")
    # vpn_result = run_command_with_timeout(VPN_CONNECT_COMMAND, VPN_TIMEOUT)
    # if vpn_result and vpn_result.returncode == 0:
    #     logging.info("VPN connection established successfully.")
    # else:
    #     logging.error("Failed to establish VPN connection.")
    #     sys.exit(1)

    # Load environment variables
    load_dotenv()
    supabase_url = os.getenv("SUPABASE_URL")
    supabase_key = os.getenv("SUPABASE_KEY")
    if not supabase_url or not supabase_key:
        logging.error("Supabase credentials missing in .env file.")
        sys.exit(1)

    supabase: Client = create_client(supabase_url, supabase_key)
    table_name = "autoscout_car_adverts"

    # Fetch initial car IDs.
    # NOTE(review): this is a single un-paginated select; if the API caps the
    # number of rows per response the set may be incomplete. The upserts below
    # use ignore_duplicates=True, so an incomplete set costs only bandwidth.
    try:
        response = supabase.table(table_name).select("car_id").execute()
        car_ids_in_database = {d['car_id'] for d in response.data}
        logging.info(f"Found {len(car_ids_in_database)} existing car IDs.")
    except Exception as e:
        logging.error(f"Failed to fetch initial data from Supabase: {e}")
        sys.exit(1)

    # Define price and mileage ranges (consecutive values form one bucket).
    price_vec = np.array([0, 500, 1000])
    km_vec = np.array([0, 1000, 5000])

    cars_to_insert = []
    count_added = 0

    # =====================
    # MAIN LOOPS WITH CONCURRENCY
    # =====================
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_params = {}

        for price_from, price_to in zip(price_vec[:-1], price_vec[1:]):
            if stop_requested:
                break
            for km_from, km_to in zip(km_vec[:-1], km_vec[1:]):
                if stop_requested:
                    break
                for page in range(1, PAGE_LIMIT_AUTOSCOUT + 1):
                    params = _build_search_params(price_from, price_to, km_from, km_to, page)
                    future_to_params[executor.submit(fetch_page, params)] = params

        for future in tqdm(as_completed(future_to_params), total=len(future_to_params),
                           desc="Processing Tasks"):
            # Submitting tasks is instantaneous, so the runtime limit has to be
            # enforced here, while results are being collected.
            if not stop_requested and time.time() - start_time > MAX_RUNTIME_SECONDS:
                logging.warning("Maximum runtime exceeded. Finishing current batch and exiting...")
                stop_requested = True  # also makes running workers bail out

            if stop_requested:
                # Drop tasks that have not started so the executor can shut down quickly.
                for pending in future_to_params:
                    pending.cancel()
                break

            try:
                page_results = future.result()
            except Exception as e:
                logging.error(f"Error processing future: {e}")
                continue

            # Results are consumed on this thread only, so no lock is needed.
            for car in page_results or []:
                car_id = car["car_id"]
                # Skip IDs already stored or already queued during this run.
                if car_id in car_ids_in_database:
                    continue
                car_ids_in_database.add(car_id)
                cars_to_insert.append(car)

            if len(cars_to_insert) >= BATCH_SIZE:
                try:
                    supabase.table(table_name).upsert(cars_to_insert, ignore_duplicates=True).execute()
                except Exception as e:
                    # Keep the queue; it is retried together with the next page.
                    logging.error(f"Error upserting batch: {e}")
                    continue
                count_added += len(cars_to_insert)
                logging.info(f"Inserted {count_added} cars so far...")
                cars_to_insert = []

    # Flush remaining cars (same duplicate handling as the batched upserts).
    if cars_to_insert:
        try:
            supabase.table(table_name).upsert(cars_to_insert, ignore_duplicates=True).execute()
            count_added += len(cars_to_insert)
        except Exception as e:
            logging.error(f"Error upserting final batch of {len(cars_to_insert)} cars: {e}")

    logging.info(f"Script finished. Total cars added: {count_added}")

In [7]:
# Entry point: run the scraper when this cell/script is executed directly
# (as opposed to being imported as a module).
if __name__ == '__main__':
    main()

Processing Tasks: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 80/80 [00:02<00:00, 36.56it/s]


In [35]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from tqdm import tqdm

def task(n):
    """Dummy workload: sleep for ``n`` seconds, then hand ``n`` back unchanged."""
    delay = n
    time.sleep(delay)
    return delay

tasks = [1, 2, 3, 4, 5]
completed_count = 0
future_to_params = {}

with ThreadPoolExecutor(max_workers=10) as executor:
    # Each duration is submitted 2 x 2 = 4 times, giving 20 futures in total.
    for t in tasks:
        for i in range(2):
            for j in range(2):
                future_to_params[executor.submit(task, t)] = t

    # Consume futures in completion order while tqdm tracks progress.
    progress = tqdm(as_completed(future_to_params), total=len(future_to_params))
    for future in progress:
        result = future.result()
        completed_count += 1

100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 20/20 [00:07<00:00,  2.50it/s]
