In [None]:
!pip install aiomultiprocess aiohttp

In [None]:
import aiomultiprocess
from aiomultiprocess import Pool

In [None]:
# "fork" lets worker processes inherit functions defined in notebook cells
# (such as `download`). With "spawn", the child processes generally need the
# worker function to be importable from a module, which notebook-defined
# functions are not. "fork" is unavailable on Windows and may be unreliable on
# macOS; there, try "spawn" and move `save_image`/`download` into a .py module.
aiomultiprocess.set_start_method("fork")

In [None]:
from aiohttp import request, ClientTimeout
from PIL import Image, UnidentifiedImageError
from io import BytesIO
from tqdm.auto import tqdm
import os
import asyncio

In [None]:
def list_downloaded_ids(directory: str = 'images') -> set:
    """Return the photo IDs already saved as files in ``directory``.

    An ID is a file name without its extension (``id_0.jpg`` -> ``'id_0'``),
    matching the ``images/{photo_id}.jpg`` naming used by ``download``. IDs
    are kept as strings so they compare equal to the string IDs in
    ``photos``; converting them with ``int()`` fails on names like ``id_0``.
    Hidden entries (``.DS_Store``, ``.ipynb_checkpoints``) and
    sub-directories are ignored.
    """
    return {
        os.path.splitext(name)[0]
        for name in os.listdir(directory)
        if not name.startswith('.') and os.path.isfile(os.path.join(directory, name))
    }

os.makedirs('images', exist_ok=True)
photos_downloaded = list_downloaded_ids('images')

In [None]:
# Work list of (photo_id, url) pairs. `download` saves each entry as
# images/{photo_id}.jpg, so photo_id must be usable as a file-name stem.
# NOTE(review): these picsum URLs look like sample data — replace with the real list.
photos = [
    ('id_0', 'https://picsum.photos/200/300'),
    ('id_1', 'https://picsum.photos/250/350'),
    ('id_2', 'https://picsum.photos/300/400'),
]

In [None]:
# (images already on disk, photos in the work list)
len(photos_downloaded), len(photos)

In [None]:
def save_image(content: bytes, path: os.PathLike, reencode: bool = True,
               max_size: tuple = (350, 350), quality: int = 80):
    """Write image bytes to ``path``, optionally re-encoding them first.

    Args:
        content: Raw image bytes (e.g. an HTTP response body).
        path: Destination file. When re-encoding, PIL infers the output
            format from the file extension.
        reencode: If True, decode with PIL, convert to RGB, shrink to fit
            inside ``max_size`` (aspect ratio preserved, never enlarged) and
            save with ``quality``. If False, write ``content`` unchanged.
        max_size: ``(width, height)`` bounding box for the thumbnail.
        quality: Encoder quality passed to ``Image.save`` (used by JPEG).

    Raises:
        PIL.UnidentifiedImageError: if ``reencode`` is True and ``content``
            cannot be decoded as an image.
    """
    if not reencode:
        with open(path, 'wb') as f:
            f.write(content)
        return
    # Close the decoded source promptly; convert() returns an independent image.
    with Image.open(BytesIO(content)) as src:
        img = src.convert('RGB')
    img.thumbnail(max_size)
    img.save(path, quality=quality)

async def download(task):
    """Fetch one photo and save it as ``images/{photo_id}.jpg``.

    Intended to be mapped over ``photos`` by an ``aiomultiprocess.Pool``, so
    it takes a single argument.

    Args:
        task: A ``(photo_id, url)`` pair.

    Returns:
        1 if the image was fetched and saved, otherwise 0 (non-200 status,
        empty body, network/timeout error, undecodable image, malformed task).
        The value is added directly to the progress bar in ``main``.
    """
    try:
        photo_id, url = task
        timeout = ClientTimeout(total=30)
        async with request("GET", url, timeout=timeout) as response:
            if response.status != 200:
                return 0
            content = await response.read()
        # An empty body cannot be a valid image.
        if not content:
            return 0
        # Decode/resize after the connection is released, so the socket is not
        # held open during CPU-bound PIL work.
        save_image(content, f'images/{photo_id}.jpg', reencode=True)
    except Exception:
        # Best-effort: count the failure as 0 and keep going. Catching Exception
        # (not a bare except) lets KeyboardInterrupt, SystemExit and
        # asyncio.CancelledError (a BaseException since Python 3.8) propagate,
        # so the pool can be interrupted and shut down cleanly.
        return 0  # replace with `raise` to fail on errors
    return 1

In [None]:
async def main(processes: int = 4, childconcurrency: int = 100):
    """Download every photo in ``photos`` that is not already on disk.

    Work is spread over ``processes`` worker processes, each running up to
    ``childconcurrency`` concurrent ``download`` coroutines.

    The progress bar starts at the number of images already present
    (``photos_downloaded``). It then advances by the 0/1 result of each
    ``download``, so failed downloads leave it short of ``total``.
    """
    # str() so the comparison works whether the IDs were stored as str or int.
    done = {str(photo_id) for photo_id in photos_downloaded}
    # Skip photos saved by an earlier run. Without this they would be fetched
    # again and counted twice against `total`.
    pending = [task for task in photos if str(task[0]) not in done]
    # Context manager closes the bar even if the pool raises.
    with tqdm(total=len(pending) + len(done), initial=len(done)) as pbar:
        if not pending:
            return
        async with Pool(processes=processes, childconcurrency=childconcurrency) as pool:
            async for result in pool.map(download, pending):
                pbar.update(result)

In [None]:
# Top-level `await` works because IPython/Jupyter kernels run cells inside an
# event loop (autoawait); in a plain Python script use `asyncio.run(main())`.
await main()

In [None]:
# Pure-Python listing: works on every OS (there is no `ls` on Windows) and
# displays as a list via the cell's last expression.
sorted(os.listdir('images'))