In [None]:
# SimilarWeb API Data Extractor - Prototype V11
# ---------------------------------------------
# Author: Hicham Yezza
# Date: March 2025
# Description:
# A script for extracting traffic data from the SimilarWeb API

# WARNING: Do not use for BBCM editorial purposes unless in conjunction with the Data Hub team - please do not share the API key with anyone

# Usage:
# 1- Enter your SimilarWeb API key when asked
# 2- Either upload a file with website URLs (CSV or xlsx) or type in a comma-separated list of websites (e.g. bbc.co.uk, Sana.sy)
# 3- Pick the sheet and column you need
# 4- Define start and end date for the data (please pay attention to the formatting guidance)
# 5- Choose the granularity needed (i.e. how detailed the data should be -- daily, weekly, monthly)
# 6- Final data is saved as a CSV into your default folder
# 7- Open csv in Excel as usual
# use pivot tables to organise/navigate

In [None]:
"""
Key updates in V11:
- Added numbered selection for sheet/column
- Added granularity selection (daily, weekly, monthly)
- Improved error logging to address recurring API issues (401, 404, 429, 500)
- Fixed domain extraction (was causing a few odd issues before)
- Better rate limit handling - using adaptive backoff (for now seems to avoid previous issues)
- Using urlparse for handling URL format issues (still not 100% robust)
"""

In [None]:
# Dependencies (if not already installed)
!pip install pandas requests tqdm

In [None]:
# Imports
import os
import requests
import pandas as pd
import logging
import time
import random
from tqdm import tqdm
from urllib.parse import urlparse
from datetime import datetime
from google.colab import files
from urllib.parse import urlparse

In [None]:
# Logging (need to decide how to store/surface long term)
# force=True removes any handlers already attached to the root logger first.
# Without it basicConfig() does nothing when the root logger is already
# configured (as hosted notebook runtimes may do), and similarweb_log.txt
# would never be written.
logging.basicConfig(
    filename="similarweb_log.txt",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    force=True,
)

In [None]:
# Get API key from user
# getpass hides the typed value, so the key is not echoed into the cell output
# (which would otherwise be stored with the notebook and shared along with it).
from getpass import getpass

API_KEY = getpass("Enter your SimilarWeb API key: ").strip()
if not API_KEY:
    raise ValueError("API key is required.")

In [None]:
# Upload file or enter URLs manually
# Result of this cell: `websites`, a list of raw URL/domain strings.
choice = input("Do you want to upload a file with URLs? (yes/no): ").strip().lower()

if choice in ('yes', 'y'):
    # Upload file containing website list
    print("Please upload your file.")
    uploaded = files.upload()
    if not uploaded:
        # The upload dialog was cancelled, so there is no file name to read.
        raise ValueError("No file was uploaded.")
    file_path = next(iter(uploaded))
    file_name_lower = file_path.lower()  # so .XLSX / .CSV are recognised too

    # Load websites from uploaded file
    if file_name_lower.endswith('.xlsx'):
        # The context manager closes the workbook once the sheet has been read.
        with pd.ExcelFile(file_path) as xls:
            print("\nAvailable sheets:")
            for idx, sheet in enumerate(xls.sheet_names):
                print(f"{idx+1}: {sheet}")

            # Sheets are listed 1-based for the user; convert back to 0-based.
            while True:
                try:
                    sheet_idx = int(input("Pick sheet number: ").strip()) - 1
                    if 0 <= sheet_idx < len(xls.sheet_names):
                        sheet_name = xls.sheet_names[sheet_idx]
                        break
                    print("Invalid choice. Try again.")
                except ValueError:
                    print("Enter a number.")

            df_websites = pd.read_excel(xls, sheet_name=sheet_name)
    elif file_name_lower.endswith('.csv'):
        # CSV files have no sheets, so go straight to column selection.
        df_websites = pd.read_csv(file_path)
    else:
        raise ValueError(
            f"Unsupported file type: {file_path}. Please upload a .csv or .xlsx file."
        )

    if len(df_websites.columns) == 0:
        # Without this guard the column prompt below could never be satisfied.
        raise ValueError("The uploaded file has no columns to choose from.")

    # Pick column containing URLs
    print("\nColumns available:")
    for idx, col in enumerate(df_websites.columns):
        print(f"{idx+1}: {col}")

    while True:
        try:
            column_idx = int(input("Enter column number for URLs: ").strip()) - 1
            if 0 <= column_idx < len(df_websites.columns):
                column_name = df_websites.columns[column_idx]
                break
            print("Invalid selection, try again.")
        except ValueError:
            print("Enter a number.")

    # Empty cells are dropped; everything else is treated as text.
    websites = df_websites[column_name].dropna().astype(str).tolist()

else:
    # Direct input by user as comma-separated values
    websites_input = input("Enter website URLs separated by commas: ").strip()
    websites = [w.strip() for w in websites_input.split(",") if w.strip()]

In [None]:
# Extract domain names (handles missing http/https, removes www.)
def extract_domain(url):
    """Return the bare, lower-cased host name of a URL or domain string.

    A missing scheme is tolerated (``http://`` is assumed), and any port,
    credentials, path or query string are discarded. A single leading
    ``www.`` label is removed.

    Args:
        url: A URL or domain as text, e.g. ``"https://www.bbc.co.uk/news"``
            or ``"bbc.co.uk"``.

    Returns:
        The host name (e.g. ``"bbc.co.uk"``), or ``None`` when the input is
        empty, has no host part, or cannot be processed.
    """
    try:
        url = url.strip()  # Remove leading/trailing spaces

        if not url:
            return None  # Ignore empty values

        # Case-insensitive check so "HTTPS://..." is not given a second scheme.
        if not url.lower().startswith(("http://", "https://")):
            url = "http://" + url  # Default to http if missing

        # .hostname (unlike .netloc) is lower-cased and excludes any
        # "user:pass@" prefix and ":port" suffix.
        host = urlparse(url).hostname

        if not host:
            return None  # Ensure non-empty domain

        host = host.rstrip(".")  # "example.com." and "example.com" are the same host

        # Strip "www." only as a leading label; a plain str.replace would also
        # mangle hosts such as "awww.example.com". The "." check keeps a host
        # like "www.com" intact instead of reducing it to "com".
        if host.startswith("www.") and "." in host[len("www."):]:
            host = host[len("www."):]

        return host or None
    except Exception as e:
        # Best effort: one bad row must not abort the whole list.
        print(f"Error processing URL {url}: {e}")
        return None

# Process user-provided URLs
# dict.fromkeys() removes duplicates while keeping the order the domains were
# supplied in, so the export follows the user's list; filter(None, ...) drops
# entries extract_domain() could not turn into a domain.
websites = list(dict.fromkeys(filter(None, map(extract_domain, websites))))

if not websites:
    # Raise (as the API-key cell does) rather than calling exit(): in a
    # notebook exit() acts on the kernel instead of simply failing this cell.
    raise ValueError("No valid domains provided.")

print("Filtered domains:", websites)

In [None]:
# Date and granularity selection
def validate_date(date_str, date_format):
    """Tell whether ``date_str`` can be parsed using ``date_format``.

    Returns True if ``datetime.strptime`` accepts the pair, and False if it
    rejects it with ``ValueError``.
    """
    try:
        datetime.strptime(date_str, date_format)
    except ValueError:
        is_valid = False
    else:
        is_valid = True
    return is_valid

valid_granularities = ['daily', 'weekly', 'monthly']
while True:
    GRANULARITY = input("Select granularity (daily/weekly/monthly): ").strip().lower()
    if GRANULARITY in valid_granularities:
        break
    print("Invalid choice, please enter daily, weekly, or monthly.")

# Daily and weekly requests take full dates; monthly requests take year-month.
DATE_FORMAT = "%Y-%m-%d" if GRANULARITY in ['daily', 'weekly'] else "%Y-%m"

# Show a concrete example, since the strptime codes in the prompt are cryptic.
print(f"Dates must look like {datetime(2025, 3, 1).strftime(DATE_FORMAT)}.")

while True:
    START_DATE = input(f"Start date ({DATE_FORMAT}): ").strip()
    END_DATE = input(f"End date ({DATE_FORMAT}): ").strip()

    if not (validate_date(START_DATE, DATE_FORMAT) and validate_date(END_DATE, DATE_FORMAT)):
        print(f"Invalid date format. Use {DATE_FORMAT}.")
        continue

    start_parsed = datetime.strptime(START_DATE, DATE_FORMAT)
    end_parsed = datetime.strptime(END_DATE, DATE_FORMAT)
    if start_parsed > end_parsed:
        print("Start date must not be after end date.")
        continue

    # strptime also accepts unpadded input such as "2025-3-1"; re-format so the
    # request URL and export file name always carry the zero-padded form.
    START_DATE = start_parsed.strftime(DATE_FORMAT)
    END_DATE = end_parsed.strftime(DATE_FORMAT)
    break

In [None]:
# API request function ### with rate limit handling but need to revisit #AP
def fetch_api_data(website, api_key, retries=5, base_wait=5, timeout=30):
    """Fetch visit data for one domain from the SimilarWeb API.

    Calls the ``total-traffic-and-engagement/visits`` endpoint using the
    module-level START_DATE, END_DATE and GRANULARITY values. Rate limiting
    (429), server errors (5xx) and network failures are retried with a wait
    between attempts; every outcome is written to the log.

    Args:
        website: Domain to query, e.g. ``"bbc.co.uk"``.
        api_key: SimilarWeb API key.
        retries: Maximum number of attempts.
        base_wait: Base delay in seconds; doubled on each successive attempt.
        timeout: Seconds to wait for the server before giving up on an attempt.

    Returns:
        A ``(data, status_code)`` tuple:
        - ``(parsed_json, 200)`` on success;
        - ``(None, status_code)`` for a non-retryable HTTP status, or for a
          200 response whose body is not valid JSON;
        - ``(None, None)`` when every attempt was used up.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (DataExtractor/1.0)',
        'Accept': 'application/json',
    }

    url = (
        f"https://api.similarweb.com/v1/website/{website}/"
        "total-traffic-and-engagement/visits"
    )
    # Passed via params= so that requests URL-encodes each value.
    params = {
        'api_key': api_key,
        'start_date': START_DATE,
        'end_date': END_DATE,
        'granularity': GRANULARITY,
        'main_domain_only': 'false',
        'format': 'json',
        'show_verified': 'false',
        'mtd': 'false',
        'engaged_only': 'false',
    }
    retryable_server_errors = {500, 502, 503, 504}

    for attempt in range(retries):
        wait_time = base_wait * (2 ** attempt)  # Exponential backoff -- to revisit #AP

        try:
            # timeout= stops one unresponsive connection from hanging the run.
            response = requests.get(url, headers=headers, params=params, timeout=timeout)
        except requests.exceptions.RequestException as exc:
            # Log only the exception type: the message can contain the full
            # request URL, which includes the API key.
            reason = f"Request failed ({type(exc).__name__})"
        else:
            status = response.status_code

            if status == 200:
                try:
                    payload = response.json()
                except ValueError:
                    logging.error(f"{website}: Response body was not valid JSON")
                    return None, status
                logging.info(f"{website}: success")
                return payload, 200

            if status == 404:
                logging.warning(f"{website}: not tracked (404)")
                return None, 404

            if status == 401:
                logging.error(f"{website}: Unauthorized (401)")
                return None, 401

            if status == 429:  # Too many
                retry_after = response.headers.get("Retry-After")
                if retry_after:
                    try:
                        # Add slight randomness on top of the server's hint.
                        wait_time = max(0.0, float(retry_after)) + random.uniform(1, 3)
                    except ValueError:
                        # Retry-After may also be an HTTP date; keep the
                        # exponential backoff in that case.
                        pass
                reason = "Rate limited (429)"
            elif status in retryable_server_errors:  # Server
                reason = f"Server issue ({status})"
            else:
                logging.error(f"{website}: Unexpected error ({status}): {response.text}")
                return None, status

        if attempt < retries - 1:
            logging.warning(f"{website}: {reason}. Retrying in {wait_time:.2f} seconds...")
            time.sleep(wait_time)
        else:
            # No point sleeping when there is no attempt left to make.
            logging.warning(f"{website}: {reason}. No attempts left.")

    logging.error(f"{website}: Failed after {retries} retries")
    return None, None

In [None]:
# Fetch data
all_data = []      # one {'website', 'data'} entry per domain that returned data
failed_sites = []  # (domain, status_code) for each domain that returned nothing
for site in tqdm(websites, desc="Fetching Data"):
    data, status_code = fetch_api_data(site, API_KEY)
    if data is not None:
        all_data.append({'website': site, 'data': data})
        continue

    failed_sites.append((site, status_code))
    if status_code == 401:
        # Every request uses the same key, so the remaining domains would be
        # rejected in the same way.
        print("\nThe API returned Unauthorized (401). Stopping - check the API key and re-run.")
        break

if failed_sites:
    print(f"\nNo data for {len(failed_sites)} domain(s) - see similarweb_log.txt for details:")
    for failed_site, failed_status in failed_sites:
        status_label = failed_status if failed_status is not None else "no response"
        print(f"  {failed_site} (status: {status_label})")

In [None]:
# Save results
# Flatten the per-site payloads into one row per (website, date).
# `.get('visits') or []` also covers a payload whose "visits" value is null.
visit_rows = [
    {'website': row['website'], 'date': visit.get('date', 'N/A'), 'visits': visit.get('visits', 0)}
    for row in all_data if isinstance(row['data'], dict)
    for visit in (row['data'].get('visits') or [])
    if isinstance(visit, dict)
]
df_expanded = pd.DataFrame(visit_rows)

if not df_expanded.empty:
    export_filename = f"Similarweb-Export-{START_DATE}-{END_DATE}-{GRANULARITY}.csv"
    df_expanded.round({'visits': 0}).to_csv(export_filename, index=False)  # round off floating-point visit counts
    print(f"\nData saved as {export_filename}")
    files.download(export_filename)
else:
    # Say so explicitly; otherwise the notebook ends without any explanation.
    print("\nNo visit data was returned, so no file was saved. See similarweb_log.txt for details.")
