
I began this data challenge by sketching the system needed to stabilize the rate of document polling. Adding a buffer between the document store and the raw data aggregation process could balance the input of new events, keeping it stable regardless of the time of day or the day of the week. The question now is, how do we guarantee this?

For simplicity's sake, let's imagine the document store has a maximum output of 100 documents per minute (when it's the busiest) and a minimum of 1 document per minute (when it's the slowest). Because the time it takes to aggregate new events is directly proportional to the number of events, without our buffer the aggregation workload at the busiest times would be 100 times that at the slowest times.


In [8]:
from IPython.display import Image
from IPython.display import display
display(Image(url='image_snip.png'))

Implementing a variation of the producer-consumer algorithm is one approach to the task at hand. We have two competing demands here: the creation of new events by the data source / document store (the producer) and the aggregation of this data (the consumer). To normalize the rate of data consumption, these demands must be balanced and they must run concurrently. In Python terms, that means running each in its own Thread, so that one can make progress while the other is waiting instead of the two running strictly one after the other.

We also want the consumer thread to avoid reading from the buffer when it is empty, so we use a Condition object, which lets the consumer wait until the producer notifies it that data is available. Conversely, we want the producer thread to send new data to the buffer when there is space for it and to wait when the buffer is full. In other words, we want to add a dynamic ceiling to our buffer system so that the producer is called less often when data flows slowly and often enough when data flows quickly.
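
As a point of comparison, and not the approach this notebook takes, Python's standard library `queue.Queue` already provides this wait-when-full, wait-when-empty behaviour. The sketch below is a minimal, finite version of the handshake. The names `run_bounded_pipeline`, `max_items` and `done` are illustrative, and `max_items` caps the number of buffered batches rather than a document total. (The module is named `queue` in Python 3 and `Queue` in Python 2.)

```python
import queue
import random
import threading


def run_bounded_pipeline(batches, max_items=5):
    """Push each batch through a bounded queue and return what the consumer saw."""
    buf = queue.Queue(maxsize=max_items)  # put() blocks while the queue is full
    consumed = []
    done = object()  # sentinel telling the consumer to stop

    def producer():
        for batch in batches:
            buf.put(batch)  # waits for room when the buffer is full
        buf.put(done)

    def consumer():
        while True:
            item = buf.get()  # waits for data when the buffer is empty
            if item is done:
                break
            consumed.append(item)

    threads = [threading.Thread(target=producer),
               threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return consumed


# Twenty simulated batches of 1-100 documents each
batches = [random.randint(1, 100) for _ in range(20)]
result = run_bounded_pipeline(batches)
print(result == batches)  # True: one producer and one consumer preserve order
```

Because the run is finite and both threads are joined, this version is easy to test. The hand-written Condition version that follows exposes the same waiting logic explicitly.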

In [None]:
from threading import Thread, Condition
import time
import random

# Shared buffer; each entry is the size of one batch of documents
buffer = []

# Static ceiling of 50 documents to begin with
buffer_ceiling = 50

# Condition object: lets one thread wait until the other thread notifies it
# that the buffer has changed
condition = Condition()

class ProducerThread(Thread):
    def run(self):
        # Batch sizes of 1-100 documents, matching the range mentioned above
        nums = range(1, 101)
        while True:
            with condition:
                # While the buffered document total exceeds buffer_ceiling, wait.
                # A loop (not a single if) re-checks the total after every wake-up.
                while sum(buffer) > buffer_ceiling:
                    print("Buffer is at maximum limit, producer is waiting for consumer to make room for more data")
                    condition.wait()
                num = random.choice(nums)
                buffer.append(num)
                print("Produced %d" % num)
                # Wake the consumer; leaving the with block releases the lock
                condition.notify()
            time.sleep(random.random())


class ConsumerThread(Thread):
    def run(self):
        while True:
            with condition:
                while not buffer:
                    print("Buffer is empty, consumer is waiting for producer to send more data")
                    condition.wait()
                num = buffer.pop(0)
                print("Consumed %d" % num)
                # Wake the producer in case it is waiting for room
                condition.notify()
            time.sleep(random.random())


ProducerThread().start()
ConsumerThread().start()

The problem with the above code is that the buffer_ceiling is hard-coded to 50. However, this buffer ceiling should be dynamic and responsive to current data rate. Having a ceiling at 50 for a slow day would mean that it might take 50 minutes for the raw data to be aggregated, which is clearly too long. Conversely, for a fast day it would mean that the aggregation would take place twice a minute, while in reality it could be much faster. In order to make buffer_ceiling dynamic and responsive, we could create a variable called data_rate which would reflect the rate of data produced. This way, we would be setting a ceiling at a rate of documents per minute, not a count of documents. 
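
One possible way to turn a measured rate into a ceiling is sketched below. It is only an illustration under stated assumptions: `RateTracker`, `ceiling_for`, the five-minute window and the one-minute target are all hypothetical names and values, not part of the original design. The tracker reports documents per minute over a sliding window, and the ceiling is the document count that would fill in roughly `target_minutes` at that rate.

```python
from collections import deque


class RateTracker(object):
    """Hypothetical helper: documents per minute over a sliding time window."""

    def __init__(self, window_minutes=5.0):
        self.window_minutes = window_minutes
        self.events = deque()  # (timestamp_in_minutes, document_count) pairs

    def record(self, timestamp, count):
        self.events.append((timestamp, count))
        # Drop batches that have fallen out of the window
        while self.events and self.events[0][0] <= timestamp - self.window_minutes:
            self.events.popleft()

    def rate(self):
        total = sum(count for _, count in self.events)
        return total / float(self.window_minutes)


def ceiling_for(rate_per_minute, target_minutes=1.0, minimum=1):
    """Document-count ceiling that fills in about target_minutes at this rate."""
    return max(minimum, int(round(rate_per_minute * target_minutes)))


tracker = RateTracker(window_minutes=5.0)
for minute, count in [(0, 100), (1, 100), (2, 100), (3, 100), (4, 100)]:
    tracker.record(minute, count)
print(tracker.rate())               # 100.0 documents per minute
print(ceiling_for(tracker.rate()))  # 100
```

With this scheme a busy period yields a large ceiling and a quiet period a small one, so the wait before aggregation stays near the target in both cases. Note that the rate is underestimated until the first full window has elapsed.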



In [None]:
from threading import Thread, Condition
import time
import random

buffer = []
buffer_ceiling = 50
condition = Condition()

# Data rate (documents per minute) of every batch produced so far
list_of_rates = []

class ProducerThread(Thread):
    def run(self):
        nums = range(1, 101)
        # Simulated production time of 1-60 minutes (60 minutes per hour)
        time_range = range(1, 61)
        while True:
            with condition:
                while sum(buffer) > buffer_ceiling:
                    print("Buffer is at maximum limit, producer is waiting for consumer to consume and make room for more data")
                    condition.wait()
                num = random.choice(nums)
                buffer.append(num)
                time_to_produce = random.choice(time_range)
                print("Produced %d documents" % num)
                print("Minutes it took to produce: %d" % time_to_produce)
                data_rate = round(float(num) / time_to_produce, 4)
                print("Data rate is %s documents per minute" % data_rate)
                list_of_rates.append(data_rate)
                # Wake the consumer; leaving the with block releases the lock.
                # Without this the consumer could never acquire the condition.
                condition.notify()
            time.sleep(random.random())

class ConsumerThread(Thread):
    def run(self):
        while True:
            with condition:
                while not buffer:
                    print("Buffer is empty, consumer is waiting for producer to send more data")
                    condition.wait()
                num = buffer.pop(0)
                print("Consumed %d documents" % num)
                condition.notify()
            time.sleep(random.random())

ProducerThread().start()
ConsumerThread().start()




In [None]:
import numpy

# Average data rate across every batch produced so far
mean_rate = numpy.mean(list_of_rates)
print("Mean rate is " + str(mean_rate))
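
As an analytic cross-check on the simulated mean, the expected rate can be computed directly. This sketch assumes batch sizes are uniform on 1 to 100 documents and production times are uniform on 1 to 60 minutes, as the code comments describe, and that the two are drawn independently, so the mean of the ratio is the mean batch size times the mean of 1/minutes. The function name `expected_rate` is illustrative.

```python
def expected_rate(max_docs=100, max_minutes=60):
    """Expected documents per minute for independent uniform draws."""
    mean_docs = sum(range(1, max_docs + 1)) / float(max_docs)
    mean_inverse_minutes = (
        sum(1.0 / m for m in range(1, max_minutes + 1)) / max_minutes
    )
    return mean_docs * mean_inverse_minutes


print(round(expected_rate(), 2))  # 3.94
```

A sample mean that settles far from this value would suggest that the ranges in the simulation differ from the ones assumed here.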

After running the code several times to collect a substantial sample of data rates, we see that the mean rate hovers around 3-4 documents per minute. With this mean rate in hand, we can now set buffer_ceiling to this data-derived value and compare each new batch's data rate against it. If the rate is higher than the average, the buffer is treated as being at its maximum limit and the producer waits; if not, the system can proceed. Below is the final code.

In [None]:
from threading import Thread, Condition
import time
import random

buffer = []

# Setting buffer ceiling to 3.5, the approximate mean data rate (documents per minute)
buffer_ceiling = 3.5

condition = Condition()

# Data rates recorded by this run; the most recent one is checked against the ceiling
list_of_rates = []


class ProducerThread(Thread):
    def run(self):
        nums = range(1, 101)
        time_range = range(1, 61)
        while True:
            with condition:
                # Wait while the latest data rate is above the ceiling and the
                # consumer still has batches to work through. Requiring a
                # non-empty buffer keeps both threads from waiting on each other.
                while buffer and list_of_rates and list_of_rates[-1] > buffer_ceiling:
                    print("Buffer is at maximum limit, producer is waiting for consumer to consume and make room for more data")
                    condition.wait()
                num = random.choice(nums)
                buffer.append(num)
                time_to_produce = random.choice(time_range)
                print("Produced %d documents" % num)
                print("Minutes it took to produce: %d" % time_to_produce)
                data_rate = round(float(num) / time_to_produce, 4)
                print("Data rate is %s documents per minute" % data_rate)
                list_of_rates.append(data_rate)
                # Wake the consumer; leaving the with block releases the lock
                condition.notify()
            time.sleep(random.random())

class ConsumerThread(Thread):
    def run(self):
        while True:
            with condition:
                while not buffer:
                    print("Buffer is empty, consumer is waiting for producer to send more data")
                    condition.wait()
                num = buffer.pop(0)
                print("Consumed %d documents" % num)
                condition.notify()
            time.sleep(random.random())

ProducerThread().start()
ConsumerThread().start()
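
The ceiling of 3.5 is still a constant measured in an earlier run. One possible way to keep it responsive, sketched here as an assumption rather than as part of the final code, is to update the mean incrementally as each rate arrives. `RunningCeiling` and its methods are hypothetical names.

```python
class RunningCeiling(object):
    """Hypothetical helper: ceiling equal to the running mean of observed rates."""

    def __init__(self, initial=3.5):
        self.value = initial  # used until the first observation arrives
        self.count = 0

    def update(self, rate):
        # Incremental mean: new_mean = old_mean + (rate - old_mean) / n
        self.count += 1
        if self.count == 1:
            self.value = float(rate)
        else:
            self.value += (rate - self.value) / self.count
        return self.value

    def is_above(self, rate):
        return rate > self.value


ceiling = RunningCeiling()
for rate in [2.0, 4.0, 6.0]:
    ceiling.update(rate)
print(ceiling.value)          # 4.0
print(ceiling.is_above(5.0))  # True
```

In the producer, `ceiling.update(data_rate)` could replace the append to list_of_rates and `ceiling.is_above(data_rate)` could replace the comparison against the fixed buffer_ceiling. A mean over all history reacts slowly to recent changes, so a windowed or weighted average may suit fast-changing traffic better.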
