In [1]:
# This tutorial covers the methods of the Executor class, ThreadPoolExecutor, and the module-level functions of concurrent.futures.

from concurrent.futures import ThreadPoolExecutor 
import concurrent.futures
from datetime import datetime
import time

In [2]:
# Variables
# Bases for the timing demos below: each one is submitted to an executor as power(x, 2).
x_list = list(range(5, 9))  # -> [5, 6, 7, 8]

In [3]:
# Function to return x to the power y, after a delay (5 seconds by default) that simulates a slow task.
def power(x, y, delay=5):
    """Return ``pow(x, y)`` after sleeping for ``delay`` seconds.

    The sleep stands in for slow work, so the executor demos in this notebook
    can show how tasks overlap when several worker threads are available.

    Args:
        x: The base.
        y: The exponent.
        delay: Seconds to sleep before computing the result. Defaults to 5,
            the value the "Time Taken" demos in this notebook are built around.

    Returns:
        ``pow(x, y)``.
    """
    time.sleep(delay)
    return pow(x, y)

In [4]:
# First we'll look at the methods offered by the abstract Executor class.
# Executor itself is abstract, so we use one of its concrete subclasses (here, ThreadPoolExecutor).
# submit(fn, *args, **kwargs) - schedules fn(*args, **kwargs) to run on the executor and returns a Future object.
# With only 1 worker thread, submitted tasks run one after another rather than concurrently.
# Leaving the `with` block calls executor.shutdown(wait=True), so the worker thread is released explicitly.
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(power, 5, 2)  # Schedule power(5, 2) on the executor
    print(future.result())  # result() blocks until the future has finished, then returns its value

25


In [5]:
# Function to double a number, after a delay (5 seconds by default) that simulates a slow task.
def double_func(x, delay=5):
    """Return ``x * 2`` after sleeping for ``delay`` seconds.

    Args:
        x: The value to double.
        delay: Seconds to sleep before computing the result. Defaults to 5,
            matching the original fixed delay used in the map() demo.

    Returns:
        ``x * 2``.
    """
    time.sleep(delay)
    return x * 2

In [6]:
# map(fn, *iterables, timeout=None, chunksize=1) - schedules fn to be called on each value of the iterable(s).
# Unlike submit(), it returns an iterator of results (not futures), yielded in the same order as the inputs.
# Leaving the `with` block calls executor.shutdown(wait=True), so the worker thread is released explicitly.
with ThreadPoolExecutor(max_workers=1) as executor:
    s = range(10)
    for i in executor.map(double_func, s):
        print(i)


0
2
4
6
8
10
12
14
16
18


In [7]:
# shutdown(wait=True) - tells the executor to free its resources once the pending tasks are done.
# No new tasks can be submitted to the executor after shutdown() has been called.
# If wait=True, shutdown() does not return until ALL the pending tasks have completed.
# If wait=False, shutdown() returns immediately (pending tasks are still completed in the background).
executor = ThreadPoolExecutor(max_workers=1)
task_A = executor.submit(power, 2, 2)
task_B = executor.submit(power, 3, 2)
task_C = executor.submit(power, 4, 2)

print("Waiting for all tasks to finish before progressing in the program.")
executor.shutdown(wait=True)  # Blocks here: three 5-second tasks run one after another on a single worker

# Every task has already finished by now, so all three results print at the same time.
print(task_A.result())
print(task_B.result())
print(task_C.result())

Waiting for all tasks to finish before progressing in the program.
4
9
16


In [8]:
# Function to print the results of the tasks submitted to the executor.
def results_of_tasks(future_to_power):
    """Print the result of each future in ``future_to_power``, one per line.

    Iterating a dict yields its keys, so for a ``{future: x}`` mapping the
    futures are visited in the order they were inserted. Each ``result()`` call
    blocks until that particular future has finished, so an unfinished early
    future holds back printing of later ones.

    Args:
        future_to_power: An iterable of futures (in this notebook, a dict keyed by futures).

    Returns:
        None; the results are printed, not returned.
    """
    for fut in future_to_power:
        value = fut.result()  # Waits for this future to finish
        print(value)

In [9]:
# Now we'll observe how the program would run without multi-threading. Using 1 worker will simulate this.
# future_to_power maps each submitted Future to the x it was submitted with; iterating it yields the futures
# in submission order. Leaving the `with` block calls executor.shutdown(wait=True).
with ThreadPoolExecutor(max_workers=1) as executor:
    start_time = datetime.now()
    future_to_power = {executor.submit(power, x, 2): x for x in x_list}  # Maps each Future to its x
    results_of_tasks(future_to_power)
    end_time = datetime.now()
print("Time Taken:", end_time - start_time)

25
36
49
64
Time Taken: 0:00:20.018656


In [10]:
# As you saw, the results come in one at a time, about 5 seconds apart. However, if we increase the number of worker
# threads to equal the number of tasks we're submitting to the ThreadPoolExecutor, all of our results will come back
# much sooner.

In [11]:
# Declare the executor with a maximum of 4 worker threads (one per task); leaving the `with` block shuts it down.
with ThreadPoolExecutor(max_workers=4) as executor:
    start_time = datetime.now()
    future_to_power = {executor.submit(power, x, 2): x for x in x_list}  # Maps each Future to its x
    results_of_tasks(future_to_power)
    end_time = datetime.now()
print("Time Taken:", end_time - start_time)

25
36
49
64
Time Taken: 0:00:05.014428


In [12]:
# This is quicker because the worker threads process the tasks at the same time, so the time.sleep(5) calls
# overlap instead of running one after another. Another example may make this clearer.

In [13]:
# In this example, we'll halve the number of worker threads assigned to the thread pool.
# Each of the 2 worker threads then has 2 tasks to work through, so it takes about twice as long as with 4 workers.
# Leaving the `with` block calls executor.shutdown(wait=True).
with ThreadPoolExecutor(max_workers=2) as executor:
    start_time = datetime.now()
    future_to_power = {executor.submit(power, x, 2): x for x in x_list}  # Maps each Future to its x
    results_of_tasks(future_to_power)
    end_time = datetime.now()
print("Time Taken:", end_time - start_time)

25
36
49
64
Time Taken: 0:00:10.019446


In [14]:
# Now we'll look at the module-level functions of concurrent.futures.
# wait(fs, timeout=None, return_when=ALL_COMPLETED) - blocks until the futures in fs meet the return_when condition:
#   FIRST_COMPLETED - return as soon as any future finishes or is cancelled
#   FIRST_EXCEPTION - return as soon as any future finishes by raising an exception
#                     (if no future raises, this behaves like ALL_COMPLETED)
#   ALL_COMPLETED   - return when all futures have finished or been cancelled
# It returns a named 2-tuple of sets: (done, not_done).
# All four tasks sleep for the same 5 seconds, so several may already be done by the time wait() returns.
with ThreadPoolExecutor(max_workers=4) as executor:  # Leaving the block calls executor.shutdown(wait=True)
    future_to_power = {executor.submit(power, x, 2): x for x in x_list}  # Maps each Future to its x
    print(concurrent.futures.wait(future_to_power, return_when=concurrent.futures.FIRST_COMPLETED))
    results_of_tasks(future_to_power)


DoneAndNotDoneFutures(done={<Future at 0x10a072400 state=finished returned int>, <Future at 0x10a048e10 state=finished returned int>, <Future at 0x10a072cc0 state=finished returned int>}, not_done={<Future at 0x10a072f98 state=running>})
25
36
49
64


In [15]:
# as_completed(fs, timeout=None) - returns an iterator over the futures in fs that yields each future as it
# completes (finishes or is cancelled), regardless of the order in which the callables were submitted.
with ThreadPoolExecutor(max_workers=4) as executor:  # Leaving the block calls executor.shutdown(wait=True)
    future_to_power = {executor.submit(power, x, 2): x for x in x_list}  # Maps each Future to its x
    for future in concurrent.futures.as_completed(future_to_power):
        print(future.result())

25
36
49
64
