In [17]:
from datasets import load_dataset
import time
import fal_client
from dotenv import load_dotenv
import requests
import concurrent

In [18]:
async def subscribe(urls):
    """
    Caption a batch of images with the "fal-ai/moondream/batched" endpoint.

    Every URL is sent with the same prompt ("Caption this image."). Log messages
    reported while the request is in progress are printed as they arrive, and the
    final response is printed before being returned.

    :param urls: An iterable of image URLs to caption.
    :return: The response produced by ``fal_client.subscribe_async``. In this notebook's
        recorded output it is a dictionary with an "outputs" list of caption strings.
    """
    def on_queue_update(update):
        # Only in-progress updates are inspected for logs; other update types are ignored.
        if isinstance(update, fal_client.InProgress) and update.logs:
            for log in update.logs:
                print(log["message"])

    # One request entry per image, all sharing the same captioning prompt.
    inputs = [
        {
            "prompt": "Caption this image.",
            "image_url": url
        }
        for url in urls
    ]

    result = await fal_client.subscribe_async(
        "fal-ai/moondream/batched",
        arguments={"inputs": inputs},
        on_queue_update=on_queue_update,
    )

    print(result)

    # Return the response so callers can store the captions; previously the caller
    # always received None.
    return result

def is_valid_url(url, timeout=10):
    """
    Check whether a URL still resolves to an image.

    A HEAD request is sent (following redirects). The URL counts as valid only when
    the final response has status 200 and a Content-Type header containing "image".

    :param url: The URL to check.
    :param timeout: Seconds to wait for the server before giving up, so that a stalled
        host cannot block a worker indefinitely.
    :return: True if the URL points at an image, otherwise False. Request failures
        (including timeouts) are reported as False rather than raised.
    """
    try:
        response = requests.head(url, allow_redirects=True, timeout=timeout)
    except requests.RequestException:
        return False

    if response.status_code != 200:
        return False

    # The header may be absent; fall back to an empty string so the membership test
    # below cannot raise TypeError on None.
    content_type = response.headers.get('content-type') or ''
    return 'image' in content_type.lower()

def clean_urls(batch, max_workers=16):
    """
    Since there may be link rot, this function confirms the links are still valid.
    Valid entries are returned as a key -> URL dictionary; for invalid entries only
    the key is returned.

    :param batch: A mapping with two parallel sequences: batch["key"] (image keys) and
        batch["downloadurl"] (the corresponding image URLs).
    :param max_workers: The maximum number of threads to use for parallel checking.
    :return: A tuple (valid_urls, invalid_urls): a dictionary mapping each valid key to
        its URL, and a list of the keys whose URL failed the check. Both follow the
        order of the input batch.
    """
    # Imported locally because a bare `import concurrent` does not load the
    # `concurrent.futures` submodule on its own.
    from concurrent.futures import ThreadPoolExecutor

    valid_urls = {}
    invalid_urls = []

    pairs = list(zip(batch["key"], batch["downloadurl"]))

    # Check the URLs in parallel. Executor.map yields results in submission order, so
    # the outputs keep the batch order no matter which request finishes first.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        checks = executor.map(is_valid_url, [url for _, url in pairs])
        for (key, url), is_valid in zip(pairs, checks):
            if is_valid:
                valid_urls[key] = url
            else:
                invalid_urls.append(key)

    return valid_urls, invalid_urls

In [3]:
# Load environment variables from a .env file before any service call is made
# (presumably the fal_client credentials live there -- confirm).
load_dotenv()

True

In [4]:
# Open the train split in streaming mode, restricted to the two columns used below:
# the image key and its download URL.
dataset = load_dataset(
    "common-canvas/commoncatalog-cc-by",
    split="train",
    streaming=True,
    columns=["key", "downloadurl"],
)


Resolving data files:   0%|          | 0/5573 [00:00<?, ?it/s]

In [15]:
# Group the streamed rows into batches of 64 so each batch can be link-checked and
# captioned in a single request.
batched_dataset = dataset.batch(batch_size=64)

In [19]:
captions = {}
link_rot_urls = []

for batch in batched_dataset:
    t = time.time()

    valid_urls, invalid_urls = clean_urls(batch)
    link_rot_urls.extend(invalid_urls)

    print(f"Took {time.time() - t} seconds to clean {len(invalid_urls)} URLs.")
    t = time.time()

    print(f"Processing {len(valid_urls)} valid URLs...")

    res = await subscribe(valid_urls.values())

    print("Took", time.time() - t)
    break

Took 1.638000726699829 seconds to clean 2 URLs.
Processing 62 valid URLs...
{'outputs': ['A white circular plaque with a black and white emblem of a crown and a building with a dome, inscribed "Jubilee Walkway" in bold letters, is affixed to a gray concrete surface.', 'Two men in gray uniforms with black hats and white gloves walk side by side, carrying rifles and a white bag, with a crowd of people in the background.', 'A sand mandala, featuring a central star and four smaller circles, is arranged in a circular pattern against a sandy background.', 'A child in a black jacket and hat sits on a rocky beach, gazing out at the ocean under a cloudy sky.', 'Three individuals ride Segways on a concrete pier, with a large ship and city skyline in the background.', 'A bustling market scene features a variety of colorful fruits and vegetables on display under striped awnings, with people browsing and shopping amidst the lively atmosphere.', 'A black dog statue stands on a weather vane atop a wh

In [21]:
import json

# Load the captions file stored one directory above the notebook and report how many
# entries it holds. Note that this rebinds `captions` from the earlier cell.
# The encoding is given explicitly so decoding does not depend on the platform's
# locale default.
with open("../captions.json", "r", encoding="utf-8") as f:
    captions = json.load(f)
    print(len(captions))

987
