In [None]:
# | default_exp utils.download

# Download utils

> Utilities for downloading files from the internet.

## Imports

Python modules

In [None]:
# | export
import logging
import multiprocessing
import urllib
import warnings
from collections import namedtuple
from queue import Queue
from threading import Thread
from typing import List, Union

Third-party modules

In [None]:
# | export
import requests

## Logger

In [None]:
# | export
# Module-level logger, obtained under the fixed name "newrelic_sb_sdk".
logger = logging.getLogger("newrelic_sb_sdk")

## Classes

In [None]:
# | export


# Arguments for a single `download_file` call. The field names double as the
# keyword names produced by `_asdict()`.
DownloadFileArgs = namedtuple("DownloadFileArgs", ("url", "file_name"))

In [None]:
# | export


class Downloader(Thread):
    """Worker thread that downloads the files described by queued jobs.

    Every item taken from the queue is a ``(job, download_file_args)`` pair.
    A pair whose ``job`` is ``None`` tells the worker to stop.
    """

    # Identifier of the job most recently taken from the queue, if any.
    job: Union[int, None] = None

    def __init__(self, *, queue: Queue, order: int):
        """Store the shared job queue and this worker's ordinal number.

        Args:
            queue: Queue the worker reads ``(job, download_file_args)`` pairs from.
            order: Number identifying this worker in log messages.
        """
        super().__init__()
        self.queue = queue
        self.order = order

    def run(self):
        """Process queued jobs until a stop signal (``job is None``) arrives.

        A failing download is logged and skipped so that one bad job does not
        terminate the worker and leave the remaining jobs unprocessed.
        """
        while True:
            job, download_file_args = self.queue.get()

            if job is None:
                break

            self.job = job

            logger.debug(
                "Dowloader %d is downloading with parameters %r",
                self.order,
                download_file_args,
            )

            try:
                download_file(**download_file_args._asdict())
            except Exception:  # top-level boundary of the worker thread
                # Without this guard the exception would end the thread; keep
                # the worker alive and record the failure with its traceback.
                logger.exception(
                    "Downloader %d failed job %r with parameters %r",
                    self.order,
                    job,
                    download_file_args,
                )

## Functions

In [None]:
# | export


def download_file(
    *,
    url: str,
    file_name: str,
) -> None:
    """Download `url` and write the response body to `file_name`.

    The body is streamed to disk in fixed-size chunks instead of being loaded
    into memory at once.

    Args:
        url: Address of the resource to download.
        file_name: Path of the file to write. When empty, the last segment of
            the URL path is used instead.

    Raises:
        requests.HTTPError: If the server answers with an error status
            (raised by ``response.raise_for_status()``).

    Warns:
        UserWarning: If the ``content-length`` header is missing or ``0``.
    """
    # `import urllib` alone does not import the `parse` submodule; import it
    # explicitly instead of relying on another package having loaded it.
    from urllib.parse import urlparse

    chunk_size = 1024

    # Resolve the target name first so the warning below reports the name that
    # is actually written, never an empty one.
    if not file_name:
        file_name = urlparse(url).path.split("/")[-1]

    # Using the response as a context manager releases the streamed connection
    # even when the status check or the file write raises.
    with requests.get(
        url,
        stream=True,
        timeout=60,
    ) as response:
        response.raise_for_status()

        file_size = int(response.headers.get("content-length", 0))

        if file_size == 0:
            warnings.warn(
                f"Size of {file_name} file is 0B.",
                UserWarning,
                stacklevel=2,
            )

        with open(file_name, "wb") as file:
            for chunk in response.iter_content(chunk_size):
                file.write(chunk)

In [None]:
# | export


def download_files(
    *,
    urls: List[str],
    base_file_name: str,
    file_extension: str,
) -> None:
    """Download every URL in `urls` concurrently using `Downloader` threads.

    The file for the URL at position ``order`` (starting at 0) is named
    ``{base_file_name}_{order}.{file_extension}``, with ``order`` left-padded
    with zeros to the number of digits in ``len(urls)``.

    Args:
        urls: Addresses of the resources to download.
        base_file_name: Common prefix of the generated file names.
        file_extension: Extension (without the dot) of the generated files.
    """
    if not urls:
        # Nothing to download: do not start any worker thread.
        return

    # Never start more workers than there are jobs to hand out.
    workers_count = min(multiprocessing.cpu_count(), len(urls))
    zero_padding = len(str(len(urls)))

    queue: Queue = Queue()

    for order, url in enumerate(urls):
        queue.put(
            (
                order,
                DownloadFileArgs(
                    url,
                    f"{base_file_name}_{order:0>{zero_padding}d}.{file_extension}",
                ),
            )
        )

    # One stop signal per worker, queued after all real jobs so every job is
    # taken before any worker stops.
    stop_signal = (
        None,
        DownloadFileArgs(None, None),
    )

    for _ in range(workers_count):
        queue.put(stop_signal)

    workers = [
        Downloader(
            queue=queue,
            order=order,
        )
        for order in range(workers_count)
    ]

    for worker in workers:
        worker.start()

    # Block until every worker has consumed its stop signal.
    for worker in workers:
        worker.join()