In [1]:
import os
import random
import shutil
from typing import List, Optional

import kagglehub
import matplotlib.pyplot as plt
import numpy as np
import tqdm
import yaml
# Seed Python's and NumPy's global RNGs so the np.random.shuffle calls further
# down start from a fixed state.
random.seed(0)
np.random.seed(0)

import cv2
cv2.setRNGSeed(0)   # seed OpenCV's RNG too (presumably for RANSAC sampling -- confirm)

# Minimum number of ratio-test survivors required before a homography is fitted.
MIN_MATCH_COUNT = 10
# Reference logo image that is searched for in every dataset image.
TBANK_PATH = "/kaggle/input/tbank-logo/tbank_logo.png"
# Upper bound on how many dataset images are kept after shuffling.
SUBSET_IMAGE_SIZE = 29000
# Fraction of the kept images that goes to the training split; the rest is validation.
TRAIN_COEFFICIENT = 0.90

In [2]:
# Root of the mounted input dataset; the raw images are read from its
# "data_sirius" sub-folder.
path = "/kaggle/input/sirius-cv-case"



In [3]:
# Stage a writable copy of the raw images in ./Dataset (the input mount itself
# is only read from here; later cells rename and delete files in the copy).
os.makedirs('Dataset', exist_ok=True)
source_dir = path + "/data_sirius"
for file_name in os.listdir(source_dir):
  shutil.copy(f"{source_dir}/{file_name}", f"./Dataset/{file_name}")


In [4]:
def detect_image_on_another(query_image: np.ndarray, search_image: np.ndarray) -> Optional[List[List[float]]]:
  """Locate `query_image` (the logo) inside `search_image` (the scene).

  Pipeline: SIFT keypoints -> RootSIFT descriptors -> brute-force 2-NN matching
  -> Lowe ratio test -> RANSAC homography -> plausibility checks on the
  projected outline.

  Args:
    query_image: already-decoded image of the object to find. Callers in this
      notebook pass grayscale arrays; only the first two shape entries are used.
    search_image: already-decoded image in which to look for the object.

  Returns:
    The four projected corners of the query image in search-image coordinates,
    as [[x, y], ...] in the order top-left, bottom-left, bottom-right,
    top-right (they may lie partly outside the frame), or None when either
    input is None, too few features/matches exist, no homography is found, or
    the projected outline fails the plausibility checks.
  """
  if query_image is None or search_image is None:
    return None

  # --- Detect local features (SIFT) ---------------------------------------------
  # For each image SIFT yields keypoints (location, scale, orientation) and one
  # 128-D descriptor per keypoint. The thresholds are looser than OpenCV's
  # defaults so that more keypoints are kept.
  sift = cv2.SIFT_create(
      nfeatures=5000,
      contrastThreshold=0.02,
      edgeThreshold=20,
      sigma=1.2,
  )

  kp_q, desc_q = sift.detectAndCompute(query_image, None)
  kp_s, desc_s = sift.detectAndCompute(search_image, None)

  def rootsift(desc):
    # RootSIFT: L1-normalise every descriptor, then take the element-wise
    # square root. Descriptors are then compared with plain L2 below.
    if desc is None:
      return None
    eps = 1e-7
    desc = desc.astype(np.float32)
    desc /= (desc.sum(axis=1, keepdims=True) + eps)
    return np.sqrt(desc)

  desc_q_rs = rootsift(desc_q)
  desc_s_rs = rootsift(desc_s)

  # k=2 matching needs at least two descriptors on each side.
  if desc_q_rs is None or desc_s_rs is None or len(desc_q_rs) < 2 or len(desc_s_rs) < 2:
    return None

  # --- Match descriptors (exact brute force, 2 nearest neighbours) ---------------
  bf = cv2.BFMatcher(cv2.NORM_L2, crossCheck=False)
  matches = bf.knnMatch(desc_q_rs, desc_s_rs, k=2)

  # --- Lowe ratio test ------------------------------------------------------------
  # Keep a match only when the best candidate is clearly closer than the
  # runner-up (distance ratio below 0.8); ambiguous matches are dropped.
  good_matches = []
  for pair in matches:
    if len(pair) < 2:
      continue
    best, runner_up = pair[0], pair[1]
    if best.distance < 0.8 * runner_up.distance:
      good_matches.append(best)

  # With too few matches a homography estimate would be unstable.
  if len(good_matches) < MIN_MATCH_COUNT:
    return None

  # --- Geometric verification (homography via RANSAC) ----------------------------
  # Nx1x2 arrays of matched points in the query (src) and the scene (dst).
  src_pts = np.float32([kp_q[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
  dst_pts = np.float32([kp_s[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)

  # Reprojection threshold is 7.0 px; `mask` flags the inlier matches.
  M, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 7.0, maxIters=5000, confidence=0.999)
  if M is None or mask is None or mask.sum() < 4:
    return None

  # --- Project the query outline into the scene ----------------------------------
  # shape[:2] so that a colour query image works as well as a grayscale one.
  h, w = query_image.shape[:2]
  pts = np.float32([[0, 0], [0, h - 1], [w - 1, h - 1], [w - 1, 0]]).reshape(-1, 1, 2)
  dst = cv2.perspectiveTransform(pts, M)

  # A degenerate homography can project to inf/NaN; int() below would raise.
  if not np.isfinite(dst).all():
    return None

  corners = np.asarray(dst).reshape(-1, 2).astype(np.float32)
  quad_area = abs(cv2.contourArea(corners))
  bboxes = corners.tolist()

  # --- Plausibility checks on the axis-aligned rectangle clamped to the frame ----
  x_coords = [p[0] for p in bboxes]
  y_coords = [p[1] for p in bboxes]
  height, width = search_image.shape[:2]

  x_min = int(max(0, min(x_coords)))
  y_min = int(max(0, min(y_coords)))
  x_max = int(min(width, max(x_coords)))
  y_max = int(min(height, max(y_coords)))

  # Check each extent on its own: when the outline lies completely outside the
  # frame both extents are negative and their product would look like a valid
  # positive area.
  if x_max <= x_min or y_max <= y_min:
    return None

  rect_area = (x_max - x_min) * (y_max - y_min)

  # Reject outlines that fill less than 10% of their bounding rectangle
  # (collapsed or self-intersecting quads) and rectangles smaller than 0.05%
  # of the scene area.
  if quad_area / rect_area < 0.1 or rect_area < (height * width * 0.0005):
    return None

  return bboxes

In [5]:
def draw_bbox(image, bbox, color=(0, 255, 0), thickness=2):
  """Show `image` with the closed polygon `bbox` drawn on top of it.

  Args:
    image: NumPy image array. It is not modified; drawing happens on a copy.
      A 2-D (single-channel) image is expanded to three channels first so that
      `color` can be applied.
    bbox: polygon vertices as (x, y) pairs; values are truncated to int32.
    color: line colour, in the channel order of the displayed array.
    thickness: line thickness in pixels.
  """
  # Previously `color` and `thickness` were ignored (the call hard-coded 255
  # and 3) and the caller's array was drawn on in place.
  canvas = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB) if image.ndim == 2 else image.copy()
  outline = np.int32(bbox).reshape(-1, 1, 2)
  cv2.polylines(canvas, [outline], True, color, thickness, cv2.LINE_AA)
  plt.imshow(canvas)
  plt.show()


In [6]:
def detect_logo(tbank_logo_path: str, image_path: str):
    """Read the logo and the scene from disk as grayscale and locate the logo.

    Returns whatever `detect_image_on_another` found when that result is
    truthy; otherwise (unreadable file, no detection, empty result) None.
    """
    logo_gray = cv2.imread(tbank_logo_path, cv2.IMREAD_GRAYSCALE)
    scene_gray = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if logo_gray is None or scene_gray is None:
        return None

    quad = detect_image_on_another(logo_gray, scene_gray)
    return quad if quad else None

In [7]:
def quad_to_yolo_xywh(quad_xy, img_shape):
    """
    Convert a quadrilateral into a normalized YOLO box.

    quad_xy: iterable of (x, y) corner points, in any order.
    img_shape: image.shape, either (H, W) or (H, W, C).
    returns: (xc_norm, yc_norm, w_norm, h_norm) of the axis-aligned rectangle
             enclosing the corners after they are clipped to the image.
    raises: ValueError if the clipped rectangle has no width or no height.
    """
    img_h, img_w = img_shape[:2]
    corners = np.array(quad_xy, dtype=np.float32).reshape(-1, 2)

    # Pull every corner back inside the frame; a homography may project
    # slightly (or far) outside the image.
    xs = np.clip(corners[:, 0], 0, img_w - 1)
    ys = np.clip(corners[:, 1], 0, img_h - 1)

    # Axis-aligned extent of the clipped corners.
    left, right = xs.min(), xs.max()
    top, bottom = ys.min(), ys.max()

    box_w = right - left
    box_h = bottom - top
    if box_w <= 0 or box_h <= 0:
        raise ValueError("Degenerate box after homography.")

    center_x = (left + right) / 2.0
    center_y = (top + bottom) / 2.0

    # Normalize by the image size.
    return (center_x / img_w, center_y / img_h, box_w / img_w, box_h / img_h)


In [8]:
def yolo_to_rect(xc_n, yc_n, w_n, h_n, img_shape):
    """Turn a normalized YOLO box into integer pixel corners (x1, y1, x2, y2).

    img_shape is image.shape; only its first two entries (H, W) are used.
    """
    img_h, img_w = img_shape[:2]

    # Back to pixel units.
    center_x = xc_n * img_w
    center_y = yc_n * img_h
    half_w = w_n * img_w / 2
    half_h = h_n * img_h / 2

    return (
        int(round(center_x - half_w)),
        int(round(center_y - half_h)),
        int(round(center_x + half_w)),
        int(round(center_y + half_h)),
    )

In [9]:
def visualize_quad_and_yolo(img_bgr, quad, xywh_n, title="check"):
    """Overlay the detected quadrilateral and its YOLO rectangle for a visual check.

    img_bgr: BGR image; it is copied, so the caller's array stays untouched.
    quad: corner points of the detected polygon.
    xywh_n: normalized (xc, yc, w, h) YOLO box for the same detection.
    title: figure title.
    """
    canvas = img_bgr.copy()

    # Detected polygon, drawn in blue (BGR).
    polygon = np.int32(np.array(quad).reshape(-1, 1, 2))
    cv2.polylines(canvas, [polygon], True, (255, 0, 0), 2)

    # Rectangle rebuilt from the normalized YOLO values, drawn in green.
    left, top, right, bottom = yolo_to_rect(*xywh_n, canvas.shape)
    cv2.rectangle(canvas, (left, top), (right, bottom), (0, 255, 0), 2)

    plt.figure(figsize=(6, 6))
    plt.title(title)
    plt.imshow(cv2.cvtColor(canvas, cv2.COLOR_BGR2RGB))
    plt.axis("off")
    plt.show()


## Rename all files

In [10]:
# Rename every entry of ./Dataset to "<index>.jpg".
#
# The renaming runs in two passes through temporary names. A direct rename to
# "<index>.jpg" silently replaces any entry that already carries that name
# (for example when this cell is re-run), which loses images. The listing is
# sorted so that the numbering does not depend on filesystem order.
dataset_dir = "./Dataset"
original_names = sorted(os.listdir(dataset_dir))

staged_names = []
for idx, file_name in enumerate(original_names):
  staged_name = f"__renaming_{idx}.tmp"
  staged_path = os.path.join(dataset_dir, staged_name)
  # Refuse to clobber a leftover temporary entry from an interrupted run.
  if staged_name != file_name and os.path.exists(staged_path):
    raise FileExistsError(f"Temporary name already in use: {staged_path}")
  os.rename(os.path.join(dataset_dir, file_name), staged_path)
  staged_names.append(staged_name)

# Every original name is now free, so the final names cannot collide.
for idx, staged_name in enumerate(staged_names):
  os.rename(os.path.join(dataset_dir, staged_name), os.path.join(dataset_dir, f"{idx}.jpg"))

In [11]:
# Pick a random subset of at most SUBSET_IMAGE_SIZE images.
# os.listdir returns entries in arbitrary, filesystem-dependent order, so the
# listing is sorted first; otherwise the seeded shuffle would still produce a
# different subset from run to run.
images_subset = sorted(os.listdir("./Dataset"))
np.random.shuffle(images_subset)
images_subset = images_subset[:SUBSET_IMAGE_SIZE]

In [12]:
# Shuffle once more, then cut the list at the TRAIN_COEFFICIENT mark:
# everything before the cut is training data, everything after is validation.
np.random.shuffle(images_subset)
split_index = int(len(images_subset) * TRAIN_COEFFICIENT)
train_images, test_images = images_subset[:split_index], images_subset[split_index:]

In [13]:
# Create the output tree: yolo_dataset/{images,labels}/{train,val}.
os.makedirs("./yolo_dataset", exist_ok=True)
for kind in ("images", "labels"):
  for split in ("train", "val"):
    os.makedirs(f"./yolo_dataset/{kind}/{split}", exist_ok=True)



In [14]:
# Auto-label the training split: run the logo detector on every image, write a
# YOLO label file when a logo is found, and always copy the image itself.
# Images that end up without a label file simply have no annotation.
for train_img in tqdm.tqdm(train_images):
  src = f"./Dataset/{train_img}"
  # splitext instead of [:-4] so the stem is right for any extension length.
  label_path = f"./yolo_dataset/labels/train/{os.path.splitext(train_img)[0]}.txt"
  try:
    bboxes = detect_logo(TBANK_PATH, src)
    if bboxes:
      # Decode in colour only when a detection exists; just the shape is needed.
      img = cv2.imread(src)
      xc, yc, w, h = quad_to_yolo_xywh(bboxes, img.shape)
      with open(label_path, "w") as f:
        f.write(f"0 {xc:.6f} {yc:.6f} {w:.6f} {h:.6f}\n")
  except Exception as exc:
    # Best effort: keep going, but say which file failed and why.
    # tqdm.write prints without breaking the progress bar.
    tqdm.tqdm.write(f"Error occurred while labelling {src}: {type(exc).__name__}: {exc}")
  finally:
    shutil.copy(src, f"./yolo_dataset/images/train/{train_img}")


  1%|          | 208/26100 [02:19<2:34:32,  2.79it/s]

Error occured


 51%|█████     | 13239/26100 [2:09:41<1:14:13,  2.89it/s]

Error occured


 66%|██████▋   | 17296/26100 [2:51:31<2:49:54,  1.16s/it]

Error occured


100%|██████████| 26100/26100 [4:21:28<00:00,  1.66it/s]


In [15]:
# Auto-label the validation split: run the logo detector on every image, write
# a YOLO label file when a logo is found, and always copy the image itself.
# Images that end up without a label file simply have no annotation.
for test_img in tqdm.tqdm(test_images):
  src = f"./Dataset/{test_img}"
  # splitext instead of [:-4] so the stem is right for any extension length.
  label_path = f"./yolo_dataset/labels/val/{os.path.splitext(test_img)[0]}.txt"
  try:
    bboxes = detect_logo(TBANK_PATH, src)
    if bboxes:
      # Decode in colour only when a detection exists; just the shape is needed.
      img = cv2.imread(src)
      xc, yc, w, h = quad_to_yolo_xywh(bboxes, img.shape)
      with open(label_path, "w") as f:
        f.write(f"0 {xc:.6f} {yc:.6f} {w:.6f} {h:.6f}\n")
  except Exception as exc:
    # Best effort: keep going, but say which file failed and why.
    # tqdm.write prints without breaking the progress bar.
    tqdm.tqdm.write(f"Error occurred while labelling {src}: {type(exc).__name__}: {exc}")
  finally:
    shutil.copy(src, f"./yolo_dataset/images/val/{test_img}")


100%|██████████| 2900/2900 [29:09<00:00,  1.66it/s]


In [16]:
# Dataset description that is written to data.yaml: a single class
# (id 0, 'tbank_logo') and the image folders of the two splits.
data = dict(
    path='.',
    nc=1,
    train='images/train',
    val='images/val',
    names={0: 'tbank_logo'},
)

In [17]:
# Serialize the dataset description next to the images/ and labels/ folders.
with open('./yolo_dataset/data.yaml', 'w') as yaml_file:
  yaml.dump(data, yaml_file)

In [18]:
import os, zipfile, shutil

def zipdir(dir_path: str, zip_path: str, arc_root: str = "yolo_dataset"):
    """Pack every file under `dir_path` into a deflate-compressed zip archive.

    Args:
        dir_path: directory whose files are archived (walked recursively).
            Empty directories are not stored.
        zip_path: archive to create; an existing file is overwritten.
        arc_root: top-level folder name used inside the archive. Defaults to
            "yolo_dataset", the name that used to be hard-coded here.
    """
    zip_abs = os.path.abspath(zip_path)
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as z:
        for root, dirs, files in os.walk(dir_path):
            # Sort in place so that os.walk descends, and entries are written,
            # in a stable order independent of the filesystem.
            dirs.sort()
            for f in sorted(files):
                full = os.path.join(root, f)
                # The archive may live inside dir_path; never add it to itself.
                if os.path.abspath(full) == zip_abs:
                    continue
                rel = os.path.relpath(full, dir_path)
                z.write(full, arcname=os.path.join(arc_root, rel))

# Archive the generated dataset, then delete both working folders so that
# only the zip file is left behind.
zipdir("yolo_dataset", "yolo_dataset.zip")

for working_dir in ("yolo_dataset", "Dataset"):
    shutil.rmtree(working_dir)

print("Packed yolo_dataset.zip and removed the original folder.")


Packed yolo_dataset.zip and removed the original folder.
