![GitHub followers](https://img.shields.io/github/followers/duskfallcrew)  ![GitHub Sponsors](https://img.shields.io/github/sponsors/duskfallcrew) ![Static Badge](https://img.shields.io/badge/https%3A%2F%2Fgithub.com%2Fduskfallcrew%2Ffancaps-scraper%2F?style=flat-square&logo=github)

Follow our discord: 
[Discord](https://discord.gg/5t2kYxt7An)

# FanCaps-Scrapper

A Jupyter notebook (not for Colab) based on the Python CLI scraper for anime screenshots & Fan Screen on https://fancaps.net.

You will need an Ngrok account for this, and because of that this notebook will NOT work on Colab.

README template from https://www.makeareadme.com/

This was forked from [Fannovel](https://github.com/Fannovel16/fancaps-scraper), as well as from the most recent fork for NodeJS: [JSarvise](https://github.com/JSarvise/fancaps-scraper)

## About This Notebook

Based on Fannovel's original Node JS application for mass downloading caps from FanCaps.
This aims to make it as simple as possible: this notebook contains everything in-house, and will continue to build on the original Colab that Fannovel developed.


# Install Requirements

In [None]:
import subprocess

# Function to install requirements from requirements.txt
def install_requirements():
    """Install the notebook's Python dependencies with pip.

    Runs ``pip install -r fancaps-scraper/jupyter-notebook/requirements.txt``
    (the path is relative to the current working directory) and prints the
    outcome. A failing pip run is reported on stdout and is not re-raised.
    """
    # Imported locally because this cell's import block only has ``subprocess``.
    import sys

    print("Installing requirements from requirements.txt...")
    try:
        # ``<python> -m pip`` installs into the interpreter running this
        # kernel; a bare ``pip`` from PATH is not guaranteed to do so.
        subprocess.run(
            [
                sys.executable, "-m", "pip", "install", "-r",
                "fancaps-scraper/jupyter-notebook/requirements.txt",
            ],
            check=True,
        )
        print("Requirements installed successfully.")
    except subprocess.CalledProcessError as e:
        # pip's output is not captured (it streams straight to the cell), so
        # ``e.stderr`` would always be None; report the exit status instead.
        print(f"Error installing requirements: pip exited with status {e.returncode}")

# Markdown-formatted explanation of the setup step. It is emitted with print()
# below, so it appears as plain text in the cell output rather than as a
# rendered Markdown cell.
markdown_text = """
### Setting Up Environment

To ensure all dependencies are met, we will proceed with installing the required packages 
from the `requirements.txt` file.

#### Cloning Repositories

Before we begin, let's clone the following repositories:
- [fancaps-scraper](https://github.com/duskfallcrew/fancaps-scraper/)
- [cafe-aesthetic-scorer](https://github.com/duskfallcrew/cafe-aesthetic-scorer)
- [kohya-trainer](https://github.com/duskfallcrew/kohya-trainer)

These repositories contain necessary components for running the notebook.

"""

print(markdown_text)

# Clone the companion repositories, skipping any that are already cloned
import os  # imported here because this cell's import block only has ``subprocess``

repo_urls = [
    'https://github.com/duskfallcrew/fancaps-scraper/',
    'https://github.com/duskfallcrew/cafe-aesthetic-scorer',
    'https://github.com/duskfallcrew/kohya-trainer'
]

for repo_url in repo_urls:
    # ``git clone <url>`` checks out into a directory named after the last
    # path component of the URL (ignoring a trailing slash or ``.git``).
    repo_dir = repo_url.rstrip('/').rsplit('/', 1)[-1]
    if repo_dir.endswith('.git'):
        repo_dir = repo_dir[:-len('.git')]

    # Re-running the cell would otherwise make git fail on every repository
    # because the destination directory already exists.
    if os.path.isdir(repo_dir):
        print(f"Repository {repo_url} already cloned, skipping.")
        continue

    clone_command = ['git', 'clone', repo_url]

    # Execute the clone command using subprocess
    try:
        result = subprocess.run(clone_command, capture_output=True, text=True, check=True)
        print(result.stdout)
        print(f"Repository {repo_url} cloned successfully.")
    except subprocess.CalledProcessError as e:
        print(f"Error cloning repository {repo_url}: {e.stderr}")

# Install requirements using the function
install_requirements()


# Scraper CLI Wrapper with IPython Widgets

In [1]:
import asyncio
import os
import logging
from urllib.parse import urlparse, urljoin
import aiohttp
from bs4 import BeautifulSoup
import aiofiles
import ipywidgets as widgets
from IPython.display import display

# Setup logging: show INFO and above, each record prefixed with its timestamp
# and level name.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Function to fetch HTML content using aiohttp
async def fetch_html(session, url):
    """Return the body of ``url`` as text, or None on a client error.

    Args:
        session: Object whose ``get(url)`` returns an async context manager
            yielding a response with an awaitable ``text()``.
        url: Address to request.

    Returns:
        The response text, or None when ``aiohttp.ClientError`` is raised
        (the error is logged).
    """
    body = None
    try:
        request = session.get(url)
        async with request as resp:
            body = await resp.text()
    except aiohttp.ClientError as exc:
        logging.error(f"Error fetching HTML from {url}: {str(exc)}")
        body = None
    return body

# Function to download an image asynchronously
async def download_image(session, image_url, save_dir):
    """Fetch ``image_url`` and write its bytes into ``save_dir``.

    The file name is the last path component of the URL. A non-200 response
    or an ``aiohttp.ClientError`` is logged; nothing is returned either way.
    """
    try:
        async with session.get(image_url) as resp:
            # Guard clause: anything but 200 is reported and skipped.
            if resp.status != 200:
                logging.error(f"Failed to download {image_url}. Status: {resp.status}")
                return

            payload = await resp.read()
            target_name = os.path.basename(urlparse(image_url).path)
            target_path = os.path.join(save_dir, target_name)
            async with aiofiles.open(target_path, 'wb') as out_file:
                await out_file.write(payload)
            logging.info(f"Downloaded: {image_url}")
    except aiohttp.ClientError as exc:
        logging.error(f"Error downloading {image_url}: {str(exc)}")

# Function to download images asynchronously using aria2c
async def download_images_async(image_urls, save_dir):
    """Download every URL in ``image_urls`` into ``save_dir`` concurrently.

    One ``download_image_async`` coroutine is created per URL and all of them
    are awaited together.
    """
    pending = [download_image_async(url, save_dir) for url in image_urls]
    await asyncio.gather(*pending)

async def download_image_async(image_url, save_dir):
    """Download one URL into ``save_dir`` by running ``aria2c``.

    Args:
        image_url: URL handed to aria2c.
        save_dir: Directory passed to aria2c's ``-d`` option.

    Returns:
        None. Success and failure are both reported through ``logging``.
    """
    # The arguments are passed as a list and run without a shell, so spaces or
    # shell metacharacters in the URL or directory cannot break the command
    # or inject extra ones.
    aria2c_args = ["aria2c", "-x", "16", "-s", "16", "-d", str(save_dir), str(image_url)]
    try:
        process = await asyncio.create_subprocess_exec(
            *aria2c_args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
    except OSError as e:
        # Raised when the program cannot be started, e.g. FileNotFoundError
        # if aria2c is not installed.
        logging.error(f"Error running aria2c for {image_url}: {str(e)}")
        return

    if process.returncode == 0:
        logging.info(f"Downloaded: {image_url}")
    else:
        logging.error(
            f"Failed to download {image_url}. Error: {stderr.decode(errors='replace').strip()}"
        )

# Function to validate the save directory
def validate_save_dir(save_dir):
    """Ensure ``save_dir`` is an absolute, existing, writable directory.

    The directory (and any missing parents) is created when absent.

    Args:
        save_dir: Path the downloads will be written to.

    Raises:
        ValueError: If the path is not absolute, exists but is not a
            directory, or is not writable.
    """
    if not os.path.isabs(save_dir):
        raise ValueError("Save directory path must be absolute")

    try:
        # exist_ok=True avoids the check-then-create race of a separate
        # os.path.exists() test; it still raises if the path is a file.
        os.makedirs(save_dir, exist_ok=True)
    except (FileExistsError, NotADirectoryError) as e:
        raise ValueError(f"Save directory {save_dir} exists but is not a directory") from e

    if not os.access(save_dir, os.W_OK):
        raise ValueError(f"Save directory {save_dir} is not writable")

# Helper: build the URL of one listing page
def _build_page_url(parsed_url, page_number):
    """Return ``parsed_url`` as a string with ``page=<page_number>`` set.

    Existing query components are kept (only a previous ``page=`` entry is
    replaced), because the listing URL's query may identify the show itself.
    """
    kept = [
        part for part in parsed_url.query.split('&')
        if part and not part.startswith('page=')
    ]
    kept.append(f'page={page_number}')
    return parsed_url._replace(query='&'.join(kept)).geturl()


# Helper: shared pagination loop for get_anime_data / get_tv_show_data
async def _collect_episodes(listing_url, download_type, episode_selector):
    """Walk the paginated listing at ``listing_url`` and collect episode links.

    Pages are requested as ``page=1, 2, ...`` until a page cannot be fetched,
    contains no matching links, or contributes no link that was not already
    seen (which stops the loop if the site keeps serving its last page).

    Returns:
        ``{'seriesTitle': str | None, 'episodes': [{'episodeTitle', 'episodeUrl'}]}``
    """
    parsed_url = urlparse(listing_url)
    episodes = []
    seen_urls = set()
    series_title = None
    page_i = 0

    async with aiohttp.ClientSession() as session:
        while True:
            page_i += 1
            page_url = _build_page_url(parsed_url, page_i)
            html_content = await fetch_html(session, page_url)
            if not html_content:
                break
            soup = BeautifulSoup(html_content, 'html.parser')

            if download_type == 'episodes':
                curr_episodes = soup.select(episode_selector)
            else:  # download_type == 'whole_series'
                curr_episodes = soup.select("a[style='color:black;']")

            if not curr_episodes:
                break

            new_on_page = 0
            for episode in curr_episodes:
                href = episode.get('href')
                if not href:
                    continue
                episode_url = urljoin(page_url, href)
                if episode_url in seen_urls:
                    continue
                seen_urls.add(episode_url)
                episodes.append({'episodeTitle': episode.text.strip(), 'episodeUrl': episode_url})
                new_on_page += 1

            if not series_title:
                # A missing title must not discard the episodes already found.
                title_tag = soup.select_one("h1.post_title")
                if title_tag is not None:
                    series_title = title_tag.text.replace(': ', ' - ')

            if new_on_page == 0:
                break

    return {'seriesTitle': series_title, 'episodes': episodes}


# Function to get anime data from the provided URL
async def get_anime_data(anime_url, download_type):
    """Collect the episode list of an anime series.

    Args:
        anime_url: URL of the series listing page.
        download_type: ``'episodes'`` selects the episode heading links; any
            other value (``'whole_series'``) selects the black-styled links.

    Returns:
        ``{'seriesTitle': ..., 'episodes': [...]}``, or None if anything
        fails (the error is logged).
    """
    try:
        # NOTE(review): this previously matched '/tv/episodeimages.php?', the
        # same selector as the TV scraper; assumed to be a copy-paste slip and
        # that anime episode links live under '/anime/' — confirm on the site.
        return await _collect_episodes(
            anime_url, download_type, "h3 > a[href*='/anime/episodeimages.php?']"
        )
    except Exception as e:
        logging.error(f"Error fetching anime data from {anime_url}: {str(e)}")
        return None


# Function to get TV show data from the provided URL
async def get_tv_show_data(tv_show_url, download_type):
    """Collect the episode list of a TV series.

    Args:
        tv_show_url: URL of the series listing page.
        download_type: ``'episodes'`` selects the episode heading links; any
            other value (``'whole_series'``) selects the black-styled links.

    Returns:
        ``{'seriesTitle': ..., 'episodes': [...]}``, or None if anything
        fails (the error is logged).
    """
    try:
        return await _collect_episodes(
            tv_show_url, download_type, "h3 > a[href*='/tv/episodeimages.php?']"
        )
    except Exception as e:
        logging.error(f"Error fetching TV show data from {tv_show_url}: {str(e)}")
        return None

# Function to get movie data from the provided URL
async def get_movie_data(movie_url):
    """Fetch a movie page and return a placeholder result.

    The page is downloaded and parsed, but no scraping is implemented yet:
    on success the function returns ``{'movieData': 'Placeholder for movie data'}``.
    It returns None when the page cannot be fetched or any exception occurs
    (both cases are logged).
    """
    try:
        movie_url = urlparse(movie_url)
        async with aiohttp.ClientSession() as session:
            page_source = await fetch_html(session, movie_url.geturl())
            if not page_source:
                logging.error(f"Failed to fetch HTML content from {movie_url}")
                return None

            soup = BeautifulSoup(page_source, 'html.parser')
            # Implement movie data scraping logic here
            placeholder = {'movieData': 'Placeholder for movie data'}  # Replace with actual movie data structure
            return placeholder
    except Exception as exc:
        logging.error(f"Error fetching movie data from {movie_url}: {str(exc)}")
        return None

# Main function: validate the inputs, scrape the listing and start the downloads
async def main(selection, download_type, url, save_dir, num_of_promises=75, skip_n_last_pages=2, disable_progress_bar=False):
    """Run one scrape-and-download job.

    Args:
        selection: ``'Anime'``, ``'TV Series'`` or ``'Movie'``; picks the scraper.
        download_type: Passed through to the anime / TV scrapers.
        url: Listing page to scrape.
        save_dir: Absolute directory the downloads are written to.
        num_of_promises: Accepted for interface compatibility; not used by
            this function.
        skip_n_last_pages: Accepted for interface compatibility; not used by
            this function.
        disable_progress_bar: Accepted for interface compatibility; not used
            by this function.

    Returns:
        None. Every failure is logged rather than raised.
    """
    try:
        # Only a failure of the directory validation is reported as a
        # save-directory problem; a ValueError raised later in the run is
        # handled by the generic handler below instead of being mislabelled.
        try:
            validate_save_dir(save_dir)
        except ValueError as ve:
            logging.error(f"Error in save directory: {str(ve)}")
            return

        # Determine data retrieval function based on selection
        if selection == 'Anime':
            data = await get_anime_data(url, download_type)
        elif selection == 'TV Series':
            data = await get_tv_show_data(url, download_type)
        elif selection == 'Movie':
            data = await get_movie_data(url)
        else:
            logging.error(f"Invalid selection: {selection}")
            return

        if not data:
            logging.error(f"Failed to retrieve data from {url}")
            return

        # Downloading images or processing movie data
        if 'episodes' in data:
            # NOTE(review): the URLs handed to the downloader are the scraped
            # episode links themselves; confirm these are what should be
            # downloaded rather than the images found on those pages.
            episode_urls = [episode['episodeUrl'] for episode in data['episodes']]
            if not episode_urls:
                logging.warning(f"No episodes found at {url}")
                return
            await download_images_async(episode_urls, save_dir)
        elif 'imageUrls' in data:
            await download_images_async(data['imageUrls'], save_dir)
        elif 'movieData' in data:
            logging.info(f"Movie data fetched: {data['movieData']}")
    except Exception as e:
        logging.error(f"Error in main function: {str(e)}")

# Input widgets for the scraper form (rendered further down with display())
selection_input = widgets.Dropdown(
    description='Selection:',
    options=['Anime', 'TV Series', 'Movie'],
)
download_type_input = widgets.Dropdown(
    description='Download Type:',
    options=['episodes', 'whole_series'],
)
url_input = widgets.Text(description='URL:')
save_dir_input = widgets.Text(description='Save Directory:')
num_of_promises_input = widgets.IntSlider(
    value=75,
    min=1,
    max=100,
    description='Number of Promises:',
)
skip_n_last_pages_input = widgets.IntSlider(
    value=2,
    min=0,
    max=10,
    description='Skip Last Pages:',
)
disable_progress_bar_input = widgets.Checkbox(description='Disable Progress Bar')

def update_download_type(*args):
    """Grey out the download-type dropdown while 'Movie' is selected."""
    is_movie = selection_input.value == 'Movie'
    download_type_input.disabled = True if is_movie else False

# Call update_download_type whenever the selection dropdown's 'value' changes
selection_input.observe(update_download_type, 'value')

# Render all input widgets in the cell output
display(selection_input, download_type_input, url_input, save_dir_input, num_of_promises_input, skip_n_last_pages_input, disable_progress_bar_input)

# Run main function on button click
button = widgets.Button(description='Run Scraper')

# Strong references to in-flight scraper tasks, so a task is not
# garbage-collected before it finishes.
_scraper_tasks = set()

def on_button_clicked(b):
    """Schedule ``main`` with the current widget values.

    Button callbacks are invoked as plain functions, so a coroutine function
    registered here would only create a coroutine object that is never
    awaited. The coroutine is therefore scheduled explicitly as a task.
    """
    task = asyncio.ensure_future(
        main(
            selection_input.value,
            download_type_input.value,
            url_input.value,
            save_dir_input.value,
            num_of_promises_input.value,
            skip_n_last_pages_input.value,
            disable_progress_bar_input.value,
        )
    )
    _scraper_tasks.add(task)
    task.add_done_callback(_scraper_tasks.discard)

button.on_click(on_button_clicked)
display(button)


Text(value='', description='URL:')

Text(value='', description='Save Directory:')

IntSlider(value=75, description='Number of Promises:', min=1)

IntSlider(value=2, description='Skip Last Pages:', max=10)

Checkbox(value=False, description='Disable Progress Bar')

Button(description='Run Scraper', style=ButtonStyle())

# FiftyOneAI: Check for Duplicates

To use Ngrok for tunneling in this script, you'll need an Ngrok authentication token. Follow these steps to obtain your Ngrok token:

- Sign up for a free Ngrok account at Ngrok's website.
- After signing up, log in to your Ngrok account.
- Navigate to the Ngrok Auth page. Here, you'll find your authentication token.
- Copy the token and paste it into the appropriate field in the script where prompted to integrate Ngrok functionality.
- Save your changes and run the script. Ngrok will handle the tunneling through a randomly generated URL, allowing you to access your application remotely.

Note: I'll add links later on.

In [None]:
import os
import subprocess
import ipywidgets as widgets
from IPython.display import display, clear_output
import numpy as np
import fiftyone as fo
import fiftyone.zoo as foz
from fiftyone import ViewField as F
from sklearn.metrics.pairwise import cosine_similarity
from pyngrok import ngrok

# Function to start ngrok tunnel with provided token
def start_ngrok(token):
    """Open an ngrok tunnel to local port 5151.

    Args:
        token: ngrok auth token to register before connecting.

    Returns:
        Whatever ``ngrok.connect`` returns, or None if any step raises
        (the error is printed).
    """
    tunnel = None
    try:
        ngrok.set_auth_token(token)
        tunnel = ngrok.connect(5151)  # Replace 5151 with your app's port
        print(f"Ngrok Tunnel URL: {tunnel}")
    except Exception as err:
        print(f"Error starting ngrok tunnel: {err}")
        tunnel = None
    return tunnel

# Constants and configuration
# Cosine-similarity value above which two images are treated as duplicates.
similarity_threshold = 0.985  # Adjust as needed
model_name = "clip-vit-base32-torch"  # Model for embeddings
supported_types = (".png", ".jpg", ".jpeg")  # Supported image types
images_folder = "/path/to/your/images/folder"  # Replace with your actual path
# Name of the sub-folder (inside images_folder) the cleaned dataset is exported to.
project_subfolder = "fiftyone_project"

# Step 1: Check dependencies and set up environment
# NOTE(review): nothing in the visible notebook assigns step1_installed_flag, so
# this guard raises unless a cell outside this view sets it — confirm.
if "step1_installed_flag" not in globals():
    raise Exception("Please run step 1 first!")  # Ensure dependencies are set up

# Function to analyze dataset and remove duplicates
def analyze_and_remove_duplicates(images_folder, project_subfolder, ngrok_token):
    """Tag near-duplicate images, let the user review them, then delete them.

    Steps:
      1. Load ``images_folder`` as a FiftyOne image dataset and compute
         embeddings with the model named by the global ``model_name``.
      2. Build the pairwise cosine-similarity matrix; for each sample that is
         kept, every other sample above the global ``similarity_threshold`` is
         tagged ``"delete"`` and the kept one ``"has_duplicates"``.
      3. Open an ngrok tunnel, launch the FiftyOne app and wait for the user
         to confirm via ``input()``.
      4. Remove the samples still tagged ``"delete"``, export the rest to
         ``images_folder/project_subfolder`` and replace ``images_folder``
         with that export.

    WARNING: step 4 deletes the original folder, including any files in it
    that are not part of the exported dataset.

    Args:
        images_folder: Folder containing the images.
        project_subfolder: Name of the export sub-folder.
        ngrok_token: Auth token forwarded to ``start_ngrok``.

    Returns:
        None. Errors are printed, not raised.
    """
    # Imported locally because this cell's import block does not have shutil.
    import shutil

    try:
        # Absolute, normalised path: no os.chdir() is needed (the folder is
        # renamed and deleted below, which would leave the process inside a
        # removed directory) and a trailing slash cannot corrupt the
        # "<folder>_temp" name.
        images_folder = os.path.abspath(images_folder)

        # Count only image files; other entries must not make an image-less
        # folder look populated.
        img_count = sum(
            1 for name in os.listdir(images_folder)
            if name.lower().endswith(supported_types)
        )

        if img_count == 0:
            print(f"💥 Error: No images found in {images_folder}")
            return

        print("\n💿 Analyzing dataset...\n")
        dataset = fo.Dataset.from_dir(images_folder, dataset_type=fo.types.ImageDirectory)

        # Step 2: Compute embeddings for images
        model = foz.load_zoo_model(model_name)
        embeddings = dataset.compute_embeddings(model)

        # Pairwise similarity; subtracting the identity zeroes the diagonal so
        # a sample is never reported as a duplicate of itself.
        similarity_matrix = cosine_similarity(embeddings)
        similarity_matrix -= np.identity(len(similarity_matrix))

        # Step 3: Mark duplicates
        dataset.tags = ["delete", "has_duplicates"]

        id_map = [s.id for s in dataset.select_fields(["id"])]
        samples_to_remove = set()
        samples_to_keep = set()

        for idx, sample in enumerate(dataset):
            if sample.id not in samples_to_remove:
                # Keep the first instance of two duplicates
                samples_to_keep.add(sample.id)

                dup_idxs = np.where(similarity_matrix[idx] > similarity_threshold)[0]
                for dup in dup_idxs:
                    # Remove all other duplicates
                    samples_to_remove.add(id_map[dup])

                if len(dup_idxs) > 0:
                    sample.tags.append("has_duplicates")
                    sample.save()
            else:
                sample.tags.append("delete")
                sample.save()

        # Step 4: Launch FiftyOne app with Ngrok tunnel
        tunnel = start_ngrok(ngrok_token)
        if not tunnel:
            # start_ngrok has already printed the reason.
            return

        # ngrok.disconnect() looks the tunnel up by its public URL string;
        # accept either a tunnel object or a plain URL from start_ngrok.
        tunnel_url = getattr(tunnel, "public_url", tunnel)
        try:
            # The app is served on the local port the tunnel forwards to.
            # NOTE(review): the previous ``url=`` keyword is not a documented
            # launch_app parameter as far as I can tell — confirm against the
            # installed FiftyOne version.
            fo.launch_app(dataset, port=5151)

            # Wait for user input to save changes
            input("⭕ When you're done, enter something here to save your changes: ")

            # Step 5: Remove marked samples and save changes
            marked = [s for s in dataset if "delete" in s.tags]
            dataset.remove_samples(marked)
            dataset.export(
                export_dir=os.path.join(images_folder, project_subfolder),
                dataset_type=fo.types.ImageDirectory,
            )

            # Swap the export into place, then delete the old folder. The old
            # folder still holds every original image, so it has to be removed
            # recursively (os.rmdir only works on empty directories).
            temp_folder = images_folder + "_temp"
            os.rename(images_folder, temp_folder)
            os.rename(os.path.join(temp_folder, project_subfolder), images_folder)
            shutil.rmtree(temp_folder)

            print(f"\n✅ Removed {len(marked)} images from dataset.")
            print(f"You now have {len(os.listdir(images_folder))} images remaining.")

        finally:
            # Disconnect Ngrok tunnel when done
            ngrok.disconnect(tunnel_url)
            print("Ngrok tunnel disconnected")

    except Exception as e:
        print(f"Error in analyzing and removing duplicates: {e}")

# Ngrok token input widget. A Password field masks the token as it is typed,
# so the secret is not shown in the notebook.
ngrok_token_input = widgets.Password(description='Ngrok Auth Token:', placeholder='Enter your ngrok auth token')

# Button to submit ngrok token
submit_button = widgets.Button(description='Submit')

# Function to handle ngrok token input
def on_ngrok_token_submit(b):
    """Redraw the form and run the duplicate analysis with the entered token.

    The output is cleared first, so both the token field and the submit
    button are displayed again; otherwise the button would vanish after the
    first click.
    """
    clear_output()
    display(ngrok_token_input, submit_button)
    analyze_and_remove_duplicates(images_folder, project_subfolder, ngrok_token_input.value.strip())

submit_button.on_click(on_ngrok_token_submit)

# Display ngrok token input widget and its submit button
display(ngrok_token_input)
display(submit_button)


# Tagging Dataset (Under Construction - WIP xD need more time)

In [None]:
import os
import ipywidgets as widgets
from IPython.display import display, clear_output

# Function to handle tagging based on user inputs
def tag_images(method, tag_threshold, blacklist_tags, caption_min, caption_max, selected_tagger):
    if method == "Anime tags":
        if "step4a_installed_flag" not in globals():
            print("\n🏭 Installing dependencies for Anime tags...\n")
            !pip install accelerate==0.15.0 diffusers[torch]==0.10.2 einops==0.6.0 tensorflow transformers safetensors huggingface-hub torchvision albumentations jax==0.4.23 jaxlib==0.4.23
            if not get_ipython().__dict__['user_ns']['_exit_code']:
                clear_output()
                step4a_installed_flag = True
            else:
                print("❌ Error installing dependencies, trying to continue anyway...")

        print("\n🚶‍♂️ Launching program for Anime tags...\n")

        # Adjust paths and commands as needed
        kohya = "/content/kohya-trainer"
        os.environ['PYTHONPATH'] = kohya
        !python {kohya}/finetune/tag_images_by_wd14_tagger.py \
            {images_folder} \
            --repo_id={selected_tagger} \
            --model_dir={root_dir} \
            --thresh={tag_threshold} \
            --batch_size=8 \
            --caption_extension=.txt \
            --force_download

        if not get_ipython().__dict__['user_ns']['_exit_code']:
            print("Removing underscores and blacklist...")
            blacklisted_tags = [t.strip() for t in blacklist_tags.split(",")]
            from collections import Counter
            top_tags = Counter()
            for txt in [f for f in os.listdir(images_folder) if f.lower().endswith(".txt")]:
                with open(os.path.join(images_folder, txt), 'r') as f:
                    tags = [t.strip() for t in f.read().split(",")]
                    tags = [t.replace("_", " ") if len(t) > 3 else t for t in tags]
                    tags = [t for t in tags if t not in blacklisted_tags]
                top_tags.update(tags)
                with open(os.path.join(images_folder, txt), 'w') as f:
                    f.write(", ".join(tags))

            os.environ['PYTHONPATH'] = '/env/python'
            clear_output()
            print(f"📊 Tagging complete. Here are the top 50 tags in your dataset:")
            print("\n".join(f"{k} ({v})" for k, v in top_tags.most_common(50)))

    elif method == "Photo captions":
        if "step4b_installed_flag" not in globals():
            print("\n🏭 Installing dependencies for Photo captions...\n")
            !pip install timm==0.6.12 fairscale==0.4.13 transformers==4.26.0 requests==2.28.2 accelerate==0.15.0 diffusers[torch]==0.10.2 einops==0.6.0 safetensors==0.2.6 jax==0.4.23 jaxlib==0.4.23
            if not get_ipython().__dict__['user_ns']['_exit_code']:
                clear_output()
                step4b_installed_flag = True
            else:
                print("❌ Error installing dependencies, trying to continue anyway...")

        print("\n🚶‍♂️ Launching program for Photo captions...\n")

        # Adjust paths and commands as needed
        kohya = "/content/kohya-trainer"
        os.environ['PYTHONPATH'] = kohya
        !python {kohya}/finetune/make_captions.py \
            {images_folder} \
            --beam_search \
            --max_data_loader_n_workers=2 \
            --batch_size=8 \
            --min_length={caption_min} \
            --max_length={caption_max} \
            --caption_extension=.txt

        if not get_ipython().__dict__['user_ns']['_exit_code']:
            import random
            captions = [f for f in os.listdir(images_folder) if f.lower().endswith(".txt")]
            sample = []
            for txt in random.sample(captions, min(10, len(captions))):
                with open(os.path.join(images_folder, txt), 'r') as f:
                    sample.append(f.read())

            os.chdir(root_dir)
            os.environ['PYTHONPATH'] = '/env/python'
            clear_output()
            print(f"📊 Captioning complete. Here are {len(sample)} example captions from your dataset:")
            print("".join(sample))

# IPython widgets for user interaction
# Chooses which branch of tag_images runs.
method_dropdown = widgets.Dropdown(
    options=["Anime tags", "Photo captions"],
    description='Tagging Method:'
)

# Forwarded to tag_images as tag_threshold.
tag_threshold_slider = widgets.FloatSlider(
    value=0.35,
    min=0.0,
    max=1.0,
    step=0.01,
    description='Tag Threshold:'
)

# Forwarded to tag_images as blacklist_tags (comma-separated).
blacklist_tags_text = widgets.Text(
    placeholder='Enter comma-separated tags',
    description='Blacklist Tags:'
)

# Forwarded to tag_images as caption_min.
caption_min_text = widgets.IntText(
    value=10,
    description='Caption Min Length:'
)

# Forwarded to tag_images as caption_max.
caption_max_text = widgets.IntText(
    value=75,
    description='Caption Max Length:'
)

# Forwarded to tag_images as selected_tagger.
# NOTE(review): "Other taggers..." looks like a placeholder entry; if selected
# it is passed on as-is — confirm.
tagger_dropdown = widgets.Dropdown(
    options=["SmilingWolf/wd-v1-4-swinv2-tagger-v2", "Other taggers..."],
    description='Tagger Selection:'
)

# Starts the tagging run when clicked (handler registered below).
run_button = widgets.Button(
    description='Run Tagging',
    button_style='success',
    tooltip='Click to run tagging process'
)

def on_run_button_clicked(b):
    """Redraw the tagging form, then run tag_images with the current values."""
    clear_output()
    # Same order as tag_images' parameters.
    form_fields = (
        method_dropdown,
        tag_threshold_slider,
        blacklist_tags_text,
        caption_min_text,
        caption_max_text,
        tagger_dropdown,
    )
    display(*form_fields, run_button)
    tag_images(*[field.value for field in form_fields])

# Run on_run_button_clicked each time the button is pressed
run_button.on_click(on_run_button_clicked)

# Display widgets
display(method_dropdown, tag_threshold_slider, blacklist_tags_text, caption_min_text, caption_max_text, tagger_dropdown, run_button)

# Be aware this isn't finished, and is LOVINGLY BASED ON HOLOSTRAWBERRY'S colab!
