# Producer Consumer Pattern

In [2]:
from threading import Condition, Thread, Lock
import time
import random
import math
import progressbar


class Producer(Thread):
    """Worker thread that doubles each of its jobs and publishes the results.

    Args:
        jobs: Mutable list of numbers to process. Items are popped from the
            front, so the list is empty once the thread has finished.
        condition: ``threading.Condition`` guarding ``queue``; it is notified
            after every result is appended.
        queue: Shared list that results are appended to.
    """

    def __init__(self, jobs, condition, queue):
        super().__init__()
        self.jobs = jobs
        self.condition = condition
        self.queue = queue

    def run(self):
        """Process jobs until the list is empty, pausing randomly after each."""
        while self.jobs:
            job = self.jobs.pop(0)

            result = job * 2

            # The context manager releases the lock even if publishing raises.
            with self.condition:
                self.queue.append(result)
                self.condition.notify()

            # Sleep for a random sub-second interval before the next job.
            time.sleep(random.random())


class Consumer(Thread):
    """Thread that collects a fixed number of results from a shared queue.

    Args:
        numjobs: Number of results to take from ``queue`` before finishing.
        condition: ``threading.Condition`` guarding ``queue``.
        queue: Shared list that results are popped from (front first).

    Attributes:
        results: Results in the order they were taken from the queue.
    """

    def __init__(self, numjobs, condition, queue):
        super().__init__()
        self.numjobs = numjobs
        self.condition = condition
        self.queue = queue
        self.results = []

    def run(self):
        """Take ``numjobs`` results from the queue, showing a progress bar."""
        bar = progressbar.ProgressBar(max_value=self.numjobs)

        for _ in bar(range(self.numjobs)):
            # get the next result
            with self.condition:
                # Re-check the predicate after every wake-up: wait() may
                # return while the queue is still (or again) empty.
                while not self.queue:
                    self.condition.wait()
                result = self.queue.pop(0)

            self.results.append(result)
            # TODO: Open db connection

            
def dojob(jobs, nthreads=4):
    """Run ``jobs`` through producer threads and collect the results.

    The jobs are shuffled in place, split into at most ``nthreads`` chunks
    of equal size (the last one may be shorter), and each chunk is handed to
    its own ``Producer``. A single ``Consumer`` gathers one result per job.

    Args:
        jobs: List of numbers to process. It is shuffled in place.
        nthreads: Maximum number of producer threads to start.

    Returns:
        The list of results in the order the consumer received them; an
        empty list when ``jobs`` is empty.

    Raises:
        ValueError: If ``nthreads`` is smaller than 1.
    """
    if nthreads < 1:
        raise ValueError("nthreads must be at least 1")

    njobs = len(jobs)
    if njobs == 0:
        # A chunk size of zero would make range() below raise ValueError.
        return []

    queue = []
    condition = Condition()

    random.shuffle(jobs)

    # split size
    worker_njobs = math.ceil(njobs / nthreads)

    for start in range(0, njobs, worker_njobs):
        Producer(jobs[start:start + worker_njobs], condition, queue).start()

    consumer = Consumer(njobs, condition, queue)
    consumer.start()

    # Only the consumer is joined: it returns once every result has arrived.
    consumer.join()

    return consumer.results

    
# Feed the integers 0..29 through the pipeline.
jobs = list(range(30))
results = dojob(jobs)

# Put both lists in ascending order so inputs and outputs line up pairwise.
jobs.sort()
results.sort()

print(jobs)
print(results)

# Report every pair whose result is not exactly twice its input.
for job, doubled in zip(jobs, results):
    if doubled != job * 2:
        print("x", job, "y", doubled)

100% (30 of 30) |############################| Elapsed Time: 0:00:04 Time: 0:00:04


[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58]


## Producer keeps working until Consumer signals to stop

In [15]:
from threading import Condition, Event, Thread, Lock
import time
import random
import math
import progressbar


class Producer(Thread):
    """Thread that publishes 0, 1, 2, ... to a shared queue until stopped.

    Args:
        condition: ``threading.Condition`` guarding ``queue``; it is notified
            after every value is appended.
        queue: Shared list that values are appended to.
    """

    def __init__(self, condition, queue):
        super().__init__()
        self._condition = condition
        self._queue = queue
        self._stop_signal = Event()

    def run(self):
        """Append increasing integers until ``stop()`` has been called."""
        i = 0
        while not self.stop_signal:
            result = i
            i += 1

            # The context manager releases the lock even if publishing raises.
            with self._condition:
                self._queue.append(result)
                self._condition.notify()

            # Pause for a random sub-second interval; waiting on the event
            # instead of sleeping lets stop() end the pause immediately.
            self._stop_signal.wait(random.random())
        print("producer stopped")

    def stop(self):
        """Ask the thread to finish; ``run`` exits at its next check."""
        self._stop_signal.set()

    @property
    def stop_signal(self):
        """``True`` once ``stop()`` has been called."""
        return self._stop_signal.is_set()


class Consumer(Thread):
    """Thread that consumes queued values until a termination test passes.

    Args:
        producer: Object whose ``stop()`` method is called once the
            termination test passes.
        term_cond: Callable taking a consumed value and returning a truthy
            value when consumption should end.
        condition: ``threading.Condition`` guarding ``queue``.
        queue: Shared list that values are popped from (front first).
    """

    def __init__(self, producer, term_cond, condition, queue):
        super().__init__()
        self.producer = producer
        self.term_cond = term_cond
        self.condition = condition
        self.queue = queue

    def run(self):
        """Consume values until ``term_cond`` passes, then stop the producer."""
        while True:
            # get the next result
            with self.condition:
                # Re-check the predicate after every wake-up: wait() may
                # return while the queue is still (or again) empty.
                while not self.queue:
                    self.condition.wait()
                result = self.queue.pop(0)

            print("consumed", result)
            if self.term_cond(result):
                print("consumer stopped")
                self.producer.stop()
                return

            
def dojob(njobs):
    """Run a producer until the consumer has seen the value ``njobs``.

    Starts one ``Producer`` and one ``Consumer`` sharing a queue. The
    consumer stops the producer as soon as it consumes a value that is
    greater than or equal to ``njobs``; both threads are then joined.

    Args:
        njobs: Value at which consumption ends. Because the producer counts
            up from 0, values 0 through ``njobs`` are consumed.
    """
    queue = []
    condition = Condition()

    producer = Producer(condition, queue)
    producer.start()

    # ">=" rather than "==" so a non-positive njobs still terminates.
    consumer = Consumer(producer, lambda value: value >= njobs, condition, queue)
    consumer.start()

    producer.join()
    consumer.join()
    print("done")
    
# Run the demo; the captured output below shows values 0 through 10 being consumed.
dojob(10)

consumed 0
consumed 1
consumed 2
consumed 3
consumed 4
consumed 5
consumed 6
consumed 7
consumed 8
consumed 9
consumed 10
consumer stopped
producer stopped
done
