### Setup

In [1]:
from concurrent.futures import ThreadPoolExecutor
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# from tqdm import tqdm
import logging
import requests
import json
import time
import os

In [2]:
SF_API_URL = 'https://api.scryfall.com'
# Headers sent with every request made through the shared session.
# HTTP header names are hyphenated: an underscore spelling would be sent as a
# separate, unrecognized header and the client would not be identified.
SF_API_HEADERS = {
    'User-Agent' : 'card-classifier/0.1',
    'Accept' : '*/*'
}
SF_API_DELAY = 50 # ms - No ratelimit on *.scryfall.io

In [3]:
# Master switch for the destructive/slow steps: when True the data folders are
# wiped and the bulk file and card art are downloaded again.
DATA_UPDATE = True

In [4]:
# Retry policy applied by the adapter to every request it handles
retry_strategy = Retry(total=3, backoff_factor=0.3)

# One adapter instance is shared by both URL schemes
adapter = HTTPAdapter(
    max_retries=retry_strategy,
    pool_maxsize=50,      # Maximum number of connections kept in a pool
    pool_connections=10,  # Number of connection pools to cache
)

# Build the session and route http:// and https:// traffic through the adapter
session = requests.Session()
for scheme in ('http://', 'https://'):
    session.mount(scheme, adapter)

### Logging setup

In [5]:
# Named logger for this notebook, followed by root-logger configuration at INFO.
# NOTE(review): the cells below call logging.error/logging.info directly, so
# their records go through the root logger rather than through `logger`.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

### Wipe data folder

In [6]:

import shutil
import os

# Folder layout: data/ receives the bulk JSON, data/art/ receives the art crops
data_folder = 'data'
art_folder = os.path.join(data_folder, 'art')

# Placeholder file paths, one per folder (presumably so git keeps the
# otherwise-empty folders — confirm against the repository's .gitignore)
gitkeep_file, art_gitkeep_file = (
    os.path.join(folder, '.gitkeep') for folder in (data_folder, art_folder)
)

def clear_folder(folder, gitkeep_path):
    """Empty ``folder`` (creating it if missing) and leave an empty .gitkeep.

    Args:
        folder: Directory to clear. Created when it does not exist.
        gitkeep_path: Path of the placeholder file. An entry of ``folder``
            whose joined path equals this value is not deleted; the file is
            then (re)written empty.
    """
    if not os.path.exists(folder):
        os.makedirs(folder)
    else:
        for name in os.listdir(folder):
            target = os.path.join(folder, name)
            if target == gitkeep_path:
                continue
            # Files and symlinks are unlinked; real directories are removed recursively
            if os.path.isfile(target) or os.path.islink(target):
                os.unlink(target)
            elif os.path.isdir(target):
                shutil.rmtree(target)

    # Opening in 'w' mode creates the placeholder or truncates an existing one
    with open(gitkeep_path, 'w') as keep:
        keep.write('')

if DATA_UPDATE:
    # Order matters: clearing the data folder removes the art subfolder,
    # which the second call then recreates together with its .gitkeep.
    for folder, keep_file in ((data_folder, gitkeep_file), (art_folder, art_gitkeep_file)):
        clear_folder(folder, keep_file)
        print(f"{folder} folder cleared")

data folder cleared
data\art folder cleared


### Helper Functions

In [7]:
### Pull all download URIs
def get_uris():
    """Fetch the bulk-data listing and map each bulk type to its download URI.

    Returns:
        dict: ``{type: download_uri}`` for every item in the ``data`` list of
        the ``/bulk-data`` response.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status.
    """
    response = session.get(SF_API_URL+'/bulk-data', headers=SF_API_HEADERS)
    # Surface HTTP failures here instead of as an opaque KeyError on 'data'
    response.raise_for_status()

    # `bulk_item` rather than `object`, which would shadow the builtin
    return {
        bulk_item['type']: bulk_item['download_uri']
        for bulk_item in response.json()['data']
    }

In [8]:
### Clean card names
def clean_cardname(name):
    """Turn a card name into the string used in art filenames.

    Spaces, slashes and colons become underscores; single quotes, double
    quotes, question marks and exclamation marks are removed.
    """
    # One-pass character table: the first two arguments map 1:1, the third lists deletions
    substitutions = str.maketrans(' /:', '___', '\'"?!')
    return name.translate(substitutions)

### Download Card Files

In [9]:
# Defined outside the DATA_UPDATE guard so the cell that loads this file still
# has the path when DATA_UPDATE is False and a previous download is reused.
artwork_file_path = os.path.join('data', 'unique_artwork.json')

if DATA_UPDATE:
    download_uri = get_uris()

    # Download and save the unique artwork file
    response = session.get(download_uri['unique_artwork'], headers=SF_API_HEADERS)
    # Fail loudly rather than saving an error payload as unique_artwork.json
    response.raise_for_status()
    with open(artwork_file_path, 'wb') as file:
        file.write(response.content)

    # Download and save the all cards
    # all_cards_file_path = os.path.join('data', 'all_cards.json')
    # response = session.get(download_uri['all_cards'], headers=SF_API_HEADERS)
    # with open(all_cards_file_path, 'wb') as file:
    #     file.write(response.content)



### Download Card Images

In [10]:
# Parse the downloaded unique-artwork JSON file; the download loop below
# iterates card_data and treats each element as one card dict.
with open(artwork_file_path, 'r', encoding="utf8") as file:
    card_data = json.load(file)

# with open(all_cards_file_path, 'r', encoding="utf8") as file:
#     card_data = json.load(file)

In [11]:
def download_image(entry, is_dfc=False, card_face=None):
    """Download the art crop for a card (or for one of its faces) into data/art.

    Args:
        entry: Card dict. Its ``name`` is used in the error log; when
            ``is_dfc`` is False it must also provide ``illustration_id`` and
            ``image_uris['art_crop']``.
        is_dfc: When True, the name, illustration id and image URL are read
            from ``card_face`` instead of ``entry``.
        card_face: Face dict with ``name``, ``illustration_id`` and
            ``image_uris['art_crop']``; only read when ``is_dfc`` is True.

    Returns:
        None. The image is written as ``<cleaned name>_<illustration_id>.jpg``;
        on a non-200 response an error is logged and nothing is written.
    """
    # Both layouts expose the same keys, so pick the source dict once
    source = card_face if is_dfc else entry

    filename = f"{clean_cardname(source['name'])}_{source['illustration_id']}.jpg"
    # Join the path from components: a literal 'data\\art' is a nested path
    # only on Windows and a single oddly named folder everywhere else.
    art_path = os.path.join('data', 'art', filename)
    image_url = source['image_uris']['art_crop']

    response = session.get(image_url, headers=SF_API_HEADERS)
    if response.status_code != 200:
        logging.error(f"Failed to download image for {entry['name']}: {response.status_code}")
        return
    with open(art_path, 'wb') as file:
        file.write(response.content)

if DATA_UPDATE:
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = []
        for entry in card_data:
            # If not DFC. Each key needs its own membership test: written as
            # `'image_uris' and 'illustration_id' in entry` only the second key
            # is checked, because a non-empty string literal is always truthy.
            if 'image_uris' in entry and 'illustration_id' in entry:
                future = executor.submit(download_image, entry)
                futures.append((future, entry))
            # If DFC
            elif 'card_faces' in entry:
                for card_face in entry['card_faces']:
                    future = executor.submit(download_image, entry, True, card_face)
                    futures.append((future, entry))
            # Likely no image, includes cards like dungeons
            else:
                logging.error(f"Unknown card type for {entry['name']}")
                continue

        print("Downloads Queued, waiting for completion...")
        for future, entry in futures:
            # Debugging for missing printing. Only one for Inalla, should be two
            if entry['name'] == 'Inalla, Archmage Ritualist':
                # .get() so a missing key in this debug line cannot abort the wait loop
                logging.info(f"Inalla, Archmage Ritualist found at {time.time()} with illustration id {entry.get('illustration_id')}")

            # Re-raise any exception from the worker here so it gets logged
            try:
                future.result()
            except Exception as e:
                logging.error(f"Error downloading image: {e}")


ERROR:root:Unknown card type for Dungeon of the Mad Mage


Downloads Queued, waiting for completion...


INFO:root:Inalla, Archmage Ritualist found at 1747083345.0013182 with illustration id 7b3fb084-d81e-4331-b813-b6c8e13674ab
INFO:root:Inalla, Archmage Ritualist found at 1747083362.3481514 with illustration id a5343de9-a77a-43c4-a6a8-53c7e0217ed1


### Match Previously Tagged Cards to Downloaded Art

In [20]:
# Arguments for the search_cards call at the end of this cell
search_query = 'arttag:bisexual-lighting'  # passed as the `q` request parameter
search_unique = 'art'                      # passed as the `unique` request parameter
search_include_extras = 'true'             # passed as the `include_extras` request parameter

def search_cards(query, unique = 'cards', include_extras = 'true', order='name'):
    """Search for cards using the Scryfall API and collect every result page.

    query: str
        The search query to use for the Scryfall API
    unique: str
        Which uniqueness quality to use for the search (cards, art, prints)
    include_extras: str
        Whether to include extra cards in the search (true, false)
    order: str
        The order to use for the search (name, set, released, rarity, etc.)

    Returns a list with the items of the ``data`` list of every page, in the
    order the API returned them.
    Raises requests.HTTPError when a page request answers with a 4xx/5xx status.
    """
    search_data = []
    page = 1

    while True:
        response = session.get(
            SF_API_URL + '/cards/search',
            headers=SF_API_HEADERS,
            params={
                'q': query,
                'unique': unique,
                'include_extras': include_extras,
                'order': order,
                'page': page
            }
        )
        # Stop with a clear HTTP error instead of a KeyError on 'data'
        response.raise_for_status()

        # Parse the body once and reuse it
        payload = response.json()
        search_data.extend(payload['data'])

        # The list response says whether another page exists; inferring it from
        # a fixed page length requests a non-existent page whenever the result
        # count is an exact multiple of that length.
        if not payload.get('has_more'):
            break

        page += 1
        # Pause between consecutive API calls (SF_API_DELAY is in milliseconds)
        time.sleep(SF_API_DELAY / 1000)

    return search_data

# Run the configured search; search_data is the list of card dicts it returns
search_data = search_cards(search_query, search_unique, search_include_extras)


In [21]:
def match_search(entry, is_dfc=False, card_face=None):
    """Describe where the downloaded art for a search result should be on disk.

    Args:
        entry: Card dict from the search results; read for ``name`` and
            ``illustration_id`` when ``is_dfc`` is False.
        is_dfc: When True, ``name`` and ``illustration_id`` are read from
            ``card_face`` instead of ``entry``.
        card_face: Face dict; only read when ``is_dfc`` is True.

    Returns:
        dict: ``name``, ``illustration_id`` and the expected ``path`` of the
        image. The dict is returned even when the file is missing; in that
        case a message is printed.
    """
    # Both layouts expose the same keys, so pick the source dict once
    source = card_face if is_dfc else entry

    filename = f"{clean_cardname(source['name'])}_{source['illustration_id']}.jpg"
    # Join the path from components: a literal 'data\\art' is a nested path
    # only on Windows and a single oddly named folder everywhere else.
    art_path = os.path.join('data', 'art', filename)
    art_dict = {'name': source['name'],
                'illustration_id': source['illustration_id'],
                'path': art_path}

    # Check if the file exists
    if not os.path.exists(art_path):
        print(f"File {filename} does not exist.")
    return art_dict


with ThreadPoolExecutor() as executor:
    futures = []
    for search_card in search_data:
        # Single-faced result: top-level image_uris, one lookup for the card itself
        if 'image_uris' in search_card:
            futures.append(executor.submit(match_search, search_card))
            continue
        # Otherwise queue one lookup per face
        futures.extend(
            executor.submit(match_search, search_card, True, card_face)
            for card_face in search_card['card_faces']
        )

# Gather the results in submission order
search_matches = [future.result() for future in futures]

In [33]:
# count of dfcs in search_data
# Tally every search result that carries a 'card_faces' key
dfc_count = sum(1 for card in search_data if 'card_faces' in card)

In [34]:
# Display the number of search results that have a 'card_faces' key
dfc_count

8

In [35]:
# One entry per submitted match_search call: a result without top-level
# 'image_uris' contributes one entry per face, so this can exceed len(search_data)
len(search_matches)

298

In [36]:
# Number of card dicts returned by the search
len(search_data)

291