Req. packages - for online ipynb's


In [None]:
!pip install imagehash
!pip install cairosvg
!pip install filetype
!pip install requests-toolbelt
!pip install urllib3

Put all the domains from parquet to txt

In [1]:
import pandas as pd

def extract_domains_from_parquet(parquet_path, output_file='domains.txt'):
    df = pd.read_parquet(parquet_path)

    if 'domain' not in df.columns:
        raise ValueError("The Parquet file does not contain a 'domain' column.")

    domains = df['domain'].dropna().tolist()#extract domain values-drop NaN entries-convert to list

    with open(output_file, 'w') as f: #save 2 output file
        for domain in domains:
            f.write(f"{domain}\n")

    print(f"Successfully extracted {len(domains)} domains and saved to {output_file}.")

if __name__ == "__main__":
    extract_domains_from_parquet('logos.snappy.parquet', 'domains.txt')

Successfully extracted 4384 domains and saved to domains.txt.


Extracting Logos from the domains ~4384 domains found in the parquet file
FOUND - 2690 logos parsed

In [None]:
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from pathlib import Path

def get_logo_url(domain):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }

    try:
        for scheme in ['https://', 'http://']: #searched for all the types of logos
            try:
                url = scheme + domain
                response = requests.get(url, headers=headers, timeout=10)
                soup = BeautifulSoup(response.content, 'html.parser')

                og_image = soup.find('meta', property='og:image')
                if og_image and og_image.get('content'):
                    return urljoin(url, og_image.get('content'))

                twitter_image = soup.find('meta', attrs={'name': 'twitter:image'})
                if twitter_image and twitter_image.get('content'):
                    return urljoin(url, twitter_image.get('content'))

                logo_selectors = [
                    ('link', {'rel': ['icon', 'shortcut icon', 'apple-touch-icon']}),
                    ('img', {'class': 'logo'}),
                    ('img', {'id': 'logo'}),
                    ('img', {'alt': 'logo'})
                ]

                for tag, attrs in logo_selectors:
                    element = soup.find(tag, attrs)
                    if element and element.get('href' if tag == 'link' else 'src'):
                        return urljoin(url, element.get('href' if tag == 'link' else 'src'))

            except (requests.exceptions.SSLError, requests.exceptions.ConnectionError):
                continue

    except Exception as e:
        print(f"Error processing {domain}: {str(e)}")
        return None

    return None

def download_logo(domain, output_dir="logos"): #searching for the logo files in the site's header
    logo_url = get_logo_url(domain)
    if not logo_url:
        return False

    try:
        response = requests.get(logo_url, stream=True, timeout=10)
        if response.status_code == 200:
            Path(output_dir).mkdir(parents=True, exist_ok=True)
            content_type = response.headers.get('content-type', '').split('/')[-1]
            ext = content_type if content_type in ['png', 'jpeg', 'jpg', 'svg+xml', 'gif'] else \
                logo_url.split('.')[-1].split('?')[0][:4]
            filename = f"{domain.replace('.', '_')}.{ext.replace('svg+xml', 'svg')}"
            filepath = os.path.join(output_dir, filename)

            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(1024):
                    f.write(chunk)
            return True
    except Exception as e:
        print(f"Domain Failed")
    return False

def process_domains(input_file="domains.txt", output_dir="logos"): #download the logos
    with open(input_file, 'r') as f:
        domains = [line.strip() for line in f.readlines()]

    for domain in domains:
        print(f"Processing {domain}...")
        success = download_logo(domain, output_dir)
        if success:
            print(f"Downloaded logo for {domain}")
        else:
            print(f"Failed to download logo for {domain}")

if __name__ == "__main__":#create logos directory if not exists
    Path("logos").mkdir(exist_ok=True)
    process_domains()

Group the logos in a json based on their predominant, secondary and tertial color scheme using Union-Find clustering with threshold and color frequency analysis

In [12]:
import os
import json
from collections import defaultdict

def load_images_from_folder(folder):#load images, deliberately keep the colors
    images = {}
    valid_extensions = {'.jpg', '.jpeg', '.png', '.webp'}
    for filename in os.listdir(folder):
        ext = os.path.splitext(filename)[1].lower()
        if ext in valid_extensions:
            filepath = os.path.join(folder, filename)
            img = cv2.imread(filepath)
            if img is not None:
                images[filepath] = img
    return images

def compute_histogram(image, bins=8):#computing 3D color histogram in hsv space
    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    hist = cv2.calcHist(
        [hsv],
        [0, 1, 2],  #the hsv channels
        None,
        [bins, bins, bins],
        [0, 180, 0, 256, 0, 256]
    )
    cv2.normalize(hist, hist)
    return hist.flatten()

def compute_similarity(hist1, hist2):#histogram correlation similarity
    return cv2.compareHist(hist1, hist2, cv2.HISTCMP_CORREL)

def cluster_images(images, similarity_threshold=0.85):#Union-Find clustering with threshold
    paths = list(images.keys())
    n = len(paths)
    parent = list(range(n))

    def find(u):
        while parent[u] != u:
            parent[u] = parent[parent[u]]
            u = parent[u]
        return u

    def union(u, v):
        pu, pv = find(u), find(v)
        if pu != pv:
            parent[pv] = pu

    hists = [compute_histogram(img) for img in images.values()]
    for i in range(n):
        for j in range(i+1, n):
            if compute_similarity(hists[i], hists[j]) >= similarity_threshold:
                union(i, j)
    clusters = defaultdict(list)
    for idx in range(n):
        clusters[find(idx)].append(paths[idx])

    return list(clusters.values())

def get_average_color(images): #compute circular mean for hue, arithmetic mean for saturation/value !!!!there may be some better formulas, this is the best I tried so far
    h_angles, s_values, v_values = [], [], []

    for img in images:
        hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
        h, s, v = cv2.split(hsv)
        h_deg = h.astype(float) * 2 #convert hue to angles (0-360)
        sin = np.sum(np.sin(np.radians(h_deg)))
        cos = np.sum(np.cos(np.radians(h_deg)))
        avg_h = np.degrees(np.arctan2(sin, cos)) % 360

        h_angles.append(avg_h)
        s_values.append(np.mean(s))
        v_values.append(np.mean(v))

    sin_total = np.sum(np.sin(np.radians(h_angles)))#circular mean for hue
    cos_total = np.sum(np.cos(np.radians(h_angles)))
    avg_h = np.degrees(np.arctan2(sin_total, cos_total)) % 360

    return {
        "h": avg_h,
        "s": np.mean(s_values),
        "v": np.mean(v_values)
    }

import cv2
import numpy as np

def get_dominant_colors(images, n_colors=3, color_bits=5):#extract dominant colors using color frequency analysis


    hist = {}
    bin_size = 2 ** (8 - color_bits)#created a 3D histogram in reduced color space

    for img in images:
        resized = cv2.resize(img, (100, 100))
        quantized = (resized // bin_size) * bin_size + bin_size//2
        for color in quantized.reshape(-1, 3):#update histogram counts
            bgr = tuple(color)
            hist[bgr] = hist.get(bgr, 0) + 1

    sorted_colors = sorted(hist.items(), key=lambda x: -x[1])[:n_colors]#sort colors by frequency and get top N
    return [f"#{r:02x}{g:02x}{b:02x}" # Convert to hex format
            for (b, g, r), _ in sorted_colors]

def get_size_info(images): #check image sizes - fixed warnings and errors
    if not images:  #handle empty cluster edge case
        return "No images"

    sizes = set(img.shape[:2][::-1] for img in images)

    if len(sizes) == 1:
        size = sizes.pop()
        return f"{size[0]}x{size[1]}"
    return f"Varies ({len(sizes)} different sizes)"

def save_results(clusters, images, output_file="Color_Histogram_clusters2.json"): #save clusters with specifications in a column
    result = []

    for cluster in clusters:
        cluster_images = [images[path] for path in cluster]

        avg_color = get_average_color(cluster_images)
        hsv_pixel = np.uint8([[[avg_color['h']/2, avg_color['s'], avg_color['v']]]])
        bgr_pixel = cv2.cvtColor(hsv_pixel, cv2.COLOR_HSV2BGR)
        avg_hex = f"#{bgr_pixel[0,0,2]:02x}{bgr_pixel[0,0,1]:02x}{bgr_pixel[0,0,0]:02x}"

        specs = {
            "average_color": avg_hex,
            "dominant_colors": get_dominant_colors(cluster_images),
            "size_info": get_size_info(cluster_images)
        }

        result.append({
            "specs": specs,
            "files": [os.path.relpath(p) for p in cluster]
        })

    with open(output_file, 'w') as f:
        json.dump(result, f, indent=2)

if __name__ == "__main__":
    FOLDER_PATH = "/content/logos"
    SIMILARITY_THRESHOLD = 0.82

    print("Loading images...")
    images = load_images_from_folder(FOLDER_PATH)

    print("Clustering...")
    clusters = cluster_images(images, SIMILARITY_THRESHOLD)

    print("Generating specs...")
    save_results(clusters, images)

    print(f"\nSample cluster specs:")
    sample = json.load(open("Color_Histogram_clusters2.json"))[0]
    print(json.dumps(sample, indent=2))

Loading images...
Clustering...
Generating specs...

Sample cluster specs:
{
  "specs": {
    "average_color": "#918894",
    "dominant_colors": [
      "#34343c",
      "#fcfcfc",
      "#f4f4f4"
    ],
    "size_info": "292x292"
  },
  "files": [
    "logos/bakertilly_bg.png",
    "logos/bakertilly_com_cy.png"
  ]
}


JSON Viewer Columns List

In [14]:
import json
import textwrap
import math
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle

COLUMN_PADDING = 40 #data configuration
CLUSTER_SPACING = 20
COLOR_HEIGHT = 40
FONT_SIZE = 12
LINE_HEIGHT = FONT_SIZE * 1.8
TEXT_PADDING = 8
IMAGE_SIZE = (3840, 17060)  #4K resolution !! should be adapted the width based on the number of data samples
WRAP_WIDTH = 35
JSON_FILE_PATH = '/content/Color_Histogram_clusters2.json'
OUTPUT_FILE = 'Color_Histogram_clusters.png'
MIN_CLUSTER_WIDTH = 600
MAX_COLUMNS = math.floor((IMAGE_SIZE[0] + COLUMN_PADDING) / (MIN_CLUSTER_WIDTH + COLUMN_PADDING))

def load_and_process_data(filename):
    with open(filename, 'r', encoding='utf-8') as f:
        data = json.load(f)

    total_height = 0
    for cluster in data:
        wrapped_files = []
        for file in cluster['files']:
            wrapped_files.extend(textwrap.wrap(file, width=WRAP_WIDTH))
        cluster['wrapped_files'] = wrapped_files

        text_height = len(wrapped_files) * LINE_HEIGHT
        cluster_height = COLOR_HEIGHT + TEXT_PADDING + text_height + TEXT_PADDING
        cluster['height'] = cluster_height
        total_height += cluster_height + CLUSTER_SPACING

    max_column_height = IMAGE_SIZE[1] - 100
    height_based_columns = max(1, math.ceil(total_height / max_column_height))
    width_based_columns = MAX_COLUMNS
    num_columns = min(height_based_columns, width_based_columns)

    return data, num_columns

def arrange_clusters(data, num_columns):
    total_horizontal_space = IMAGE_SIZE[0] - (COLUMN_PADDING * (num_columns - 1))
    column_width = max(total_horizontal_space // num_columns, MIN_CLUSTER_WIDTH)

    columns = [[] for _ in range(num_columns)]
    current_heights = [0] * num_columns

    for cluster in data:
        suitable_columns = [i for i, h in enumerate(current_heights) if h + cluster['height'] <= IMAGE_SIZE[1]]

        if suitable_columns:
            col_index = min(suitable_columns, key=lambda i: current_heights[i])
        else:
            col_index = current_heights.index(min(current_heights))
            if num_columns < MAX_COLUMNS:
                num_columns += 1
                columns.append([])
                current_heights.append(0)
                col_index = num_columns - 1

        columns[col_index].append(cluster)
        current_heights[col_index] += cluster['height'] + CLUSTER_SPACING
    column_width = max((IMAGE_SIZE[0] - (COLUMN_PADDING * (num_columns - 1))) // num_columns, MIN_CLUSTER_WIDTH)
    x_positions = [i * (column_width + COLUMN_PADDING) for i in range(num_columns)]
    arranged_data = []
    for col_idx, clusters in enumerate(columns):
        x = x_positions[col_idx]
        y = 0
        for cluster in clusters:
            cluster['pos'] = (x, y)
            cluster['width'] = column_width
            y += cluster['height'] + CLUSTER_SPACING
            arranged_data.append(cluster)

    return arranged_data

def create_visualization(data, output_filename):
    plt.figure(figsize=(IMAGE_SIZE[0]/100, IMAGE_SIZE[1]/100), dpi=100)
    ax = plt.gca()
    ax.set_xlim(0, IMAGE_SIZE[0])
    ax.set_ylim(IMAGE_SIZE[1], 0)
    ax.axis('off')

    for cluster in data:
        x, y = cluster['pos']
        width = cluster['width']
        height = cluster['height']
        colors = cluster['specs']['dominant_colors']#drawing color bars
        color_width = width / len(colors)
        for i, color in enumerate(colors):
            ax.add_patch(Rectangle(
                (x + i*color_width, y), color_width, COLOR_HEIGHT,
                facecolor=color, edgecolor='none'
            ))

        ax.add_patch(Rectangle(#container
            (x, y), width, height,
            linewidth=0.5, edgecolor='#333333', facecolor='none'
        ))
        text_y = y + COLOR_HEIGHT + TEXT_PADDING
        for line in cluster['wrapped_files']:
            plt.text(
                x + TEXT_PADDING, text_y, line,
                fontsize=FONT_SIZE, ha='left', va='top',
                fontfamily='monospace', wrap=True,
                color='#333333', linespacing=1.2
            )
            text_y += LINE_HEIGHT

    plt.savefig(output_filename, bbox_inches='tight', pad_inches=0.1, dpi=100)
    plt.close()

if __name__ == "__main__":
    try:
        cluster_data, num_cols = load_and_process_data(JSON_FILE_PATH)
        arranged_data = arrange_clusters(cluster_data, num_cols)
        create_visualization(arranged_data, OUTPUT_FILE)
        print(f"Created visualization")
    except Exception as e:
        print(f"Error: {str(e)}")

Created visualization
