# Create threaded pipelines for downloading images

The goal of this notebook is to create a pipeline that:
- downloads images
- writes the image name, URL, image size and time of addition to a book file
- removes the image

We will first run the pipeline sequentially and then compare it against multithreaded options. We will use queues to coordinate the threads.

In [3]:
import functools
import os
import threading
import time
from queue import Queue
from threading import Thread

import requests

In [4]:
import sys

# Put the parent directory on the import path (only once, so re-running the
# cell does not add duplicates). Presumably this is where the `utils` module
# imported just below lives -- confirm against the repository layout.
module_name = '..'
if module_name not in sys.path:
    sys.path.append(module_name)

import utils
from utils import perf_decorator

## I. Utility functions

In [6]:
def initialize_book(name):
    """Start a fresh book file that holds only a creation-time header line.

    Args:
        name: path of the book file to create.

    The file is opened in write mode, so a book that already exists under
    the same name is truncated and replaced.
    """
    with open(name, 'w') as book:
        # print() appends the trailing newline after the header text
        print(f"BOOK created at {time.ctime()}", file=book)

In [7]:
def delay(time_to_sleep):
    """Build a decorator that pauses before each call to simulate slow I/O.

    Args:
        time_to_sleep: number of seconds handed to ``time.sleep`` before the
            decorated function runs.

    Returns:
        A decorator. The function it produces sleeps, then calls the original
        with the same positional and keyword arguments and returns its result.
    """
    def decorator(func):
        # Copy __name__, __doc__, etc. from func so the decorated helpers
        # remain identifiable in tracebacks and help() output.
        @functools.wraps(func)
        def inner(*args, **kwargs):
            time.sleep(time_to_sleep)  # simulate I/O delay
            return func(*args, **kwargs)
        return inner
    return decorator

In [8]:
@delay(0.125)
def download_image(img_url, timeout=10):
    """Download the image at ``img_url`` and save it as a ``.jpg`` file.

    Args:
        img_url: URL of the image. The file name is taken from element 3 of
            ``img_url.split('/')``, i.e. the first path component after the
            host for URLs shaped like ``https://host/name``.
        timeout: seconds to wait for the server before giving up; forwarded
            to ``requests.get``.

    Returns:
        Tuple ``(img_url, img_name)`` where ``img_name`` is the name of the
        file that was written (relative to the current working directory).

    Raises:
        requests.RequestException: on connection problems, timeouts, or a
            4xx/5xx response.
    """
    # Without a timeout a stalled server would block the caller indefinitely.
    response = requests.get(img_url, timeout=timeout)
    # Fail loudly instead of saving an HTTP error page under a .jpg name.
    response.raise_for_status()
    img_bytes = response.content

    img_name = img_url.split('/')[3]
    img_name = f'{img_name}.jpg'

    with open(img_name, 'wb') as img_file:
        img_file.write(img_bytes)
        print(f'{img_name} was downloaded...')

    return img_url, img_name

@delay(0.25)
def add_in_book(img_url, img_name, book_name="BOOK.txt"):
    """Append a timestamped record about a downloaded image to the book file.

    Args:
        img_url: URL the image came from.
        img_name: path of the image file on disk; its size is read from here.
        book_name: path of the book file that is appended to.

    Returns:
        ``img_name``, unchanged, so it can be handed to the next stage.
    """
    # Read the size first: a missing image raises before the book is opened.
    size_bytes = os.path.getsize(img_name)
    size_mb = size_bytes / 1024**2

    with open(book_name, 'a') as book:
        # Empty first/last entries yield the leading and trailing newlines.
        record_lines = [
            '',
            f"Added at {time.ctime()}",
            f"name:{img_name!r} - url:{img_url!r} - size:{size_mb:.2f} Mb ",
            '',
        ]
        book.write('\n'.join(record_lines))

    return img_name

@delay(0.50)
def remove_img(img_name):
    """Delete the image file from disk and report it on stdout.

    Args:
        img_name: path of the file to delete.

    Returns:
        ``img_name``, unchanged.
    """
    os.remove(img_name)

    removal_message = f"{img_name} was removed"
    print(removal_message)

    return img_name

In [9]:
# Image URLs fed through every pipeline variant in this notebook.
# download_image names each file after element 3 of url.split('/'),
# which for these URLs is the "photo-..." component.
img_urls = [
    'https://images.unsplash.com/photo-1516117172878-fd2c41f4a759',
    'https://images.unsplash.com/photo-1532009324734-20a7a5813719',
    'https://images.unsplash.com/photo-1524429656589-6633a470097c',
    'https://images.unsplash.com/photo-1530224264768-7ff8c1789d79',
    'https://images.unsplash.com/photo-1564135624576-c5c88640f235',
    'https://images.unsplash.com/photo-1541698444083-023c97d3f4b6',
    'https://images.unsplash.com/photo-1522364723953-452d3431c267',
    'https://images.unsplash.com/photo-1513938709626-033611b8cc03',
    'https://images.unsplash.com/photo-1507143550189-fed454f93097',
    'https://images.unsplash.com/photo-1493976040374-85c8e12f0c0e',
    'https://images.unsplash.com/photo-1504198453319-5ce911bafcde',
    'https://images.unsplash.com/photo-1530122037265-a5f1f91d3b99',
    'https://images.unsplash.com/photo-1516972810927-80185027ca84',
    'https://images.unsplash.com/photo-1550439062-609e1531270e',
    'https://images.unsplash.com/photo-1549692520-acc6669e2f0c'
]

## II. Non threaded option

In [11]:
@perf_decorator
def main():
    """Run the whole pipeline sequentially, one image at a time.

    For every URL in ``img_urls`` the image is downloaded, recorded in the
    book file and then deleted, before moving on to the next URL.
    ``perf_decorator`` comes from the project's ``utils`` module (not shown
    here); presumably it reports the run time -- confirm there.
    """
    book_name = "BOOK.txt"
    initialize_book(book_name)

    for img_url in img_urls:
        downloaded = download_image(img_url)  # (img_url, img_name)
        # Reuse book_name so the file that was initialized is the one
        # appended to, even if the name above is changed later.
        recorded_name = add_in_book(*downloaded, book_name=book_name)
        remove_img(recorded_name)
    print("Done")

main()

photo-1516117172878-fd2c41f4a759.jpg was downloaded...
photo-1516117172878-fd2c41f4a759.jpg was removed
photo-1532009324734-20a7a5813719.jpg was downloaded...
photo-1532009324734-20a7a5813719.jpg was removed
photo-1524429656589-6633a470097c.jpg was downloaded...
photo-1524429656589-6633a470097c.jpg was removed
photo-1530224264768-7ff8c1789d79.jpg was downloaded...
photo-1530224264768-7ff8c1789d79.jpg was removed
photo-1564135624576-c5c88640f235.jpg was downloaded...
photo-1564135624576-c5c88640f235.jpg was removed
photo-1541698444083-023c97d3f4b6.jpg was downloaded...
photo-1541698444083-023c97d3f4b6.jpg was removed
photo-1522364723953-452d3431c267.jpg was downloaded...
photo-1522364723953-452d3431c267.jpg was removed
photo-1513938709626-033611b8cc03.jpg was downloaded...
photo-1513938709626-033611b8cc03.jpg was removed
photo-1507143550189-fed454f93097.jpg was downloaded...
photo-1507143550189-fed454f93097.jpg was removed
photo-1493976040374-85c8e12f0c0e.jpg was downloaded...
photo-149

## III. Threaded option with queue

In [13]:
class ClosableQueue(Queue):
    """Queue that consumers can iterate over until a producer closes it.

    Iterating blocks on ``get()`` and yields items until a sentinel is
    received. ``close()`` enqueues one sentinel, so one ``close()`` call is
    needed per consumer that iterates the queue.
    """

    # Unique marker compared by identity, so no real item can be mistaken for it.
    SENTINEL = object()

    def __init__(self):
        super().__init__()
        # Number of entries fully handed off by __iter__; sentinels are counted too.
        self.count = 0
        # Several workers may iterate the same queue, and `count += 1` is a
        # read-modify-write, so updates are serialized with a dedicated lock.
        self._count_lock = threading.Lock()

    def close(self):
        """Enqueue one sentinel, which stops exactly one iterating consumer."""
        self.put(self.SENTINEL)

    def __iter__(self):
        """Yield items as they arrive; stop when a sentinel is received."""
        while True:
            item = self.get()  # blocks until an entry is available
            try:
                # Receiving the sentinel signals this consumer to stop.
                if item is self.SENTINEL:
                    return
                yield item
            finally:
                # Runs once the consumer is finished with the entry (it asked
                # for the next one, or the generator is being closed).
                with self._count_lock:
                    self.count += 1
                # join() on the queue returns once every entry put in has
                # been marked done here.
                self.task_done()

class StoppableWorker(Thread):
    """Thread that applies ``func`` to each item of ``in_queue`` and forwards the result.

    Args:
        func: callable taking exactly one argument (the item).
        in_queue: iterable of work items; the thread ends when iteration ends.
        out_queue: object with a ``put`` method that receives each result.

    Items whose processing raises are recorded in ``errors`` and reported on
    stdout; the worker then carries on with the next item.
    """

    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        # (item, exception) pairs for items that func failed on
        self.errors = []

    def run(self):
        for item in self.in_queue:
            try:
                result = self.func(item)
            except Exception as exc:
                # If the thread died here, the remaining items and the
                # sentinel would never be consumed and any join() on the
                # input queue would block forever. Record the failure and
                # keep draining instead.
                self.errors.append((item, exc))
                print(f"{self.name}: processing {item!r} failed with {exc!r}")
                continue
            self.out_queue.put(result)

In [None]:
@perf_decorator
def main_threaded():
    """Run the pipeline with one worker thread per stage, linked by queues.

    Stages: download -> add to book -> remove. Each stage reads from its own
    queue and pushes its result onto the next stage's queue; results of the
    last stage end up in ``done_queue``.
    """
    book_name = "BOOK_threaded.txt"
    initialize_book(book_name)

    def unpack_add_in_book(p):
        # A worker hands exactly one item to its function, so the
        # (img_url, img_name) tuple from the download stage is unpacked here.
        return add_in_book(*p, book_name=book_name)

    download_queue, add_book_queue, remove_queue, done_queue = (
        ClosableQueue() for _ in range(4)
    )

    threads = [
        StoppableWorker(download_image, download_queue, add_book_queue),
        StoppableWorker(unpack_add_in_book, add_book_queue, remove_queue),
        StoppableWorker(remove_img, remove_queue, done_queue),
    ]
    for worker in threads:
        worker.start()

    for img_url in img_urls:
        download_queue.put(img_url)

    # Shut the stages down in pipeline order: a queue is closed only after
    # the queue feeding it has been fully drained.
    for stage_queue in (download_queue, add_book_queue, remove_queue):
        stage_queue.close()
        stage_queue.join()

    for worker in threads:
        worker.join()

    print("donwload_queue", download_queue.count)
    print("add_book_queue", add_book_queue.count)
    print("remove_queue", remove_queue.count)
    print("element in done queue", done_queue.qsize(), " expected number of elements", len(img_urls))

main_threaded()

photo-1516117172878-fd2c41f4a759.jpg was downloaded...
photo-1516117172878-fd2c41f4a759.jpg was removed
photo-1532009324734-20a7a5813719.jpg was downloaded...
photo-1532009324734-20a7a5813719.jpg was removed
photo-1524429656589-6633a470097c.jpg was downloaded...
photo-1524429656589-6633a470097c.jpg was removed


## IV. Even more multithreading

In [None]:
def start_threads(count, *args):
    """Create ``count`` StoppableWorker threads with the same arguments and start them.

    Args:
        count: number of workers to create.
        *args: forwarded unchanged to ``StoppableWorker`` for each worker.

    Returns:
        List of the started workers.
    """
    workers = []
    for _ in range(count):
        workers.append(StoppableWorker(*args))

    # Start only once every worker has been constructed.
    for worker in workers:
        worker.start()

    return workers

def stop_threads(closable_queue, threads):
    """Close a queue for all of its workers and wait until everything is finished.

    Args:
        closable_queue: queue the workers read from; must offer ``close()``
            and ``join()``.
        threads: the workers consuming ``closable_queue``.
    """
    # One close() per worker: each call stops a single consumer.
    for _worker in threads:
        closable_queue.close()

    # Block until every entry of the queue has been processed.
    closable_queue.join()

    # Finally wait for the workers themselves to exit.
    for worker in threads:
        worker.join()

In [None]:
@perf_decorator
def main_even_more_threads():
    """Run the pipeline with several worker threads per stage.

    Uses 2 download workers, 4 book-writing workers and 6 removal workers,
    linked by the same chain of queues as the single-worker-per-stage version.
    """
    book_name = "BOOK_even_more_threads.txt"
    initialize_book(book_name)

    def unpack_add_in_book(p):
        # Workers pass one item only, so the (img_url, img_name) tuple is unpacked here.
        return add_in_book(*p, book_name=book_name)

    download_queue = ClosableQueue()
    add_book_queue = ClosableQueue()
    remove_queue = ClosableQueue()
    done_queue = ClosableQueue()

    # (input queue, workers) per stage, listed and started in pipeline order.
    stages = [
        (download_queue, start_threads(2, download_image, download_queue, add_book_queue)),
        (add_book_queue, start_threads(4, unpack_add_in_book, add_book_queue, remove_queue)),
        (remove_queue, start_threads(6, remove_img, remove_queue, done_queue)),
    ]

    for img_url in img_urls:
        download_queue.put(img_url)

    # Stop stage by stage so a queue is closed only after its feeder is done.
    for stage_queue, stage_workers in stages:
        stop_threads(stage_queue, stage_workers)

    print(done_queue.qsize(), 'items finished')

main_even_more_threads()