In [1]:
import requests
import os
import time
import pandas as pd
from tqdm import tqdm
from PIL import Image
import imagehash
import networkx as nx
import json

In [2]:
# API credentials. They are read from the environment when available so they
# do not have to live in the notebook; the literals are kept only as fallbacks
# so existing runs keep working unchanged.
# NOTE(review): these values are stored in plain text in the notebook --
# consider rotating them and dropping the fallbacks.
LOGO_DEV_KEY = os.environ.get("LOGO_DEV_KEY", "pk_c7EgiJSqQ--VHvDLqhIDfg")
BRANDFETCH_KEY = os.environ.get("BRANDFETCH_KEY", "1idD4QCfbidiekmeoHG")

# Browser-like request headers used for Brandfetch CDN requests
# (fetch_logo_brandfetch uses HEADERS). Previously this dict was commented
# out, which made the name undefined on a fresh kernel.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) "
                  "Chrome/120.0 Safari/537.36",
    "Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8",
    "Referer": "https://brandfetch.com/",
}

In [3]:
def normalize_domain(domain):
    """Reduce a domain or URL-like value to a bare lowercase host name.

    The value is converted to ``str``, stripped and lowercased. A leading
    ``http://`` or ``https://`` scheme, anything from the first ``/``, ``?``
    or ``#`` onwards, and a leading ``www.`` are removed.

    Only *leading* occurrences are removed: a blanket ``str.replace`` would
    also rewrite hosts that merely contain the text (``awww.example.com``
    would become ``aexample.com``).

    Args:
        domain: Domain or URL; any object accepted by ``str()``.

    Returns:
        The normalized host name as a string.
    """
    host = str(domain).strip().lower()

    for scheme in ("http://", "https://"):
        if host.startswith(scheme):
            host = host[len(scheme):]
            break

    # Drop path, query string and fragment so the result is safe to embed in
    # a URL path segment and in a file name.
    for separator in ("/", "?", "#"):
        host = host.split(separator, 1)[0]

    if host.startswith("www."):
        host = host[len("www."):]

    return host


In [4]:
def fetch_logo_logo_dev(domain, save_dir="logos"):
    """Download the logo.dev image for ``domain`` into ``save_dir``.

    ``save_dir`` is created if needed. The file is named after the normalized
    domain with dots replaced by underscores.

    Returns:
        The path of the written file, or None when the request raises a
        ``requests.RequestException``, the status code is not 200, or the
        response Content-Type does not start with ``image/``.
    """
    os.makedirs(save_dir, exist_ok=True)

    domain = normalize_domain(domain)
    url = f"https://img.logo.dev/{domain}?token={LOGO_DEV_KEY}&size=256"

    try:
        response = requests.get(url, timeout=10)
    except requests.RequestException:
        return None

    content_type = response.headers.get("Content-Type", "")
    if response.status_code != 200 or not content_type.startswith("image/"):
        return None

    # SVG responses keep an .svg suffix; every other image type is stored
    # with a .png suffix.
    extension = "svg" if "svg" in content_type else "png"
    file_name = domain.replace(".", "_") + f".{extension}"
    path = os.path.join(save_dir, file_name)

    with open(path, "wb") as out_file:
        out_file.write(response.content)

    return path

In [5]:
def fetch_logo_brandfetch(domain, save_dir="logos", headers=None):
    """Download the Brandfetch CDN image for ``domain`` into ``save_dir``.

    ``save_dir`` is created if needed. The file is named after the normalized
    domain with dots replaced by underscores.

    Args:
        domain: Domain to look up; normalized with ``normalize_domain``.
        save_dir: Directory the image is written to.
        headers: Optional request headers. When omitted, the module-level
            ``HEADERS`` dict is used if it exists; otherwise the request is
            sent with the default headers of ``requests``.

    Returns:
        The path of the written file, or None when the request raises a
        ``requests.RequestException``, the status code is not 200, or the
        response Content-Type does not start with ``image/``.
    """
    os.makedirs(save_dir, exist_ok=True)

    domain = normalize_domain(domain)
    url = f"https://cdn.brandfetch.io/{domain}?c={BRANDFETCH_KEY}"

    if headers is None:
        # HEADERS may not be defined at module level (for example when its
        # definition is commented out). Look it up defensively so a missing
        # name does not raise NameError in the middle of a long fetch loop.
        headers = globals().get("HEADERS")

    try:
        r = requests.get(url, headers=headers, timeout=10)
    except requests.RequestException:
        return None

    if r.status_code != 200:
        return None

    content_type = r.headers.get("Content-Type", "")
    if not content_type.startswith("image/"):
        return None

    # SVG responses keep an .svg suffix; every other image type is stored
    # with a .png suffix.
    ext = "svg" if "svg" in content_type else "png"
    path = os.path.join(save_dir, domain.replace(".", "_") + f".{ext}")

    with open(path, "wb") as f:
        f.write(r.content)

    return path


In [6]:
def get_logo(domain):
    """Fetch a logo for ``domain``, trying logo.dev first, then Brandfetch.

    Returns:
        A ``(path, source)`` tuple. ``source`` is ``"logo.dev"`` or
        ``"brandfetch"`` for the provider whose fetch returned a path; when
        neither does, the result is ``(None, "not_found")``.
    """
    logo_path = fetch_logo_logo_dev(domain)
    source = "logo.dev"

    if not logo_path:
        # Fall back to the second provider only when the first found nothing.
        logo_path = fetch_logo_brandfetch(domain)
        source = "brandfetch"

    if not logo_path:
        return None, "not_found"

    return logo_path, source

In [7]:
df = pd.read_parquet("logos.snappy.parquet")
domains = df.iloc[:, 0]  # the first column is used as the list of domains

# Collect one (path, source) pair per domain, in row order.
results = []
for domain in tqdm(domains, desc="Fetching company logos"):
    results.append(get_logo(domain))
    time.sleep(0.2)  # short pause between consecutive domains

paths = [logo_path for logo_path, _ in results]
sources = [logo_source for _, logo_source in results]

# Attach the results to the frame and persist it.
df["logo_path"] = paths
df["logo_source"] = sources
df.to_parquet("data_with_logos.snappy.parquet", index=False)


Fetching company logos:   2%|█▏                                                   | 102/4384 [01:44<1:13:10,  1.03s/it]


KeyboardInterrupt: 

In [63]:
def domain_to_logo_path(domain, logo_dir="logos"):
    """Return the path of the saved logo file for ``domain``, if one exists.

    The fetch functions name files after ``normalize_domain(domain)`` with
    dots replaced by underscores, so the same normalization is applied here.
    Without it, a domain written with a scheme, ``www.`` prefix, upper-case
    letters or surrounding whitespace would never match its saved file.

    Args:
        domain: Domain to look up; any value accepted by ``normalize_domain``.
        logo_dir: Directory that holds the downloaded logos.

    Returns:
        The first existing path among the ``png``, ``svg``, ``jpg`` and
        ``jpeg`` candidates (checked in that order), or None when no
        candidate exists.
    """
    base = normalize_domain(domain).replace(".", "_")
    for ext in ("png", "svg", "jpg", "jpeg"):
        candidate = os.path.join(logo_dir, f"{base}.{ext}")
        if os.path.exists(candidate):
            return candidate
    return None


In [64]:
def load_and_hash_logo(path):
    """
    Load an image, convert it to grayscale and compute its perceptual hash.

    Color is completely removed before hashing. The grayscale image is
    resized to 256x256 and hashed with ``imagehash.phash(hash_size=16)``.

    Args:
        path: Path of the image file to hash.

    Returns:
        The perceptual hash, or None when the file cannot be opened or
        processed (any exception is treated as "no hash available").
    """
    try:
        # Open through a context manager so the underlying file handle is
        # released even if decoding fails; convert() yields an independent
        # in-memory image that stays valid after the file is closed.
        with Image.open(path) as source:
            gray = source.convert("L")  # L = grayscale
        gray = gray.resize((256, 256))
        return imagehash.phash(gray, hash_size=16)
    except Exception:
        # Deliberate best effort: callers skip logos for which None is returned.
        return None


In [67]:
df = pd.read_parquet("logos.snappy.parquet")

domains = df.iloc[:, 0].astype(str)

# Both mappings are keyed by the row index of the domain in `df`.
hashes = {}
index_to_domain = {}

progress = tqdm(domains.items(), total=len(domains), desc="Hashing logos")
for idx, domain in progress:
    logo_path = domain_to_logo_path(domain)

    # No file on disk, or a file that could not be hashed: skip the domain.
    logo_hash = load_and_hash_logo(logo_path) if logo_path else None
    if logo_hash is None:
        continue

    hashes[idx] = logo_hash
    index_to_domain[idx] = domain


Hashing logos: 100%|███████████████████████████████████████████████████████████████| 4384/4384 [01:05<00:00, 66.70it/s]


In [68]:


# One node per hashed logo; an edge joins two logos whose hashes differ by at
# most 10 (the `-` operator on the hashes gives their distance).
G = nx.Graph()
indices = list(hashes.keys())
G.add_nodes_from(indices)

for position, left in tqdm(enumerate(indices), total=len(indices), desc="Comparing logos"):
    left_hash = hashes[left]
    # Compare only against later entries so each unordered pair is seen once.
    for right in indices[position + 1:]:
        if (left_hash - hashes[right]) <= 10:
            G.add_edge(left, right)


Comparing logos: 100%|████████████████████████████████████████████████████████████| 4384/4384 [00:24<00:00, 180.19it/s]


In [69]:
# Each connected component of the graph becomes one group of domains.
groups = [
    [index_to_domain[node] for node in component]
    for component in nx.connected_components(G)
]


In [70]:


# Serialize the groups as indented JSON and write them out in one go.
clusters_json = json.dumps(groups, indent=2)
with open("logo_clusters.json", "w") as out_file:
    out_file.write(clusters_json)
