# Danbooru character image scraper (dev notebook)
This notebook downloads **.jpg** and **.png** images for each character tag into
`datasets/extended_dataset/train/<class_name>/`.

**Notes**
- This notebook **only scrapes** images; it **does not merge** with your original dataset.
- Danbooru read endpoints are globally rate-limited (10 req/s), and pagination/limits apply. See the API help page for details. 


## 0) Imports

In [33]:
import os
import re
import time
import json
import math
import shutil
import hashlib
from pathlib import Path
from dataclasses import dataclass
from typing import Dict, List, Optional, Iterable, Tuple
from collections import Counter

import aiohttp
import asyncio
import nest_asyncio
from PIL import Image


In [30]:
Image.MAX_IMAGE_PIXELS = 933120000

## 1) Configuration
Put your credentials in environment variables to avoid hardcoding them in the notebook:

- `DANBOORU_USERNAME`
- `DANBOORU_API_KEY`

Danbooru supports authentication either via `login` / `api_key` query params or HTTP Basic Auth. Below we use HTTP Basic Auth (cleaner).

In [None]:
# --- Danbooru endpoints ---
BASE_URL = "https://danbooru.donmai.us"   # main
# BASE_URL = "https://testbooru.donmai.us"  # safer for testing

# --- Credentials (recommended: env vars) ---
USERNAME = os.environ.get("DANBOORU_USERNAME", "").strip()
API_KEY  = os.environ.get("DANBOORU_API_KEY", "").strip()

if not USERNAME or not API_KEY:
    print("⚠️ Set DANBOORU_USERNAME and DANBOORU_API_KEY environment variables before scraping.")

AUTH = aiohttp.BasicAuth(USERNAME, API_KEY) if USERNAME and API_KEY else None

# --- Dataset paths ---
SPLITTED_TRAIN_DIR = Path("..") / "datasets" / "splitted" / "train"
OUT_TRAIN_DIR      = Path("..") / "datasets" / "extended_dataset" / "train"
OUT_TRAIN_DIR.mkdir(parents=True, exist_ok=True)

# --- Scrape policy ---
ALLOWED_EXTS = {"jpg", "jpeg", "png"}  # download jpg/png only
EXCLUDE_RATING = "e"                   # exclude explicit
SINGLE_CHAR_ONLY = True                # chartags:1

# --- Optional quality filters ---
# Minimum image dimensions. Set to 0 to disable a dimension constraint.
MIN_WIDTH = 512
MIN_HEIGHT = 512

# --- Request limits / throttling ---
POSTS_LIMIT_PER_PAGE = 200        # max for /posts.json
MAX_READ_RPS = 8                  # keep below Danbooru's 10 req/s read limit
MAX_CONN = 16                     # TCP connections
MAX_DL_CONCURRENCY = 12           # concurrent downloads (not requests per second)
REQUEST_TIMEOUT_S = 20
MAX_RETRIES = 4

# --- Dedup / overwrite behavior ---
SKIP_IF_EXISTS = True             # if file already exists in output dir, don't re-download

print("OUT_TRAIN_DIR:", OUT_TRAIN_DIR.resolve())


OUT_TRAIN_DIR: /home/lighter_01/projects/itmo/computer_vision/lab3_classif/datasets/extended_dataset/train


## 2) Classes and character tags
We read your class folder names from `datasets/splitted/train` and create matching output folders.

You said you'll **manually align** `character_tags_list` order with `characters_list` to keep the mapping simple.

In [11]:
# Read folder names (stable sort)
characters_list = sorted([p.name for p in SPLITTED_TRAIN_DIR.iterdir() if p.is_dir()])
print("Classes:", characters_list)
print("Num classes:", len(characters_list))

# Create output folders
for cls in characters_list:
    (OUT_TRAIN_DIR / cls).mkdir(parents=True, exist_ok=True)

Classes: ['Ace', 'Akainu', 'Brook', 'Chopper', 'Crocodile', 'Franky', 'Jinbei', 'Kurohige', 'Law', 'Luffy', 'Mihawk', 'Nami', 'Rayleigh', 'Robin', 'Sanji', 'Shanks', 'Usopp', 'Zoro']
Num classes: 18


In [18]:
# Put Danbooru character tags here IN THE SAME ORDER as characters_list.
character_tags_list = [
    'portgas_d._ace',
    'sakazuki_(one_piece)',
    'brook_(one_piece)',
    'tony_tony_chopper',
    'crocodile_(one_piece)',
    'franky_(one_piece)',
    'jinbe_(one_piece)',
    'marshall_d._teach',
    'trafalgar_law',
    'monkey_d._luffy',
    'dracule_mihawk',
    'nami_(one_piece)',
    'silvers_rayleigh',
    'nico_robin',
    'sanji_(one_piece)',
    'shanks_(one_piece)',
    'usopp',
    'roronoa_zoro'
]

assert len(character_tags_list) == len(characters_list), (
    f"Tags list length {len(character_tags_list)} must match classes length {len(characters_list)}"
)

characters_dict = dict(zip(characters_list, character_tags_list))
characters_dict


{'Ace': 'portgas_d._ace',
 'Akainu': 'sakazuki_(one_piece)',
 'Brook': 'brook_(one_piece)',
 'Chopper': 'tony_tony_chopper',
 'Crocodile': 'crocodile_(one_piece)',
 'Franky': 'franky_(one_piece)',
 'Jinbei': 'jinbe_(one_piece)',
 'Kurohige': 'marshall_d._teach',
 'Law': 'trafalgar_law',
 'Luffy': 'monkey_d._luffy',
 'Mihawk': 'dracule_mihawk',
 'Nami': 'nami_(one_piece)',
 'Rayleigh': 'silvers_rayleigh',
 'Robin': 'nico_robin',
 'Sanji': 'sanji_(one_piece)',
 'Shanks': 'shanks_(one_piece)',
 'Usopp': 'usopp',
 'Zoro': 'roronoa_zoro'}

## 3) Danbooru API helpers
We use:
- `GET /counts/posts.json` to get how many posts match tags.
- `GET /posts.json` to enumerate posts (max 200 per page).

We also implement retries, backoff, and handle throttling (HTTP 429).

In [19]:
@dataclass(frozen=True)
class ScrapeQuery:
    tag: str
    exclude_rating: str = EXCLUDE_RATING
    single_char_only: bool = SINGLE_CHAR_ONLY

    def tag_string(self) -> str:
        parts = [self.tag, f"-rating:{self.exclude_rating}"]
        if self.single_char_only:
            parts.append("chartags:1")
        return " ".join(parts)

def _sleep_backoff(attempt: int, base: float = 1.0, cap: float = 20.0) -> float:
    # exponential backoff with cap
    return min(cap, base * (2 ** attempt))

class RateLimiter:
    """Simple token-ish limiter: ensures we don't exceed MAX_READ_RPS."""
    def __init__(self, rps: float):
        self.min_interval = 1.0 / float(rps)
        self._last = 0.0
        self._lock = asyncio.Lock()

    async def wait(self):
        async with self._lock:
            now = time.perf_counter()
            wait = self.min_interval - (now - self._last)
            if wait > 0:
                await asyncio.sleep(wait)
            self._last = time.perf_counter()

In [20]:
rate_limiter = RateLimiter(MAX_READ_RPS)

In [21]:
async def danbooru_get_json(
    session: aiohttp.ClientSession,
    path: str,
    params: dict,
    *,
    timeout_s: int = REQUEST_TIMEOUT_S,
    max_retries: int = MAX_RETRIES,
) -> Optional[dict | list]:
    url = f"{BASE_URL}{path}"
    for attempt in range(max_retries):
        await rate_limiter.wait()
        try:
            async with session.get(url, params=params, timeout=timeout_s) as resp:
                # Handle throttling / transient errors
                if resp.status == 429:
                    retry_after = resp.headers.get("Retry-After")
                    delay = float(retry_after) if retry_after else _sleep_backoff(attempt, base=2.0)
                    print(f"429 throttled. Sleeping {delay:.1f}s for {url}")
                    await asyncio.sleep(delay)
                    continue
                if resp.status in (502, 503, 500):
                    delay = _sleep_backoff(attempt, base=2.0)
                    print(f"{resp.status} server error. Sleeping {delay:.1f}s for {url}")
                    await asyncio.sleep(delay)
                    continue
                if resp.status != 200:
                    text = await resp.text()
                    print(f"HTTP {resp.status} for {url}. Body (trim): {text[:200]}")
                    return None
                return await resp.json()
        except asyncio.TimeoutError:
            delay = _sleep_backoff(attempt, base=2.0)
            print(f"Timeout. Sleeping {delay:.1f}s for {url}")
            await asyncio.sleep(delay)
        except aiohttp.ClientError as e:
            delay = _sleep_backoff(attempt, base=2.0)
            print(f"ClientError {e}. Sleeping {delay:.1f}s for {url}")
            await asyncio.sleep(delay)
    return None


## 4) Count posts per character

In [22]:
async def get_post_count(session: aiohttp.ClientSession, query: ScrapeQuery) -> int:
    data = await danbooru_get_json(
        session,
        "/counts/posts.json",
        params={"tags": query.tag_string()},
    )
    if not data:
        return 0
    # response shape: { "counts": { "posts": N, ... } }
    return int(data.get("counts", {}).get("posts", 0))

async def count_all(characters: Dict[str, str]) -> Dict[str, int]:
    timeout = aiohttp.ClientTimeout(total=None)
    connector = aiohttp.TCPConnector(limit=MAX_CONN)
    async with aiohttp.ClientSession(auth=AUTH, timeout=timeout, connector=connector) as session:
        out = {}
        for cls, tag in characters.items():
            q = ScrapeQuery(tag=tag)
            n = await get_post_count(session, q)
            out[cls] = n
            print(f"{cls:>20}  tag={tag:<30}  posts={n}")
        return out


In [23]:
nest_asyncio.apply()
counts = asyncio.run(count_all(characters_dict))
counts

                 Ace  tag=portgas_d._ace                  posts=579
              Akainu  tag=sakazuki_(one_piece)            posts=51
               Brook  tag=brook_(one_piece)               posts=191
             Chopper  tag=tony_tony_chopper               posts=150
           Crocodile  tag=crocodile_(one_piece)           posts=430
              Franky  tag=franky_(one_piece)              posts=110
              Jinbei  tag=jinbe_(one_piece)               posts=166
            Kurohige  tag=marshall_d._teach               posts=42
                 Law  tag=trafalgar_law                   posts=951
               Luffy  tag=monkey_d._luffy                 posts=1263
              Mihawk  tag=dracule_mihawk                  posts=171
                Nami  tag=nami_(one_piece)                posts=3470
            Rayleigh  tag=silvers_rayleigh                posts=86
               Robin  tag=nico_robin                      posts=2290
               Sanji  tag=sanji_(one_piece)     

{'Ace': 579,
 'Akainu': 51,
 'Brook': 191,
 'Chopper': 150,
 'Crocodile': 430,
 'Franky': 110,
 'Jinbei': 166,
 'Kurohige': 42,
 'Law': 951,
 'Luffy': 1263,
 'Mihawk': 171,
 'Nami': 3470,
 'Rayleigh': 86,
 'Robin': 2290,
 'Sanji': 1043,
 'Shanks': 243,
 'Usopp': 113,
 'Zoro': 1253}

## 5) Enumerate post metadata and collect file URLs
We call `GET /posts.json` with `limit=200` and `page=1..N`.

Then we keep only posts whose `file_ext` is in `{'jpg','png'}` and that contain a usable URL (`file_url`).

In [24]:
def pages_needed(total_posts: int, per_page: int = POSTS_LIMIT_PER_PAGE) -> int:
    return int(math.ceil(total_posts / per_page)) if total_posts > 0 else 0

async def fetch_posts_page(
    session: aiohttp.ClientSession,
    query: ScrapeQuery,
    page: int,
    limit: int = POSTS_LIMIT_PER_PAGE,
) -> List[dict]:
    data = await danbooru_get_json(
        session,
        "/posts.json",
        params={"tags": query.tag_string(), "page": page, "limit": limit},
    )
    return data if isinstance(data, list) else []

def pick_download_url(post: dict) -> Optional[str]:
    # Prefer original file_url if present.
    # Danbooru may also provide large_file_url / preview_file_url, but we want originals.
    url = post.get("file_url")
    if not url:
        return None
    # file_url sometimes may be protocol-relative; normalize
    if url.startswith("//"):
        url = "https:" + url
    return url

async def collect_urls_for_tag(
    session: aiohttp.ClientSession,
    tag: str,
    total_posts: int,
    *,
    seen_md5: set[str],
    min_width: int = MIN_WIDTH,
    min_height: int = MIN_HEIGHT,
) -> Tuple[List[str], List[int]]:
    """
    Collect unique (by md5) download URLs for a given character tag.

    Deduplication:
      - Danbooru exposes an 'md5' field per post. If two posts point to the same file bytes,
        they share the same md5 even if their post IDs differ.
      - We keep a shared `seen_md5` set (optionally global across classes) and skip any post
        whose md5 we already collected.
    """
    q = ScrapeQuery(tag=tag)
    n_pages = pages_needed(total_posts)
    urls: List[str] = []
    post_ids: List[int] = []

    def ok_size(p: dict) -> bool:
        w = int(p.get("image_width") or 0)
        h = int(p.get("image_height") or 0)
        if min_width and w < min_width:
            return False
        if min_height and h < min_height:
            return False
        return True

    for page in range(1, n_pages + 1):
        posts = await fetch_posts_page(session, q, page=page)
        if not posts:
            continue

        for p in posts:
            ext = (p.get("file_ext") or "\\").lower()
            if ext not in ALLOWED_EXTS:
                continue

            if not ok_size(p):
                continue

            md5 = (p.get("md5") or "\\").lower()
            if md5:
                if md5 in seen_md5:
                    continue
                seen_md5.add(md5)

            url = pick_download_url(p)
            if not url:
                continue

            urls.append(url)
            if "id" in p:
                post_ids.append(int(p["id"]))

        if page % 5 == 0 or page == n_pages:
            print(f"  page {page}/{n_pages}  collected={len(urls)}  unique_md5={len(seen_md5)}")

    return urls, post_ids

async def collect_all_urls(characters: Dict[str, str], counts: Dict[str, int]) -> Dict[str, List[str]]:
    """
    Collect URLs for all classes.

    Dedup behavior:
      - A single `seen_md5` set is shared across ALL classes, so the same underlying file bytes
        won't be downloaded twice even if it appears under multiple posts/tags.
      - With chartags:1 enabled, cross-class duplicates should be rare; when they happen, they're
        often mistags or re-uploads, so global dedup is usually desirable.
    """
    timeout = aiohttp.ClientTimeout(total=None)
    connector = aiohttp.TCPConnector(limit=MAX_CONN)
    async with aiohttp.ClientSession(auth=AUTH, timeout=timeout, connector=connector) as session:
        out = {}
        seen_md5: set[str] = set()

        for cls, tag in characters.items():
            seen_md5 = set()
            total = counts.get(cls, 0)
            print(f"Collecting URLs for {cls} (tag={tag}, total_posts={total})")
            urls, _ = await collect_urls_for_tag(session, tag, total_posts=total, seen_md5=seen_md5)
            out[cls] = urls
            print(f"  -> {len(urls)} image URLs (global unique md5 so far: {len(seen_md5)})")
        return out

In [25]:
all_urls = asyncio.run(collect_all_urls(characters_dict, counts))
{k: len(v) for k, v in all_urls.items()}

Collecting URLs for Ace (tag=portgas_d._ace, total_posts=579)
  page 3/3  collected=524  unique_md5=525
  -> 524 image URLs (global unique md5 so far: 525)
Collecting URLs for Akainu (tag=sakazuki_(one_piece), total_posts=51)
  page 1/1  collected=49  unique_md5=49
  -> 49 image URLs (global unique md5 so far: 49)
Collecting URLs for Brook (tag=brook_(one_piece), total_posts=191)
  page 1/1  collected=180  unique_md5=180
  -> 180 image URLs (global unique md5 so far: 180)
Collecting URLs for Chopper (tag=tony_tony_chopper, total_posts=150)
  page 1/1  collected=132  unique_md5=133
  -> 132 image URLs (global unique md5 so far: 133)
Collecting URLs for Crocodile (tag=crocodile_(one_piece), total_posts=430)
  page 3/3  collected=393  unique_md5=394
  -> 393 image URLs (global unique md5 so far: 394)
Collecting URLs for Franky (tag=franky_(one_piece), total_posts=110)
  page 1/1  collected=97  unique_md5=98
  -> 97 image URLs (global unique md5 so far: 98)
Collecting URLs for Jinbei (tag=

{'Ace': 524,
 'Akainu': 49,
 'Brook': 180,
 'Chopper': 132,
 'Crocodile': 393,
 'Franky': 97,
 'Jinbei': 157,
 'Kurohige': 39,
 'Law': 909,
 'Luffy': 1156,
 'Mihawk': 160,
 'Nami': 3272,
 'Rayleigh': 79,
 'Robin': 2130,
 'Sanji': 980,
 'Shanks': 228,
 'Usopp': 99,
 'Zoro': 1188}

## 6) Download images (jpg + png)
Downloads are streamed to disk. We keep concurrency bounded with a semaphore.
If `SKIP_IF_EXISTS=True`, re-running the notebook won't re-download existing files.

In [26]:
dl_semaphore = asyncio.Semaphore(MAX_DL_CONCURRENCY)

def safe_filename_from_url(url: str) -> str:
    # Use original basename, but guard against weird query strings
    base = Path(url.split("?")[0]).name
    base = re.sub(r"[^A-Za-z0-9._-]+", "_", base)
    if not base:
        # fallback to hash
        base = hashlib.sha1(url.encode("utf-8")).hexdigest()
    return base

async def download_one(session: aiohttp.ClientSession, url: str, out_dir: Path) -> bool:
    name = safe_filename_from_url(url)
    out_path = out_dir / name
    if SKIP_IF_EXISTS and out_path.exists():
        return True

    tmp_path = out_path.with_suffix(out_path.suffix + ".part")

    for attempt in range(MAX_RETRIES):
        await rate_limiter.wait()
        try:
            async with dl_semaphore:
                async with session.get(url, timeout=REQUEST_TIMEOUT_S) as resp:
                    if resp.status == 429:
                        retry_after = resp.headers.get("Retry-After")
                        delay = float(retry_after) if retry_after else _sleep_backoff(attempt, base=2.0)
                        print(f"429 throttled. Sleeping {delay:.1f}s for download {url}")
                        await asyncio.sleep(delay)
                        continue
                    if resp.status != 200:
                        delay = _sleep_backoff(attempt, base=1.5)
                        print(f"HTTP {resp.status} downloading {url}. Retry in {delay:.1f}s")
                        await asyncio.sleep(delay)
                        continue

                    with open(tmp_path, "wb") as f:
                        async for chunk in resp.content.iter_chunked(1 << 15):
                            f.write(chunk)

            tmp_path.replace(out_path)
            return True

        except Exception as e:
            delay = _sleep_backoff(attempt, base=1.5)
            print(f"Error {e} downloading {url}. Retry in {delay:.1f}s")
            await asyncio.sleep(delay)

    # cleanup temp
    if tmp_path.exists():
        tmp_path.unlink(missing_ok=True)
    return False

async def download_for_class(cls: str, urls: List[str]) -> Tuple[int, int]:
    out_dir = OUT_TRAIN_DIR / cls
    out_dir.mkdir(parents=True, exist_ok=True)

    timeout = aiohttp.ClientTimeout(total=None)
    connector = aiohttp.TCPConnector(limit=MAX_CONN)
    async with aiohttp.ClientSession(auth=AUTH, timeout=timeout, connector=connector) as session:
        ok = 0
        bad = 0
        # Create tasks in moderate chunks (avoid creating 10k tasks at once)
        chunk_size = 200
        for i in range(0, len(urls), chunk_size):
            chunk = urls[i:i+chunk_size]
            results = await asyncio.gather(*(download_one(session, u, out_dir) for u in chunk))
            ok += sum(bool(r) for r in results)
            bad += sum(not bool(r) for r in results)
            print(f"{cls}: {min(i+chunk_size, len(urls))}/{len(urls)}  ok={ok} bad={bad}")
        return ok, bad

async def download_all(all_urls: Dict[str, List[str]]) -> Dict[str, Tuple[int,int]]:
    results = {}
    for cls, urls in all_urls.items():
        print(f"Downloading {cls}: {len(urls)} files...")
        ok, bad = await download_for_class(cls, urls)
        results[cls] = (ok, bad)
    return results

In [27]:
start = time.perf_counter()
dl_stats = asyncio.run(download_all(all_urls))
elapsed = time.perf_counter() - start

print(f"Done in {elapsed:.1f}s")
dl_stats

Downloading Ace: 524 files...
Ace: 200/524  ok=200 bad=0
Ace: 400/524  ok=400 bad=0
Error  downloading https://cdn.donmai.us/original/9d/0e/9d0e4222462a19fbffe8126912cbc766.jpg. Retry in 1.5s
Error  downloading https://cdn.donmai.us/original/13/25/132563f2a20ce113f0fa657e37b55df4.jpg. Retry in 1.5s
Error  downloading https://cdn.donmai.us/original/cd/71/cd7176f62f50fb9fb6537503d2372c5c.png. Retry in 1.5s
Ace: 524/524  ok=524 bad=0
Downloading Akainu: 49 files...
Akainu: 49/49  ok=49 bad=0
Downloading Brook: 180 files...
Error  downloading https://cdn.donmai.us/original/0f/94/0f94b4d165c5b233def1b35e179b80dc.jpg. Retry in 1.5s
Brook: 180/180  ok=180 bad=0
Downloading Chopper: 132 files...
Chopper: 132/132  ok=132 bad=0
Downloading Crocodile: 393 files...
Crocodile: 200/393  ok=200 bad=0
Error  downloading https://cdn.donmai.us/original/d7/22/d72261ed284f478d0655ef66afe3ae27.jpg. Retry in 1.5s
Error  downloading https://cdn.donmai.us/original/9f/30/9f300f62dd5afb043f477e81875504ff.jpg. R

{'Ace': (524, 0),
 'Akainu': (49, 0),
 'Brook': (180, 0),
 'Chopper': (132, 0),
 'Crocodile': (393, 0),
 'Franky': (97, 0),
 'Jinbei': (157, 0),
 'Kurohige': (39, 0),
 'Law': (909, 0),
 'Luffy': (1156, 0),
 'Mihawk': (160, 0),
 'Nami': (3272, 0),
 'Rayleigh': (79, 0),
 'Robin': (2130, 0),
 'Sanji': (980, 0),
 'Shanks': (228, 0),
 'Usopp': (99, 0),
 'Zoro': (1188, 0)}

## 7) Validate and delete corrupted images (optional)
PIL's `verify()` can catch truncated/invalid files.

In [28]:
def find_corrupted(root: Path) -> List[Path]:
    bad = []
    for p in root.rglob("*"):
        if not p.is_file():
            continue
        try:
            with Image.open(p) as img:
                img.verify()
        except Exception:
            bad.append(p)
    return bad

In [31]:
bad_files = find_corrupted(OUT_TRAIN_DIR)
print("Corrupted:", len(bad_files))
bad_files[:10]

Corrupted: 0


[]

In [32]:
# Delete corrupted files
for p in bad_files:
    p.unlink(missing_ok=True)
print("Deleted", len(bad_files), "files")

Deleted 0 files


## 8) Final counts per class

In [34]:
counts_after = {}
for cls in characters_list:
    counts_after[cls] = sum(1 for _ in (OUT_TRAIN_DIR/cls).glob("*") if _.is_file())

counts_after

{'Ace': 524,
 'Akainu': 49,
 'Brook': 180,
 'Chopper': 132,
 'Crocodile': 393,
 'Franky': 97,
 'Jinbei': 157,
 'Kurohige': 39,
 'Law': 909,
 'Luffy': 1156,
 'Mihawk': 160,
 'Nami': 3272,
 'Rayleigh': 79,
 'Robin': 2130,
 'Sanji': 980,
 'Shanks': 228,
 'Usopp': 99,
 'Zoro': 1188}