In [None]:
import tqdm
import os
from subprocess import check_output, CalledProcessError, STDOUT
import shlex
from concurrent.futures import ThreadPoolExecutor
import numpy as np


In [None]:
# Load the listing of competition file paths (one path per line, surrounding whitespace stripped).
# The context manager closes the file handle once the listing is read instead of leaking it.
with open('../data/all_files.txt') as all_files_listing:
    kaggle_image_paths = np.array([file_name.strip() for file_name in all_files_listing])


In [None]:
def system_call(command):
    """
    Runs a command (without a shell) and hands back everything it printed.

    The command string is tokenised with shlex, stderr is merged into stdout,
    and the captured bytes are decoded to text.

    :param command: command as string e.g. 'ls -la'
    :returns: the decoded combined stdout/stderr of the command
    :raises: CalledProcessError - decode with e.output.decode()
    """
    argv = shlex.split(command)
    # stderr=STDOUT folds error text into the same captured stream
    raw_output = check_output(argv, stderr=STDOUT)
    return raw_output.decode()


In [None]:
def kaggle_download(file_name: str) -> str:
    """
    Downloads a file from the kaggle imagenet competition.

    The file name is quoted with shlex.quote before being embedded in the command string.
    The command is later tokenised with shlex.split, so without quoting a name containing
    whitespace or quote characters would be broken into several arguments (and an empty
    name would silently shift the following option into the ``-f`` slot).

    :param file_name: the full path to the file name based on the kaggle competition data repo
    :returns: output from the system call
    :raises: CalledProcessError - decode with e.output.decode()
    """
    # Names made only of "safe" characters are returned unchanged by shlex.quote,
    # so the command for ordinary paths is identical to the unquoted form.
    quoted_file_name = shlex.quote(file_name)
    kaggle_download_command = f'kaggle competitions download -c imagenet-object-localization-challenge -f {quoted_file_name} -p ../data/imagenet_images/'
    output = system_call(kaggle_download_command)
    return output


In [None]:
def download_chunk(chunk: list[str]) -> list[tuple[str, str]]:
    """
    Downloads every file in a chunk, one after another, and reports the ones that failed.

    A failing download does not stop the loop; its decoded process output is recorded
    as the failure reason and the next file is attempted.

    :param chunk: a list of full paths to the file names based on the kaggle competition data repo
    :returns: a list of failed downloads in the format (file, reason for failure)
    """

    def failure_reason(path):
        # None means the download succeeded; otherwise the decoded output of the failed command.
        try:
            kaggle_download(path)
        except CalledProcessError as err:
            return err.output.decode()
        return None

    attempts = ((path, failure_reason(path)) for path in chunk)
    # Compare against None (not truthiness) so a failure with empty output is still reported.
    return [(path, reason) for path, reason in attempts if reason is not None]


In [None]:
# Split the first 60 paths into 20 chunks. np.array_split is used instead of np.split because
# np.split raises ValueError when the slice length is not a multiple of 20 (e.g. a listing with
# fewer than 60 entries); when the length divides evenly the resulting chunks are identical.
chunks = np.array_split(kaggle_image_paths[:60], 20)


In [None]:
def threaded_download(chunks: list[list[str]] | np.ndarray) -> list[tuple[str, str]]:
    """
    Downloads a chunk of files from the kaggle imagenet competition in a threaded manner.

    :param chunk: a list of full paths to the file names based on the kaggle competition data repo
    :returns: a list of failed downloads in the format (file, reason for failure)
    """
    failed_downloads: list[tuple[str, str]] = []  # file, reason
    with ThreadPoolExecutor(max_workers=None) as executor:
        # use tqdm to track executors progress
        failed_downloads_futures = list(tqdm.tqdm(executor.map(download_chunk, chunks), total=len(chunks)))

        # resolve failed downloads
        failed_downloads = [item for sublist in failed_downloads_futures for item in sublist]

    return failed_downloads


In [None]:
def test_threading_speed():
    """
    Pushes 20 trivial tasks through a thread pool so the pool's overhead can be observed.

    The function performs no timing itself; time the call externally to see how long
    spinning up the threads takes.

    :returns: the list of task results (twenty 1s)
    """

    def mundane_task(*_ignored):
        # does no work: any time spent is pool/thread overhead
        return 1

    with ThreadPoolExecutor(max_workers=None) as pool:
        outcomes = [outcome for outcome in pool.map(mundane_task, range(20))]

    return list(outcomes)
