In [2]:
import torch
from pathlib import Path
from torchvision import transforms as T
from PIL import Image, ImageOps
from tqdm.notebook import tqdm
import json
import shutil
from sqlalchemy import create_engine, text
from PIL import Image, ImageOps

# Run on the first CUDA GPU when one is available; otherwise fall back to the
# CPU so the notebook still runs (more slowly) on machines without a GPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

## Automatically detect and crop

This part of the notebook runs the detection model on every image in a source directory and crops each image to its detected bounding boxes. The cropped images are saved in a destination directory.

If no watermark is detected, the original image is copied to a special subdirectory "notfound".

In [3]:
# Locations used by the detection step; all data paths hang off DATA_DIR.
DETECTION_MODEL = "../detection.pth"  # serialized detection model
DATA_DIR = Path("../data")
SOURCE_DIR = DATA_DIR.joinpath("xishen", "A_classification")  # input images, read per sub-directory
TARGET_DIR = DATA_DIR.joinpath("detected")  # caution: will be wiped out

In [4]:
# Load the fully pickled detection model and put it in inference mode.
# map_location remaps the stored tensors onto `device`, so a checkpoint saved on
# another device (e.g. a different GPU index) can still be loaded here.
# SECURITY: torch.load unpickles arbitrary objects -- only load trusted checkpoints.
# NOTE(review): PyTorch >= 2.6 defaults to weights_only=True, which rejects fully
# pickled models; pass weights_only=False there if this checkpoint is trusted.
m = torch.load(DETECTION_MODEL, map_location=device).to(device).eval()

In [None]:
# Detect boxes on every source image and save a padded square crop per kept box.
#
# Outputs of this cell:
#   * TARGET_DIR/<dir>/...+<k>(<score>).jpg  one crop per kept box
#   * TARGET_DIR/boxes.json                  kept boxes, normalised to [0, 1]
#   * nobox                                  images with no box above the score threshold
#   * allboxes                               dict "<parent>/<file>" -> list of kept boxes

# Tunable settings (previously inlined as magic numbers).
SCORE_THRESHOLD = 0.5       # keep detections scoring strictly above this
MAX_BOX_AREA = 0.3          # reject boxes covering more than this fraction of the image
MAX_BOX_SIDE = 0.8          # reject boxes wider or taller than this fraction of the image
MAX_OVERLAP = 0.5           # reject a box when more than this fraction of it lies in a kept box
CROP_SCALE = 1.2            # square crop side = longest box side * 1.2 (10% margin per side)
CROP_MAX_SIZE = (640, 640)  # crops are downscaled to fit within this size
JPEG_QUALITY = 85
IMAGE_SUFFIXES = {".jpg", ".jpeg", ".png"}

to_tensor = T.ToTensor()

# Start from an empty target directory (see the caution on TARGET_DIR).
if TARGET_DIR.exists():
    shutil.rmtree(TARGET_DIR)

TARGET_DIR.mkdir(parents=True, exist_ok=True)

dirs = [d for d in SOURCE_DIR.iterdir() if d.is_dir()]
nobox = []
allboxes = {}

for d in tqdm(dirs):
    if d.name == "notfound":
        continue

    ddir = TARGET_DIR / d.name
    ddir.mkdir(parents=True, exist_ok=True)
    files = [f for f in d.glob("**/*") if f.is_file() and f.suffix.lower() in IMAGE_SUFFIXES]

    for f in tqdm(files):
        # Apply the EXIF orientation first and run the model on that same image, so
        # the predicted boxes line up with the image the crops are taken from.
        # Converting to RGB lets RGBA/palette/grayscale PNGs be saved as JPEG.
        with Image.open(f) as img:
            img0 = ImageOps.exif_transpose(img).convert("RGB")
        tensor = to_tensor(img0).to(device)

        with torch.no_grad():
            out = m([tensor])[0]
        scores = out["scores"]
        # Filter boxes and scores with the same mask so index k refers to the same
        # detection in both, whatever order the model returns them in.
        keep = scores > SCORE_THRESHOLD
        boxes = out["boxes"][keep]
        box_scores = scores[keep]

        if len(boxes) == 0:
            print(f"no box found for {f}")
            nobox.append(f)
            continue

        width, height = img0.size
        crops = []
        for k, box in enumerate(boxes):
            x0, y0, x1, y1 = [float(v) for v in box]
            # normalise to [0, 1] so the size/overlap tests are resolution independent
            x0, y0, x1, y1 = x0 / width, y0 / height, x1 / width, y1 / height
            oarea = (x1 - x0) * (y1 - y0)
            if oarea <= 0:
                # degenerate box: nothing to crop, and it would divide by zero below
                continue
            if oarea > MAX_BOX_AREA or (x1 - x0) > MAX_BOX_SIDE or (y1 - y0) > MAX_BOX_SIDE:
                print(f"Box {k} too large {oarea:0.2f}", f)
                continue

            # skip this box if most of it is already covered by a previously kept box
            ignore = False
            for kept in crops:
                kx0, ky0, kx1, ky1 = kept["box"]
                ix0, iy0 = max(x0, kx0), max(y0, ky0)
                ix1, iy1 = min(x1, kx1), min(y1, ky1)
                if ix1 < ix0 or iy1 < iy0:
                    continue  # no intersection
                area = (ix1 - ix0) * (iy1 - iy0)
                if area / oarea > MAX_OVERLAP:
                    ignore = True
                    print(f"Ignoring box {k} overlapping box {kept['k']} by {area/oarea:0.2f}", f)
                    break

            if ignore:
                continue
            crops.append({"k": k, "box": (x0, y0, x1, y1)})

            # back to pixel coordinates of the oriented image
            px0, py0 = int(x0 * width), int(y0 * height)
            px1, py1 = int(x1 * width), int(y1 * height)
            # square crop centred on the box, with margin
            cx, cy = (px0 + px1) / 2, (py0 + py1) / 2
            sz = max(px1 - px0, py1 - py0) * CROP_SCALE
            crop_box = (int(cx - sz / 2), int(cy - sz / 2), int(cx + sz / 2), int(cy + sz / 2))
            score = int(box_scores[k].item() * 100)
            crop_img = img0.crop(crop_box)
            crop_img.thumbnail(CROP_MAX_SIZE)
            tfile = ddir / f.parent.relative_to(d) / f"{f.name.split('.')[0]}+{k}({score:03d}).jpg"
            tfile.parent.mkdir(parents=True, exist_ok=True)
            crop_img.save(tfile, quality=JPEG_QUALITY)

        # NOTE(review): an image whose boxes were all rejected above ends up with an
        # empty list here and is not added to `nobox` -- confirm this is intended.
        allboxes[f"{f.parent.name}/{f.name}"] = crops

with open(TARGET_DIR / "boxes.json", "w") as target:
    json.dump(allboxes, target)

In [6]:
# Copy every image without a detection to TARGET_DIR/notfound/<parent folder>/
# so it can be processed manually.
notfound_root = TARGET_DIR / "notfound"
for missed in nobox:
    dest_dir = notfound_root / missed.parent.name
    dest_dir.mkdir(parents=True, exist_ok=True)
    shutil.copyfile(missed, dest_dir / missed.name)

## Collect manually annotated watermarks

Use the annotator in this repository to annotate the watermarks in the folder "notfound", then run this part of the script to collect the result.

In [7]:
# Fetch the manual annotations made on the images under TARGET_DIR/notfound.
ANNOTATOR_DB_FILE = "../detection/annotator/db.sqlite3"
PREFIX = (TARGET_DIR / "notfound").relative_to(DATA_DIR)

engine = create_engine(f'sqlite:///{ANNOTATOR_DB_FILE}', echo=False)

# Pass the LIKE pattern as a bound parameter instead of formatting it into the SQL:
# the previous double-quoted literal only worked through SQLite's legacy fallback
# for double-quoted strings, and binding avoids any quoting issue in the path.
# as_posix() keeps forward slashes in the pattern on every platform.
query = text("SELECT * FROM ANNOTATIONS WHERE image LIKE :pattern")
pattern = f"/{PREFIX.as_posix()}/%"

with engine.connect() as connection:
    annotations = connection.execute(query, {"pattern": pattern}).fetchall()

In [8]:
# Crop the manually annotated boxes and merge them into `allboxes`.
# Requires `annotations` (annotator query) and `allboxes` (detection cell).

# Group annotations per image so that several boxes on the same image get distinct
# indices instead of overwriting each other's output file and `allboxes` entry.
boxes_by_image = {}
for annot in annotations:
    boxes_by_image.setdefault(annot.image, []).append((annot.x0, annot.y0, annot.x1, annot.y1))

for image, boxes in boxes_by_image.items():
    # Stored paths start with "/" (see the LIKE pattern of the query); strip it,
    # otherwise the join would discard DATA_DIR and yield a path from the root.
    f = DATA_DIR / image.lstrip("/")

    # Same preparation as in the detection cell: apply the EXIF orientation and
    # convert to RGB so the crop can always be saved as JPEG.
    with Image.open(f) as img:
        img0 = ImageOps.exif_transpose(img).convert("RGB")
    ddir = TARGET_DIR / f.parent.name
    ddir.mkdir(parents=True, exist_ok=True)

    crops = []  # fresh list per image; do not reuse the one left by the detection cell
    for k, box in enumerate(boxes):
        # assumes the annotator stores coordinates normalised to [0, 1] -- TODO confirm
        x0, y0, x1, y1 = [float(v) for v in box]
        crops.append({"k": k, "box": (x0, y0, x1, y1), "manual": True})
        # rescale to pixel coordinates of the oriented image
        sx, sy = img0.size
        x0, y0, x1, y1 = int(x0 * sx), int(y0 * sy), int(x1 * sx), int(y1 * sy)
        # convert to cx cy w h
        cx, cy, w, h = (x0 + x1) / 2, (y0 + y1) / 2, x1 - x0, y1 - y0
        # square crop: longest side * 1.2, i.e. a 10% margin on each side
        sz = max(w, h) * 1.2
        # crop
        x0, y0, x1, y1 = int(cx - sz / 2), int(cy - sz / 2), int(cx + sz / 2), int(cy + sz / 2)
        crop = img0.crop((x0, y0, x1, y1))
        crop.thumbnail((640, 640))

        # manual annotations are saved with a fixed score of 100
        crop.save(ddir / f"{f.name.split('.')[0]}+{k}(100).jpg", quality=85)

    allboxes[f"{f.parent.name}/{f.name}"] = crops

# Written file holds the detected boxes plus the manual ones added above.
with open(TARGET_DIR / "added_boxes.json", "w") as target:
    json.dump(allboxes, target)