# Luiz's notes on Palach's Parallel Programming with Python

In [1]:
#coding: utf-8

import logging, threading

from queue import Queue

### Setting up logging and threads

In [2]:
# Configure the root logger (getLogger() with no name) to emit everything
# from DEBUG upwards, prefixed with a timestamp.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(message)s')

# Stream handler that carries the formatter above; attached to the root
# logger so both `logger.*` and module-level `logging.*` calls use it.
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
logger.addHandler(ch)

# Shared state for the Fibonacci example:
#   fibo_dict    - maps each input value to its computed Fibonacci number
#   shared_queue - filled by queue_task and drained by fibonacci_task
#   input_list   - the values whose Fibonacci numbers are computed
fibo_dict = {}
shared_queue = Queue()
input_list = [3, 10, 5, 7]

In [3]:
# Condition object handed to every producer and consumer thread below;
# consumers wait on it and producers notify through it.
queue_condition = threading.Condition()

## Fibonacci with multiple values

In [4]:
def fibonacci(n):
    """Return the n-th Fibonacci number, with F(0) = 0 and F(1) = 1.

    The value is built iteratively by advancing a (current, following)
    pair n times. For n <= 0 the loop never runs and 0 is returned.
    """
    current, following = 0, 1
    for _ in range(n):
        following, current = current + following, following
    return current

### The task run by each Fibonacci-computing thread

In [5]:
def fibonacci_task(condition):
    """Consume one value from shared_queue and store its Fibonacci number.

    The whole task runs while holding `condition`. While the queue is empty
    the thread logs that it is waiting and blocks in condition.wait(); once
    an item is available it is taken, its Fibonacci number is written to
    fibo_dict under that value, the queue item is marked done and the result
    is logged.

    condition: the threading.Condition shared with the producer thread.
    """
    with condition:
        # Re-check the queue after every wake-up: another consumer may have
        # taken the item before this thread re-acquired the lock.
        while shared_queue.empty():
            waiting_msg = ("[%s] - waiting for elements in queue."
                           % threading.current_thread().name)
            logger.info(waiting_msg)
            condition.wait()

        value = shared_queue.get()
        fibo_dict[value] = fibonacci(value)
        shared_queue.task_done()

        worker_name = threading.current_thread().name
        result_msg = ("[%s] fibonacci of key [%d] with result [%d]" %
                      (worker_name, value, fibo_dict[value]))
        logger.debug(result_msg)

In [6]:
def queue_task(condition):
    """Producer: enqueue every value of input_list, then wake all consumers.

    The items are put on shared_queue while holding `condition`, so no
    consumer can observe a partially filled queue. All waiting threads are
    notified once, after the last item has been queued.

    condition: the threading.Condition the consumer threads are waiting on.
    """
    logging.debug('Starting queue_task...')
    with condition:
        for item in input_list:
            shared_queue.put(item)
            # Logged once per enqueued item; the actual wake-up happens a
            # single time, after the loop.
            logging.debug("Notifying fibonacci_task threads that the queue is ready to consume..")
        # notify_all() is the supported spelling; the camelCase notifyAll()
        # alias is deprecated and emits a DeprecationWarning on Python 3.10+.
        condition.notify_all()

In [7]:
# Four daemon consumer threads running fibonacci_task; each one handles a
# single queue item, matching the four entries of input_list.
threads = [threading.Thread(daemon=True, target=fibonacci_task, args=(queue_condition,)) for i in range(4)]

In [8]:
# Single named daemon producer thread that runs queue_task on the same
# condition the consumers wait on.
prod = threading.Thread(name='queue_task_thread', daemon=True,
       target=queue_task, args=(queue_condition,))

In [9]:
# Start the consumers first; the queue is still empty, so each one logs
# that it is waiting. The comprehension is used only for its side effect,
# and the list of None it produces is what the cell displays.
[thread.start() for thread in threads]

2018-03-19 13:27:41,894 - [Thread-6] - waiting for elements in queue.
2018-03-19 13:27:41,956 - [Thread-7] - waiting for elements in queue.
2018-03-19 13:27:42,048 - [Thread-8] - waiting for elements in queue.
2018-03-19 13:27:42,150 - [Thread-9] - waiting for elements in queue.


[None, None, None, None]

In [10]:
# Start the producer: it fills shared_queue and notifies the waiting consumers.
prod.start()

2018-03-19 13:27:42,237 - Starting queue_task...
2018-03-19 13:27:42,255 - Notifying fibonacci_task threads that the queue is ready to consume..
2018-03-19 13:27:42,258 - Notifying fibonacci_task threads that the queue is ready to consume..
2018-03-19 13:27:42,261 - Notifying fibonacci_task threads that the queue is ready to consume..
2018-03-19 13:27:42,263 - Notifying fibonacci_task threads that the queue is ready to consume..
2018-03-19 13:27:42,267 - [Thread-6] fibonacci of key [3] with result [2]


In [11]:
# Block until every consumer thread and then the producer thread has finished.
[thread.join() for thread in threads]
prod.join()

2018-03-19 13:27:42,280 - [Thread-7] fibonacci of key [10] with result [55]
2018-03-19 13:27:42,286 - [Thread-8] fibonacci of key [5] with result [5]
2018-03-19 13:27:42,290 - [Thread-9] fibonacci of key [7] with result [13]


## Exercise: Summing values with multiple threads (a parallel sum-reduce)

In [12]:
import random
import time
# N: how many random values populate_task generates in total; it also sizes
#    the queue and the thread lists created below.
N = 100
# sums: running total shared by all sum_task threads.
sums = 0

In [13]:
# queue: bounded to N entries, which is exactly how many values are produced.
queue = Queue(maxsize=N)
# lst: record of every value put on the queue, used to check the total afterwards.
lst = []
# ticks: count of values produced so far (shared by all populate_task threads).
ticks = 0

In [14]:
def populate_task(condition):
    """Producer: enqueue random integers until N have been produced in total.

    Runs entirely while holding `condition`. The loop is bounded by the
    shared `ticks` counter, so whichever producer acquires the lock first
    generates all remaining values; producers that run later find
    ticks == N, add nothing and only issue the notification. Every value is
    also appended to `lst` so the expected total can be checked afterwards.

    condition: the threading.Condition the sum_task threads are waiting on.
    """
    global ticks
    logging.debug("Starting populate task..")
    with condition:
        while ticks < N:
            n = random.randint(0, 100)
            queue.put(n)
            lst.append(n)
            ticks += 1
            logging.debug("%d put in queue by thread %s" % (n, threading.current_thread().name))
        # notify_all() is the supported spelling; the camelCase notifyAll()
        # alias is deprecated and emits a DeprecationWarning on Python 3.10+.
        condition.notify_all()

In [15]:
def sum_task(condition):
    """Consume one value from the queue and add it to the shared total.

    After logging its start, the thread takes `condition` and keeps it for
    the rest of the task. While the queue is empty it logs that it is
    waiting and blocks in condition.wait(). Once an item is available it
    remembers the current total, removes the item, adds it to the global
    `sums`, marks the queue item done and logs the before/after totals.

    condition: the threading.Condition shared with the populate_task threads.
    """
    global sums
    logging.debug("Starting summing task..")
    with condition:
        # Re-check after each wake-up: another consumer may already have
        # taken the item by the time this thread holds the lock again.
        while queue.empty():
            waiting_msg = ("[%s] - waiting for elements in queue."
                           % threading.current_thread().name)
            logger.info(waiting_msg)
            condition.wait()

        total_before = sums
        addend = queue.get()
        sums += addend
        queue.task_done()

        worker_name = threading.current_thread().name
        summary = ("[%s] summed %d to %d and got %s." %
                   (worker_name, addend, total_before, sums))
        logger.debug(summary)

In [16]:
# One daemon consumer per value (capped at 100 threads); each sum_task call
# consumes exactly one queue item.
sum_threads = [threading.Thread(daemon=True, target=sum_task, args=(queue_condition,)) for i in range(min(N, 100))]
# Consumers are started while the queue is still empty, so they block in
# condition.wait() until a producer notifies them.
[thread.start() for thread in sum_threads]
# Producers are only created here; they are started in a later cell.
pop_threads = [threading.Thread(daemon=True, target=populate_task, args=(queue_condition,)) for i in range(min(N, 100))]

2018-03-19 13:27:42,573 - Starting summing task..
2018-03-19 13:27:42,590 - [Thread-10] - waiting for elements in queue.
2018-03-19 13:27:42,635 - Starting summing task..
2018-03-19 13:27:42,661 - [Thread-11] - waiting for elements in queue.
2018-03-19 13:27:42,699 - Starting summing task..
2018-03-19 13:27:42,714 - [Thread-12] - waiting for elements in queue.
2018-03-19 13:27:42,729 - Starting summing task..
2018-03-19 13:27:42,746 - [Thread-13] - waiting for elements in queue.
2018-03-19 13:27:42,789 - Starting summing task..
2018-03-19 13:27:42,813 - [Thread-14] - waiting for elements in queue.
2018-03-19 13:27:42,828 - Starting summing task..
2018-03-19 13:27:42,847 - [Thread-15] - waiting for elements in queue.
2018-03-19 13:27:42,867 - Starting summing task..
2018-03-19 13:27:42,894 - [Thread-16] - waiting for elements in queue.
2018-03-19 13:27:42,909 - Starting summing task..
2018-03-19 13:27:42,927 - [Thread-17] - waiting for elements in queue.
2018-03-19 13:27:42,956 - Starti

In [17]:
# Start the producers; the list of None returned by start() is the cell's output.
[thread.start() for thread in pop_threads]

2018-03-19 13:27:48,793 - Starting populate task..
2018-03-19 13:27:48,808 - 43 put in queue by thread Thread-110
2018-03-19 13:27:48,811 - 96 put in queue by thread Thread-110
2018-03-19 13:27:48,813 - 51 put in queue by thread Thread-110
2018-03-19 13:27:48,815 - 82 put in queue by thread Thread-110
2018-03-19 13:27:48,818 - 96 put in queue by thread Thread-110
2018-03-19 13:27:48,821 - 52 put in queue by thread Thread-110
2018-03-19 13:27:48,823 - 45 put in queue by thread Thread-110
2018-03-19 13:27:48,825 - 46 put in queue by thread Thread-110
2018-03-19 13:27:48,827 - 99 put in queue by thread Thread-110
2018-03-19 13:27:48,830 - 25 put in queue by thread Thread-110
2018-03-19 13:27:48,832 - 2 put in queue by thread Thread-110
2018-03-19 13:27:48,834 - 23 put in queue by thread Thread-110
2018-03-19 13:27:48,836 - 88 put in queue by thread Thread-110
2018-03-19 13:27:48,839 - 68 put in queue by thread Thread-110
2018-03-19 13:27:48,844 - 73 put in queue by thread Thread-110
2018-

[None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None]

In [21]:
# Sanity check: the values that were produced, their total computed directly,
# and the total accumulated by the sum_task threads (the last two should match).
print(lst)
print(sum(lst))
print(sums)

[43, 96, 51, 82, 96, 52, 45, 46, 99, 25, 2, 23, 88, 68, 73, 93, 92, 43, 94, 38, 42, 36, 4, 32, 95, 38, 98, 15, 81, 55, 51, 45, 41, 2, 81, 78, 37, 32, 56, 64, 70, 0, 68, 74, 39, 88, 5, 69, 72, 74, 99, 25, 53, 86, 12, 29, 97, 53, 29, 12, 0, 6, 96, 73, 80, 98, 54, 0, 55, 87, 14, 87, 48, 76, 30, 21, 50, 24, 25, 13, 74, 55, 46, 71, 69, 18, 16, 24, 95, 6, 70, 3, 80, 16, 53, 83, 9, 54, 50, 13]
5128
5128


In [19]:
# Wait for every producer thread to finish.
[thread.join() for thread in pop_threads]

[None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None]

In [20]:
# Wait for every consumer thread to finish.
[thread.join() for thread in sum_threads]

[None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None]