In [None]:
import pandas as pd

# Load parquet file
df = pd.read_parquet("logos.snappy.parquet")

# Distinct values of the 'domain' column, converted to a plain Python list.
domains = df['domain'].unique().tolist()

In [None]:
# Persist the domain list to disk, one domain per line.
with open("domains.txt", "w") as out_file:
    out_file.writelines(domain + "\n" for domain in domains)

## Scrape websites

In [None]:
import os
import subprocess
import sys

# Run the crawler with the interpreter that backs this kernel (a bare "python"
# may resolve to a different environment) and raise CalledProcessError on a
# non-zero exit code, which os.system() silently ignored.
subprocess.run([sys.executable, 'scraper_crawl.py'], check=True)

## Extract logos from the html files

In [2]:
import os

# Set the path to your folder
folder_path = 'scraped_domains_html'

# Strip a leading "www." from every file name in the folder.
for filename in os.listdir(folder_path):
    if not filename.startswith('www.'):
        continue

    new_name = filename[len('www.'):]
    old_path = os.path.join(folder_path, filename)
    new_path = os.path.join(folder_path, new_name)

    # os.rename() onto an existing target raises on Windows and silently
    # overwrites on POSIX. Skip explicitly so the loop never aborts half-way
    # and never clobbers a file that is already there.
    if os.path.exists(new_path):
        print(f'Skipped: {filename} -> {new_name} (target already exists)')
        continue

    os.rename(old_path, new_path)
    print(f'Renamed: {filename} -> {new_name}')


In [None]:
from logo_extractor import extract_logo_url_from_html

# extract_logo_url_from_html is a project-local helper; the cells below iterate
# its result and read/write a 'logo_url' key on every entry.
folder_path = "scraped_domains_html"
logos = extract_logo_url_from_html(folder_path)

In [None]:
## Remove prefix
# "https://" is checked first; "http://" is then checked against whatever remains.
for entry in logos:
    stripped_url = entry['logo_url']
    for scheme in ("https://", "http://"):
        if stripped_url.startswith(scheme):
            stripped_url = stripped_url[len(scheme):]
    entry['logo_url'] = stripped_url

In [None]:
import json

## Prepare logos image paths for download by removing "NO_LOGO_FOUND" from the entries
filtered_logos = []
for entry in logos:
    if entry['logo_url'] == 'NO_LOGO_FOUND':
        continue
    filtered_logos.append(entry)

# Save list to JSON
with open("logos_image_paths.json", "w") as json_file:
    json.dump(filtered_logos, json_file)

In [None]:
## Download Logos
import os
import subprocess
import sys

# Run the downloader with the interpreter that backs this kernel and raise
# CalledProcessError on a non-zero exit code; os.system() ignored failures, so
# the following cells would have run against a partial logos/ folder.
subprocess.run([sys.executable, 'flaresolverr_logo_download.py'], check=True)

In [None]:
## Get the failed domains
# A domain counts as downloaded when logos/ holds a regular file whose name,
# minus its extension, equals the domain.
downloaded_domains = [os.path.splitext(f)[0] for f in os.listdir('logos') if os.path.isfile(os.path.join('logos', f))]

with open('domains.txt', 'r') as file:
    # Skip blank lines so an empty string is never reported as a failed domain.
    all_domains = [line.strip() for line in file if line.strip()]

# Set lookup keeps the comparison linear instead of rescanning the list for every domain.
downloaded_lookup = set(downloaded_domains)
failed_domains = [domain for domain in all_domains if domain not in downloaded_lookup]

In [32]:
len(failed_domains)

208

In [63]:
# Sanity check: recompute the extension-less names of the files in logos/
# (same expression that builds downloaded_domains in the failed-domains cell).
temp = [os.path.splitext(f)[0] for f in os.listdir('logos') if os.path.isfile(os.path.join('logos', f))]

In [64]:
len(temp)

1474

## Fill the missing logo urls

In [25]:
from dotenv import load_dotenv
import os
import requests
from io import BytesIO
from PIL import Image
import time

# Read variables from a local .env file into the environment, then pick up the key.
load_dotenv()
LOGODEV_API_KEY = os.getenv("LOGODEV_API_KEY")
if not LOGODEV_API_KEY:
    # os.getenv returns None for a missing variable, and the Logo.dev URL is
    # built with an f-string, so the request would carry the literal "token=None".
    import warnings
    warnings.warn("LOGODEV_API_KEY is not set; Logo.dev URLs would be built with token=None.")

In [26]:
def is_valid_image_url(url, content=None):
    """Report whether ``url`` (or the already-fetched ``content``) opens as an image.

    Args:
        url: Address of the image; fetched only when ``content`` is None, and
            used in the failure message.
        content: Optional raw bytes of the image body.

    Returns:
        True when the bytes open with PIL and both width and height exceed one
        pixel. False on a non-200 response or on any exception, which is printed.
    """
    try:
        payload = content
        if payload is None:
            reply = requests.get(url, timeout=5)
            if reply.status_code != 200:
                return False
            payload = reply.content
        picture = Image.open(BytesIO(payload))
        if picture.width <= 1:
            return False
        return picture.height > 1
    except Exception as e:
        print(f"⚠️ Image validation failed for {url}: {e}")
        return False

def get_logo_logodev(domain):
    """Ask Logo.dev for a logo image for ``domain``.

    Args:
        domain: Domain name inserted into the Logo.dev image URL.

    Returns:
        A ``(url, status)`` tuple. ``url`` is the request URL when the response
        is a 200 with an image content type, otherwise None. ``status`` is the
        HTTP status code, or None when no response was obtained (missing API key
        or request error).

    Note:
        The returned URL embeds ``LOGODEV_API_KEY`` as the ``token`` query
        parameter, so printing or persisting it exposes the key.
    """
    if not LOGODEV_API_KEY:
        # Avoid sending a request that carries the literal "token=None".
        print(f"⚠️ Logo.dev request error for {domain}: LOGODEV_API_KEY is not set")
        return None, None

    url = f"https://img.logo.dev/{domain}?token={LOGODEV_API_KEY}&fallback=404"
    try:
        response = requests.get(url, timeout=10)
        # .get(): a response without a Content-Type header counts as "not an
        # image" instead of raising KeyError into the generic handler below,
        # which would discard the real status code.
        content_type = response.headers.get("Content-Type", "")
        if response.status_code == 200 and content_type.startswith("image"):
            return url, 200
        elif response.status_code == 202:
            print(f"🕒 Logo.dev is processing {domain} (202)")
            return None, 202
        elif response.status_code == 404:
            print(f"🚫 Logo.dev: No logo for {domain} (404)")
            return None, 404
        else:
            print(f"🟥 Logo.dev failed for {domain} (status {response.status_code})")
            return None, response.status_code
    except Exception as e:
        print(f"⚠️ Logo.dev request error for {domain}: {e}")
        return None, None

def get_logo_clearbit(domain):
    """Ask Clearbit for a logo image for ``domain``.

    Args:
        domain: Domain name appended to the Clearbit logo URL.

    Returns:
        A ``(url, status)`` tuple. ``url`` is the request URL when the response
        is a 200 with an image content type whose body passes
        ``is_valid_image_url``; otherwise None. ``status`` is the HTTP status
        code, or None when the request raised.
    """
    url = f"https://logo.clearbit.com/{domain}"
    try:
        response = requests.get(url, timeout=10)
        # .get(): a missing Content-Type header means "not an image" rather than
        # a KeyError that would be reported as a request error with status None.
        content_type = response.headers.get("Content-Type", "")
        if response.status_code == 200 and content_type.startswith("image"):
            # Reuse the body that was just fetched instead of downloading it again.
            if is_valid_image_url(url, response.content):
                return url, 200
            else:
                print(f"🟥 Clearbit returned invalid image for {domain}")
                return None, 200
        else:
            print(f"🟥 Clearbit failed for {domain} (status {response.status_code})")
            return None, response.status_code
    except Exception as e:
        print(f"⚠️ Clearbit error for {domain}: {e}")
        return None, None

In [None]:
def fetch_logos_for_domains(domains, use_logodev=True, use_clearbit=True, retry_wait_seconds=240):
    """Look up a logo URL for every domain, trying Logo.dev first and Clearbit second.

    Args:
        domains: Iterable of domain names.
        use_logodev: Query Logo.dev when True.
        use_clearbit: Query Clearbit when Logo.dev produced no URL and this is True.
        retry_wait_seconds: Pause before re-querying Logo.dev for domains it
            reported as still processing (HTTP 202). Defaults to 240 seconds.

    Returns:
        A list with one ``{"domain": ..., "logo_url": ...}`` dict per input
        domain, in input order. ``logo_url`` is ``'NO_LOGO_FOUND'`` when no
        provider returned a URL.
    """
    results = []
    # Entries that Logo.dev is still processing AND that no other provider resolved.
    pending_entries = []

    # Initial lookup
    for domain in domains:
        print(f"🔍 Looking for logo for: {domain}")
        logo_url = None
        logodev_processing = False

        # Try Logo.dev
        if use_logodev:
            logo_url, status = get_logo_logodev(domain)
            if not logo_url and status == 202:
                print(f"⏳ Logo.dev is processing {domain}")
                logodev_processing = True

        # Try Clearbit if no logo yet
        if not logo_url and use_clearbit:
            logo_url, _ = get_logo_clearbit(domain)

        entry = {
            "domain": domain,
            "logo_url": logo_url  # might still be None here
        }
        results.append(entry)

        # Only queue a retry when the domain is still unresolved; otherwise the
        # wait below (and an extra request) would be spent on a domain whose
        # entry could never be updated anyway.
        if logodev_processing and not logo_url:
            pending_entries.append(entry)

    # Retry pending domains from Logo.dev
    if pending_entries:
        print(f"\n🔁 Waiting to recheck {len(pending_entries)} pending logos from Logo.dev...")
        time.sleep(retry_wait_seconds)
        for entry in pending_entries:
            domain = entry["domain"]
            logo_url, _ = get_logo_logodev(domain)
            if logo_url:
                print(f"✅ Logo now available for {domain}")
                # The queue holds the result dicts themselves, so no rescan of
                # `results` is needed to find the entry to update.
                entry["logo_url"] = logo_url

    # Final fallback: set "NO_LOGO_FOUND" if nothing was found
    for entry in results:
        if not entry["logo_url"]:
            entry["logo_url"] = 'NO_LOGO_FOUND'

    return results


In [35]:
# NOTE: this rebinds failed_domains from the list of domain strings built in the
# "Get the failed domains" cell to the list of {"domain", "logo_url"} dicts
# returned by fetch_logos_for_domains. Re-running this cell would pass those
# dicts back in as if they were domain names.
failed_domains = fetch_logos_for_domains(failed_domains)

🔍 Looking for logo for: xella-colloquium.berlin
🔍 Looking for logo for: secureparkinghi.com
🔍 Looking for logo for: rheine.schule
🔍 Looking for logo for: ford.pl.ua
🔍 Looking for logo for: adecco.cl
🔍 Looking for logo for: pcllawyersfrankston.com.au
🔍 Looking for logo for: wurth.com.uy
🔍 Looking for logo for: renaultchiva.com
🔍 Looking for logo for: medef-artois.fr
🔍 Looking for logo for: medef-bearnetsoule.com
🔍 Looking for logo for: chicco.pl
🚫 Logo.dev: No logo for chicco.pl (404)
🟥 Clearbit failed for chicco.pl (status 404)
🔍 Looking for logo for: rotaryclubofessex.com
🔍 Looking for logo for: toofacedcosmetics.jp
🔍 Looking for logo for: rubiomonocoat.com.au
🔍 Looking for logo for: spitexaadorf.ch
🔍 Looking for logo for: metlife.co.kr
🔍 Looking for logo for: toyotafocsani.ro
🔍 Looking for logo for: secureparking.co.id
🔍 Looking for logo for: mazda-angola-autozuid.com
🔍 Looking for logo for: nikonschool.de
🔍 Looking for logo for: toyota-annecy.fr
🔍 Looking for logo for: toyota-rillie

In [40]:
# NOTE: Logo.dev URLs in this list embed the API token as a query parameter, so
# the rendered output of this cell exposes it — clear the output before sharing.
failed_domains

[{'domain': 'xella-colloquium.berlin',
  'logo_url': 'https://img.logo.dev/xella-colloquium.berlin?token=pk_YHX64SgZS5Ws1uLYAkiEJQ&fallback=404'},
 {'domain': 'secureparkinghi.com',
  'logo_url': 'https://img.logo.dev/secureparkinghi.com?token=pk_YHX64SgZS5Ws1uLYAkiEJQ&fallback=404'},
 {'domain': 'rheine.schule',
  'logo_url': 'https://img.logo.dev/rheine.schule?token=pk_YHX64SgZS5Ws1uLYAkiEJQ&fallback=404'},
 {'domain': 'ford.pl.ua',
  'logo_url': 'https://img.logo.dev/ford.pl.ua?token=pk_YHX64SgZS5Ws1uLYAkiEJQ&fallback=404'},
 {'domain': 'adecco.cl',
  'logo_url': 'https://img.logo.dev/adecco.cl?token=pk_YHX64SgZS5Ws1uLYAkiEJQ&fallback=404'},
 {'domain': 'pcllawyersfrankston.com.au',
  'logo_url': 'https://img.logo.dev/pcllawyersfrankston.com.au?token=pk_YHX64SgZS5Ws1uLYAkiEJQ&fallback=404'},
 {'domain': 'wurth.com.uy',
  'logo_url': 'https://img.logo.dev/wurth.com.uy?token=pk_YHX64SgZS5Ws1uLYAkiEJQ&fallback=404'},
 {'domain': 'renaultchiva.com',
  'logo_url': 'https://img.logo.dev/r

In [None]:
import os
import requests

def download_logos(logo_data, save_dir='logos'):
    """Download every logo in ``logo_data`` to ``save_dir`` as ``<domain>.png``.

    Args:
        logo_data: Iterable of dicts with ``'domain'`` and ``'logo_url'`` keys.
            Entries whose URL is empty or ``'NO_LOGO_FOUND'`` are skipped.
        save_dir: Target directory; created if it does not exist.

    Returns:
        None. Progress and failures are printed; request errors do not stop
        the loop.

    Note:
        The file is always named ``.png`` whatever image format the server
        returned.
    """
    # Without this, every open() below fails when the folder is missing and the
    # error is only visible as a per-domain "Error downloading" line.
    os.makedirs(save_dir, exist_ok=True)

    for item in logo_data:
        domain = item['domain']
        logo_url = item['logo_url']

        if not logo_url or logo_url == 'NO_LOGO_FOUND':
            print(f"❌ Skipping {domain} — no logo found.")
            continue

        try:
            response = requests.get(logo_url, timeout=10)
            if response.status_code != 200:
                print(f"⚠️ Failed to download logo for {domain} (status {response.status_code})")
                continue

            # An empty body or a text response (e.g. an HTML error page served
            # with status 200) would be written as an unreadable .png file.
            content_type = response.headers.get("Content-Type", "")
            if not response.content or content_type.startswith("text/"):
                print(f"⚠️ Failed to download logo for {domain} (not an image: {content_type or 'empty response'})")
                continue

            file_path = os.path.join(save_dir, f"{domain}.png")
            with open(file_path, 'wb') as f:
                f.write(response.content)
            print(f"✅ Saved logo for {domain}")
        except Exception as e:
            print(f"⚠️ Error downloading logo for {domain}: {e}")


In [37]:
# At this point failed_domains holds the {"domain", "logo_url"} dicts produced by
# fetch_logos_for_domains; the files go to the default 'logos' folder.
download_logos(failed_domains)

✅ Saved logo for xella-colloquium.berlin
✅ Saved logo for secureparkinghi.com
✅ Saved logo for rheine.schule
✅ Saved logo for ford.pl.ua
✅ Saved logo for adecco.cl
✅ Saved logo for pcllawyersfrankston.com.au
✅ Saved logo for wurth.com.uy
✅ Saved logo for renaultchiva.com
✅ Saved logo for medef-artois.fr
✅ Saved logo for medef-bearnetsoule.com
❌ Skipping chicco.pl — no logo found.
✅ Saved logo for rotaryclubofessex.com
✅ Saved logo for toofacedcosmetics.jp
✅ Saved logo for rubiomonocoat.com.au
✅ Saved logo for spitexaadorf.ch
✅ Saved logo for metlife.co.kr
✅ Saved logo for toyotafocsani.ro
✅ Saved logo for secureparking.co.id
✅ Saved logo for mazda-angola-autozuid.com
✅ Saved logo for nikonschool.de
✅ Saved logo for toyota-annecy.fr
✅ Saved logo for toyota-rillieux.fr
❌ Skipping chicco.ru — no logo found.
✅ Saved logo for subaru-bd.com
✅ Saved logo for toyotaortakoy.com.tr
✅ Saved logo for mazda-uae.com
✅ Saved logo for propertyfinder.com
❌ Skipping pingusenglishbd.com — no logo found.


In [38]:
# Entries for which neither provider returned a URL (sentinel set by fetch_logos_for_domains).
totally_failed_domains = [entry for entry in failed_domains if entry['logo_url'] == 'NO_LOGO_FOUND']

In [39]:
totally_failed_domains

[]

## Convert all image formats to something PIL can handle

In [24]:
import os
import imghdr
import io
import cairosvg
from PIL import Image

def is_avif(path):
    """Return True when the first 32 bytes of ``path`` contain the ``ftypavif`` marker.

    Args:
        path: File to inspect.

    Returns:
        True if the marker is found; False if it is not, or if the file cannot
        be opened or read.
    """
    try:
        with open(path, "rb") as f:
            return b"ftypavif" in f.read(32)
    except (OSError, TypeError, ValueError):
        # Unreadable or invalid path: best-effort "not AVIF". A bare `except:`
        # would also have swallowed KeyboardInterrupt and SystemExit.
        return False

def load_image(path):
    """Load the image at ``path`` as an RGB PIL image, or return None if it cannot be read.

    Dispatch is by file extension:
      * ``.svg`` is rasterised to PNG bytes with cairosvg first;
      * ``.jpg/.jpeg/.png/.webp/.avif`` are opened directly;
      * ``.img`` is sniffed: AVIF via ``is_avif``, otherwise ``imghdr`` must
        report jpeg, png or webp;
      * anything else is skipped.

    Args:
        path: Path of the image file.

    Returns:
        A PIL image in mode "RGB", or None when the file is unsupported or
        unreadable (the reason is printed).

    Note:
        NOTE(review): ``convert("RGB")`` discards any alpha channel without
        compositing onto a background first — confirm this is what is wanted
        for transparent logos.
    """
    ext = os.path.splitext(path)[1].lower()

    try:
        if ext == '.svg':
            png_data = cairosvg.svg2png(url=path)
            with Image.open(io.BytesIO(png_data)) as img:
                return img.convert("RGB")

        elif ext in ['.jpg', '.jpeg', '.png', '.webp', '.avif']:
            # The context manager closes the underlying file as soon as the
            # converted copy exists, instead of leaving the handle open until
            # garbage collection.
            with Image.open(path) as img:
                return img.convert("RGB")

        elif ext == '.img':
            if is_avif(path):  # header check on the first bytes of the file
                try:
                    with Image.open(path) as img:
                        return img.convert("RGB")
                except Exception as e:
                    print(f"⚠️ Skipping unreadable AVIF: {path} — {e}")
                    return None
            else:
                real_ext = imghdr.what(path)
                if real_ext in ['jpeg', 'png', 'webp']:
                    with Image.open(path) as img:
                        return img.convert("RGB")
                else:
                    print(f"⚠️ Skipping unknown .img: {path} (detected: {real_ext})")
                    return None

        else:
            print(f"⚠️ Skipping unsupported extension: {path}")
            return None

    except Exception as e:
        print(f"❌ Skipping unreadable file: {path} — Reason: {e}")
        return None


## Logo Similarity

In [25]:
import torch
import hnswlib
import numpy as np
import networkx as nx
import networkx.algorithms.components.connected as nx_conn
from tqdm import tqdm
from transformers import AutoProcessor, AutoModel
import pillow_avif
from PIL import Image
import community as community_louvain

In [26]:
from PIL import Image, ImageOps

def pad_to_square(image, fill=(255, 255, 255)):
    """Add borders to ``image`` so that its width and height become equal.

    The shorter dimension is extended to the longer one; the missing pixels are
    split between the two sides, with the odd pixel (if any) going to the
    right/bottom border.

    Args:
        image: PIL image to pad.
        fill: Border colour passed to ``ImageOps.expand``.

    Returns:
        The result of ``ImageOps.expand`` with the computed borders.
    """
    width, height = image.size
    side = max(width, height)
    extra_w = side - width
    extra_h = side - height
    left, top = extra_w // 2, extra_h // 2
    right, bottom = extra_w - left, extra_h - top
    return ImageOps.expand(image, (left, top, right, bottom), fill=fill)

In [27]:
from transformers import AutoProcessor, AutoModel
import torch
import numpy as np
from tqdm import tqdm

# Use the GPU when torch reports one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Processor and model are loaded from the same "facebook/dinov2-base" checkpoint;
# the model is switched to eval mode and moved to the selected device.
processor = AutoProcessor.from_pretrained("facebook/dinov2-base")
model = AutoModel.from_pretrained("facebook/dinov2-base").eval().to(device)

# Lock processor to 224x224 resolution, disable cropping
# NOTE(review): assumes the processor honours these attributes when they are
# assigned after construction — confirm against the installed transformers version.
processor.size = {"height": 224, "width": 224}
processor.do_center_crop = False

def extract_features_with_padding(image_paths, domains, batch_size=32):
    """Embed every loadable image with the module-level ``model``.

    Each image is loaded with ``load_image``, padded to a square with
    ``pad_to_square``, run through ``processor`` and ``model`` in batches, and
    reduced to one vector by averaging ``last_hidden_state`` over dim 1
    (presumably the token axis — TODO confirm).

    Args:
        image_paths: Paths of the images to embed.
        domains: Domain for each path, same length and order as ``image_paths``.
        batch_size: Number of paths handed to the model per step.

    Returns:
        ``(embeddings, valid_domains)``: the per-batch outputs stacked into one
        NumPy array (one row per image that loaded), and the domains of exactly
        those images in the same order. When no image loads, the array has zero
        rows.

    Raises:
        ValueError: If the two input sequences differ in length or
            ``batch_size`` is smaller than 1.
    """
    if len(image_paths) != len(domains):
        # zip() below would silently truncate and pair paths with the wrong domains.
        raise ValueError(
            f"image_paths and domains must have the same length "
            f"({len(image_paths)} != {len(domains)})"
        )
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")

    all_embeddings = []
    valid_domains = []

    with torch.inference_mode():
        for i in tqdm(range(0, len(image_paths), batch_size), desc="Extracting features"):
            batch_imgs = []
            batch_domains = []

            for p, domain in zip(image_paths[i:i+batch_size], domains[i:i+batch_size]):
                img = load_image(p)
                if img is None:
                    # Unreadable image: drop it together with its domain so the
                    # rows and valid_domains stay aligned.
                    continue
                img = pad_to_square(img)
                batch_imgs.append(img)
                batch_domains.append(domain)

            if not batch_imgs:
                continue

            inputs = processor(images=batch_imgs, return_tensors="pt").to(device)
            outputs = model(**inputs).last_hidden_state.mean(dim=1)
            all_embeddings.append(outputs.cpu().numpy())
            valid_domains.extend(batch_domains)

    if not all_embeddings:
        # np.vstack([]) raises ValueError; return an empty 2-D array instead so
        # callers can test len(embeddings) == 0.
        hidden_size = getattr(getattr(model, "config", None), "hidden_size", 0)
        return np.empty((0, hidden_size), dtype=np.float32), valid_domains

    return np.vstack(all_embeddings), valid_domains

In [28]:
import hnswlib

def build_hnsw_index(embeddings, ef=100, ef_construction=200, M=64, save_path=None, space='l2'):
    """Build an hnswlib index over ``embeddings``.

    Args:
        embeddings: 2-D array with one row per item; the second dimension is
            used as the index dimensionality.
        ef: Query-time ``ef`` set on the finished index.
        ef_construction: Passed to ``init_index``.
        M: Passed to ``init_index``.
        save_path: When given, the index is also saved to this path.
        space: hnswlib metric: ``'l2'`` (the default, as before), ``'cosine'``
            or ``'ip'``. ``build_similarity_graph`` turns the distances
            returned by this index into a similarity via ``1 - distance``;
            per the hnswlib documentation that expression equals the cosine
            similarity only for a ``'cosine'`` index, while ``'l2'`` distances
            are squared Euclidean distances.

    Returns:
        The populated ``hnswlib.Index``.

    Raises:
        ValueError: If ``space`` is not one of the supported metric names.
    """
    if space not in ('l2', 'cosine', 'ip'):
        raise ValueError(f"space must be 'l2', 'cosine' or 'ip', got {space!r}")

    dim = embeddings.shape[1]
    index = hnswlib.Index(space=space, dim=dim)
    index.init_index(max_elements=len(embeddings), ef_construction=ef_construction, M=M)
    index.add_items(embeddings)
    index.set_ef(ef)

    if save_path:
        index.save_index(save_path)

    return index

In [29]:
import igraph as ig
import leidenalg

def build_similarity_graph(index, embeddings, k=10, threshold=0.75):
    """Build an undirected graph linking each item to its sufficiently similar neighbours.

    Every row of ``embeddings`` becomes a node (its row number). For each row
    the ``k`` nearest neighbours are queried from ``index``, and an edge is
    added when ``1 - distance`` reaches ``threshold``.

    Args:
        index: Object with an hnswlib-style ``knn_query(data, k)`` that returns
            ``(labels, distances)``; labels are used as row numbers of
            ``embeddings``.
        embeddings: 2-D array, one row per item.
        k: Neighbours requested per item (capped at the number of items).
        threshold: Minimum ``1 - distance`` for an edge.

    Returns:
        ``networkx.Graph`` with one node per item and a ``weight`` attribute
        on every edge. Items without any qualifying neighbour are isolated nodes.

    Note:
        ``1 - distance`` is only a cosine similarity when the index uses
        hnswlib's ``'cosine'`` space; with ``'l2'`` the distance is a squared
        Euclidean distance, so the threshold then means
        "squared distance <= 1 - threshold".
    """
    n_items = len(embeddings)
    G = nx.Graph()

    # Register every item up front. Nodes used to be created only as a side
    # effect of add_edge, so items with no neighbour above the threshold never
    # entered the graph and disappeared from the clustering output.
    G.add_nodes_from(range(n_items))
    if n_items == 0:
        return G

    # Requesting more neighbours than there are items makes knn_query fail.
    labels, distances = index.knn_query(embeddings, k=min(k, n_items))

    for i in range(n_items):
        for neighbor, distance in zip(labels[i], distances[i]):
            neighbor = int(neighbor)  # plain int node ids instead of NumPy scalars
            if neighbor == i:
                continue
            similarity = 1 - distance
            if similarity >= threshold:
                G.add_edge(i, neighbor, weight=similarity)

    return G

def graph_to_igraph(G_nx):
    """Convert a NetworkX graph to an undirected igraph graph.

    Args:
        G_nx: NetworkX graph; nodes may be any hashable values.

    Returns:
        ``(G_ig, reverse_mapping)``: the igraph graph, whose vertices are
        numbered in ``G_nx.nodes()`` order, and a dict from igraph vertex id
        back to the original node. Edge attributes are not copied.
    """
    nodes = list(G_nx.nodes())
    mapping = {node: idx for idx, node in enumerate(nodes)}
    reverse_mapping = dict(enumerate(nodes))
    edges = [(mapping[u], mapping[v]) for u, v in G_nx.edges()]

    # Pass the vertex count explicitly. When it is inferred from the edge list,
    # isolated nodes numbered above the highest edge endpoint are left out.
    G_ig = ig.Graph(n=len(nodes), edges=edges, directed=False)
    return G_ig, reverse_mapping

def cluster_with_leiden(G_nx, seed=None):
    """Partition a NetworkX graph with the Leiden algorithm (modularity objective).

    Args:
        G_nx: Graph to cluster; converted with ``graph_to_igraph`` first.
        seed: Optional seed forwarded to ``leidenalg.find_partition`` so that
            repeated runs give the same partition. None keeps leidenalg's
            default (unseeded) behaviour.

    Returns:
        Dict mapping a consecutive cluster id (0, 1, ...) to the list of
        original NetworkX nodes in that cluster.
    """
    G_ig, reverse_map = graph_to_igraph(G_nx)
    partition = leidenalg.find_partition(
        G_ig, leidenalg.ModularityVertexPartition, seed=seed
    )

    # Translate igraph vertex ids back to the original node labels.
    return {
        cluster_id: [reverse_map[n] for n in nodes]
        for cluster_id, nodes in enumerate(partition)
    }

In [30]:
def split_clusters(G):
    """Split the connected components of ``G`` into groups and singletons.

    Args:
        G: NetworkX graph.

    Returns:
        ``(final_clusters, unique_logos)``: a list with the member list of
        every component that has more than one node, and a list with the single
        node of every other component. Both keep component order.
    """
    groups = [list(members) for members in nx_conn.connected_components(G)]
    final_clusters = [group for group in groups if len(group) > 1]
    unique_logos = [group[0] for group in groups if not len(group) > 1]
    return final_clusters, unique_logos

In [31]:
def cluster_indices_to_domains(cluster_dict, domains):
    """Replace the indices in every cluster with the corresponding domains.

    Args:
        cluster_dict: Mapping of cluster id to a list of indices into ``domains``.
        domains: Sequence of domains, addressed by those indices.

    Returns:
        A new dict with the same keys, in the same order, whose values are
        lists of domains.
    """
    return {
        cluster_id: [domains[position] for position in members]
        for cluster_id, members in cluster_dict.items()
    }

In [32]:
import os

def get_logo_paths_from_folder(folder):
    """Collect the image files in ``folder`` together with the domain each one stands for.

    A file is kept when it is a regular file whose extension (case-insensitive)
    is one that ``load_image`` knows how to read. The domain is the file name
    without its extension.

    Args:
        folder: Directory to scan (not recursive).

    Returns:
        ``(logo_paths, domains)``: two parallel lists, sorted by file name.
    """
    # '.avif' is included because load_image opens it directly; leaving it out
    # here meant such files were never passed on.
    supported_exts = {'.jpg', '.jpeg', '.png', '.webp', '.avif', '.svg', '.img'}
    logo_paths = []
    domains = []

    # os.listdir() order is arbitrary; sorting makes the item order, and with it
    # every index used downstream, reproducible between runs.
    for filename in sorted(os.listdir(folder)):
        domain, ext = os.path.splitext(filename)
        if ext.lower() not in supported_exts:
            continue
        path = os.path.join(folder, filename)
        if not os.path.isfile(path):
            continue
        logo_paths.append(path)
        domains.append(domain)

    return logo_paths, domains

In [33]:
import csv

def save_clusters_to_csv(cluster_dict, domains, output_file="clusters.csv"):
    """Write one ``cluster_id,domain`` row per cluster member to a CSV file.

    Args:
        cluster_dict: Mapping of cluster id to a list of indices into ``domains``.
        domains: Sequence of domains, addressed by those indices.
        output_file: Destination path; overwritten if it exists.

    Returns:
        None. A confirmation line is printed once the file is written.
    """
    member_rows = (
        [cluster_id, domains[position]]
        for cluster_id, members in cluster_dict.items()
        for position in members
    )
    with open(output_file, mode="w", newline="", encoding="utf-8") as csv_file:
        out = csv.writer(csv_file)
        out.writerow(["cluster_id", "domain"])
        out.writerows(member_rows)
    print(f"✅ Clusters saved to {output_file}")

In [34]:
# Assuming logo_paths = list of file paths (PIL-compatible)
logo_paths, domains = get_logo_paths_from_folder("logos")

# Extract features with logo integrity preserved and standard 224x224
embeddings, valid_domains = extract_features_with_padding(logo_paths, domains, batch_size=32)

# 3. Build and save HNSW index
index = build_hnsw_index(embeddings, save_path="hnsw_index.bin")

# 4. Build graph and cluster
G = build_similarity_graph(index, embeddings, k=3, threshold=0.92)
clusters_dict = cluster_with_leiden(G)

# 5. Map cluster indices to domain names
domain_clusters = cluster_indices_to_domains(clusters_dict, valid_domains)
save_clusters_to_csv(clusters_dict, valid_domains, output_file="clusters.csv")

# 6. Print cluster summaries
for cluster_id, domain_list in domain_clusters.items():
    if len(domain_list) > 1:
        print(f"Cluster {cluster_id} ({len(domain_list)} logos): {set(domain_list)}")

Extracting features:   0%|          | 0/107 [00:00<?, ?it/s]

Extracting features:  21%|██        | 22/107 [00:07<00:28,  2.98it/s]

❌ Skipping unreadable file: logos\buydaikin.ro.png — Reason: cannot identify image file 'C:\\Users\\filip\\Desktop\\Veridion\\1. Logo Similarity\\logos\\buydaikin.ro.png'


Extracting features:  65%|██████▌   | 70/107 [00:25<00:10,  3.57it/s]

❌ Skipping unreadable file: logos\nepaltrust.gov.np.png — Reason: cannot identify image file 'C:\\Users\\filip\\Desktop\\Veridion\\1. Logo Similarity\\logos\\nepaltrust.gov.np.png'


Extracting features:  69%|██████▉   | 74/107 [00:26<00:09,  3.59it/s]

❌ Skipping unreadable file: logos\papajohns.co.nl.png — Reason: cannot identify image file 'C:\\Users\\filip\\Desktop\\Veridion\\1. Logo Similarity\\logos\\papajohns.co.nl.png'
❌ Skipping unreadable file: logos\papajohns.com.pk.png — Reason: cannot identify image file 'C:\\Users\\filip\\Desktop\\Veridion\\1. Logo Similarity\\logos\\papajohns.com.pk.png'


Extracting features:  70%|███████   | 75/107 [00:26<00:08,  3.74it/s]

❌ Skipping unreadable file: logos\papajohns.ma.png — Reason: cannot identify image file 'C:\\Users\\filip\\Desktop\\Veridion\\1. Logo Similarity\\logos\\papajohns.ma.png'
❌ Skipping unreadable file: logos\papajohnsquebec.ca.png — Reason: cannot identify image file 'C:\\Users\\filip\\Desktop\\Veridion\\1. Logo Similarity\\logos\\papajohnsquebec.ca.png'


Extracting features:  88%|████████▊ | 94/107 [00:37<00:07,  1.64it/s]

❌ Skipping unreadable file: logos\tbwaalif.com.svg — Reason: EntitiesForbidden(name='ns_extend', system_id=None, public_id=None)


Extracting features: 100%|██████████| 107/107 [00:43<00:00,  2.45it/s]


✅ Clusters saved to clusters.csv
Cluster 0 (225 logos): {'aamcosantaanaca.com', 'aamcoescondido.com', 'aamcomcallen.com', 'aamcocentraltampa.com', 'aamcomorganhill.com', 'aamcorochesterny.com', 'aamcotwinfalls.com', 'aamcoscottsdaleroad.com', 'aamcosandiego-miramar.com', 'aamcolancasterca.com', 'aamcoblog.com', 'aamcoofgreensboro.com', 'aamcocoronaca.com', 'aamcosuffolkva.com', 'aamcotulsa-brokenarrow.com', 'aamcotempe.com', 'aamcodelran.com', 'aamcosantarosa.com', 'aamco-newbraunfels.com', 'aamcobridgewater.com', 'aamcooaklandca.com', 'aamcoyumaaz.com', 'aamconsa.com', 'aamconorthridgeca.com', 'aamcowashingtondc.com', 'aamcoinglewoodca.com', 'aamcohouston-veteransmemorial.com', 'aamcoglendaleoldtown.com', 'aamco-bellevue.com', 'aamcocatonsvillemd.com', 'aamcoportsmouthnh.com', 'aamconorthwestfwy-houston.com', 'aamcowoodbridgeva.com', 'aamcooklahomacity-northwest.com', 'aamcorockvillemd.com', 'aamcohagerstownmd.com', 'aamcosantaclaritaca.com', 'aamcomcdonoughga.com', 'aamcoaloha.com', 

## Scraping Accuracy

In [17]:
import pandas as pd

# Denominator: every distinct domain in the source parquet file.
total_domains = pd.read_parquet('logos.snappy.parquet')['domain'].unique().tolist()

# Numerator: domains whose logo was loaded and embedded (valid_domains).
print(f"\n✅ Logos processed: {len(valid_domains)} / {len(total_domains)} ({100 * len(valid_domains)/len(total_domains):.2f}%)")


✅ Logos processed: 3400 / 3416 (99.53%)
