In the first Jupyter notebook I looked at different ways to calculate pi. In this notebook I am going to calculate pi with a pair of RabbitMQ queues. The approach is to write a producer that publishes ranges of values to a job queue. The worker nodes will consume a range from that queue, make all of the calculations for it, and then add the result to a second, results queue.

**Connecting to and Configuring the RabbitMQ queue**

The connection code comes straight from the RabbitMQ Python hello world. My RabbitMQ server is running in a Docker container on my desktop.

In [3]:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='GLS_ranges', durable=True)
channel.queue_declare(queue='GLS_results', durable=True)
channel.basic_qos(prefetch_count=1)

<METHOD(['channel_number=1', 'frame_type=1', "method=<Queue.DeclareOk(['consumer_count=0', 'message_count=0', 'queue=GLS_results'])>"])>

**Creating a Producer Function**

In this function we are going to publish a series of dictionaries, serialized as JSON documents, to our message queue using the `basic_publish` method of our `channel` object.

In [None]:
import json

def work_producer(endNumber, blockStepping=1000000):
    # Publish one {"start", "end"} range per block of denominators.
    for i in range(1, endNumber, blockStepping):
        d = {"start": i, "end": i + blockStepping}
        channel.basic_publish(exchange='',
                              routing_key='GLS_ranges',
                              # json.dumps so the worker's json.loads can parse the body
                              body=json.dumps(d))
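
Because the worker parses each message with `json.loads`, the message body has to be valid JSON. The following is a small check, independent of RabbitMQ, of how a dictionary formatted with an f-string differs from `json.dumps` output. The variable names are only for illustration.

```python
import json

d = {"start": 1, "end": 1000001}

as_repr = f"{d}"          # Python repr: keys wrapped in single quotes
as_json = json.dumps(d)   # JSON text: keys wrapped in double quotes

try:
    json.loads(as_repr)
    repr_parses = True
except json.JSONDecodeError:
    # JSON requires double-quoted property names
    repr_parses = False

round_trip = json.loads(as_json)
print(repr_parses, round_trip)
```

Only the `json.dumps` form survives the round trip back into a dictionary.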

**Creating a Worker Function**

The worker function needs to do more things than the producer. It needs to read the queue, figure out whether the starting value should be added or subtracted, do the rest of the calculations, and then report the resulting change value back to the `GLS_results` queue.

*Reading from the queue*  

I'm going to lift again from RabbitMQ's Hello World for our consumer, because we only have a basic message to get.

In [4]:
import json

def callback(ch, method, properties, body):
    v = json.loads(body)
    calculations(v['start'], v['end'])
channel.basic_consume(queue='GLS_ranges',
                      auto_ack=True,
                      on_message_callback=callback)

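
One thing to be aware of: RabbitMQ only applies `prefetch_count` to unacknowledged messages, so the setting has no effect while `auto_ack=True`. Below is a minimal sketch of a callback that acknowledges manually instead. The factory name `make_callback` and the `work` parameter (standing in for `calculations`) are hypothetical, and the dry run uses stand-in objects rather than a live broker.

```python
import json
from types import SimpleNamespace

def make_callback(work):
    """Build a pika-style callback that acks only after `work` returns."""
    def callback(ch, method, properties, body):
        v = json.loads(body)
        work(v["start"], v["end"])
        # Acknowledge last, so an unfinished range can be redelivered
        # if this worker dies in the middle of a calculation.
        ch.basic_ack(delivery_tag=method.delivery_tag)
    return callback

# Dry run with stand-ins for the channel and the delivery metadata.
seen, acked = [], []
fake_channel = SimpleNamespace(
    basic_ack=lambda delivery_tag: acked.append(delivery_tag))
fake_method = SimpleNamespace(delivery_tag=7)

callback = make_callback(lambda start, end: seen.append((start, end)))
callback(fake_channel, fake_method, None, b'{"start": 1, "end": 1000001}')
print(seen, acked)
```

With a real channel this could be registered as `channel.basic_consume(queue='GLS_ranges', on_message_callback=make_callback(calculations))`, leaving `auto_ack` at its default of `False`. Messages are only delivered once `channel.start_consuming()` is called.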

*Doing the calculations* 

We need a function to do the calculations, but first that function needs to know whether to start off with addition or subtraction. To figure that out, we need to look at the Gregory-Leibniz series and how its denominators work. The denominators are all odd numbers, and each step of the calculation adds 2 to the previous one. This means that if we add 1 to or subtract 1 from the current denominator we get an even number, and with the modulo operator `%` we can check the remainder of that even number when it is divided by 4.

* If we add 1 to our number and the result modulo 4 is `0`, OR subtract 1 and the result modulo 4 is `2`, we need to subtract our current calculation.
* If we add 1 to our number and the result modulo 4 is `2`, OR subtract 1 and the result modulo 4 is `0`, we need to add our current calculation.
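
The two rules can be checked in isolation with a small helper. This is only a sketch: `leibniz_sign` is a hypothetical name that does not appear in the worker code, and it uses the "add 1" form of the rule.

```python
def leibniz_sign(n):
    """Return +1 or -1 for the term 4/n of the Gregory-Leibniz series."""
    if n % 2 == 0:
        raise ValueError("denominators in the series are odd")
    # (n + 1) % 4 is 2 for n = 1, 5, 9, ... and 0 for n = 3, 7, 11, ...
    return 1 if (n + 1) % 4 == 2 else -1

signs = [leibniz_sign(n) for n in (1, 3, 5, 7, 1000001)]
print(signs)
```

The last value, 1000001, is the start of the second block when the step size is one million, so that block begins with an addition.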

Since we are splitting the work across a queue as a form of multiprocessing, we are also going to use the `decimal` library, because the built-in `float` only carries about 15 to 17 significant digits. We will set the Decimal context precision to 2^16 (`65536`) digits. On my desktop the decimal library lets me set the `prec` value as high as 2^59.
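
To illustrate the difference, here is a short comparison of one term of the series as a `float` and as a `Decimal`. It uses `localcontext()` and a precision of 50 purely for the demonstration, so the global context is left untouched.

```python
from decimal import Decimal, localcontext

float_term = 4 / 3                  # binary float, limited significant digits

with localcontext() as ctx:         # temporary context for this block only
    ctx.prec = 50                   # 50 significant digits, chosen for the demo
    decimal_term = Decimal(4) / Decimal(3)

# Count the significant digits Decimal actually kept
digits = len(str(decimal_term).replace(".", ""))
print(float_term)
print(decimal_term, digits)
```

Every division is carried out to the full context precision, so a larger `prec` makes each term more expensive to compute.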

In [None]:
from decimal import getcontext, Decimal

getcontext().prec = 2**16

def calculations(startingN, endingN):
    # Values parsed from the queue may not be ints yet.
    startingN = int(startingN)
    endingN = int(endingN)
    if startingN % 2 == 0:
        raise ValueError("startingN must be odd")
    range_value = Decimal(0)
    # Sign of the first term: "+" when (n + 1) % 4 == 2, "-" when it is 0.
    current_operator = "+" if (startingN + 1) % 4 == 2 else "-"
    for i in range(startingN, endingN, 2):
        current_calculation = Decimal(4) / Decimal(i)
        if current_operator == "+":
            range_value += current_calculation
            current_operator = "-"
        else:
            range_value -= current_calculation
            current_operator = "+"
    publish_range(range_value)

In [None]:
def publish_range(publish_value):
    channel.basic_publish(exchange='',
                          routing_key='GLS_results',
                          body=str(publish_value))
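
Before involving RabbitMQ at all, the split-and-sum idea can be tried out in a single process. The sketch below collapses producer, workers and accumulator into plain function calls. The names `partial_sum` and `simulate` are hypothetical, and the precision and range are deliberately small so it runs quickly.

```python
from decimal import Decimal, localcontext
import math

def partial_sum(start, end):
    """Signed sum of 4/n over the odd n in [start, end); start must be odd."""
    sign = 1 if (start + 1) % 4 == 2 else -1
    total = Decimal(0)
    for n in range(start, end, 2):
        total += sign * (Decimal(4) / Decimal(n))
        sign = -sign
    return total

def simulate(end_number, block_stepping):
    """Producer, workers and accumulator collapsed into one loop."""
    total = Decimal(0)
    for start in range(1, end_number, block_stepping):
        total += partial_sum(start, start + block_stepping)
    return total

with localcontext() as ctx:
    ctx.prec = 30                    # small precision to keep the dry run fast
    estimate = simulate(200_000, 10_000)

print(estimate, abs(float(estimate) - math.pi))
```

Because the block size is even and the first block starts at 1, every block starts on an odd denominator, which is what the sign rule relies on.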

**Creating an Accumulator**

This was a hard function to write. I went with a file-juggling algorithm because I couldn't figure out how to make a running pi total accessible from inside the callback. I also write each decimal.Decimal value received from the RabbitMQ queue to its own file.

In [None]:
import os
import uuid
from decimal import Decimal

def callback_accumlation(ch, method, properties, body):
    # Start the running total at zero the first time through.
    if not os.path.exists("calculation.txt"):
        with open("calculation.txt", "w") as f:
            f.write(str(Decimal(0)))
    with open("calculation.txt", "r") as pi_file:
        pi_calc = Decimal(pi_file.read())
    pi_calc += Decimal(body.decode("utf-8"))
    with open("calculation.txt", "w") as pi_file:
        pi_file.write(str(pi_calc))
    # Keep a copy of each raw partial result in its own file.
    with open(f"returndata.{uuid.uuid4()}.txt", "w") as f:
        f.write(body.decode("utf-8"))
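
One possible alternative to the file round trip, sketched under the assumption that the accumulator runs as a single long-lived process: pika only needs a callable, so an object with a `__call__` method can carry the running total between messages. The class name `Accumulator` is hypothetical and is not part of `rabbitmq-GLS.py`.

```python
from decimal import Decimal

class Accumulator:
    """Callable that keeps a running total across pika-style callbacks."""

    def __init__(self):
        self.total = Decimal(0)

    def __call__(self, ch, method, properties, body):
        # body arrives as bytes holding the string form of a Decimal
        self.total += Decimal(body.decode("utf-8"))

# Dry run: feed three made-up partial results without a broker.
acc = Accumulator()
for message in (b"4", b"-1.333", b"0.8"):
    acc(None, None, None, message)
print(acc.total)
```

An instance could then be passed as `on_message_callback=acc` to `channel.basic_consume`. The trade-off is that the total lives only in memory, whereas the file version survives a restart of the accumulator.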

**Adding Command Line Arguments**

I want to produce a single python file and call it using the command line, so I'm going to use the `argparse` library to add the command line arguments. 

In [None]:
import argparse

def arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument("-e", "--endinteger", type=int,
                        help="An ending integer you wish your program to stop at, pick a big number (like more than a billion)")
    parser.add_argument("-s", "--stepsize", default=1000000, type=int,
                        help="Step size is the number of numbers to be calculated in a single go, computers are fast, pick a reasonably big number (like 1 million)")
    parser.add_argument("-p", "--processnumber", type=int,
                        help="Which process this program will run on. 0 = producer, 1>= calculator")
    args = parser.parse_args()
    return args
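
For trying the options out inside a notebook, `parse_args` also accepts an explicit list of strings in place of the real command line. The sketch below rebuilds the same three options in a hypothetical `build_parser` helper (help texts omitted) and parses a sample argument list.

```python
import argparse

def build_parser():
    """Same three options as arguments(), without the help texts."""
    parser = argparse.ArgumentParser()
    parser.add_argument("-e", "--endinteger", type=int)
    parser.add_argument("-s", "--stepsize", default=1000000, type=int)
    parser.add_argument("-p", "--processnumber", type=int)
    return parser

# Equivalent to running the script with: -e 5000000000 -p 0
args = build_parser().parse_args(["-e", "5000000000", "-p", "0"])
print(args.endinteger, args.stepsize, args.processnumber)
```

Since `-s` was not given, `args.stepsize` falls back to its default of one million.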

**Bringing Everything together**

I have included the full `rabbitmq-GLS.py` file in this GitHub repository. Using a second script you can easily spawn a bunch of workers, the accumulator, and the producer processes. This second script uses the multiprocessing library to get the number of CPU cores with `cpu_count()`, and uses the `subprocess.Popen()` function to spawn the processes.

In [None]:
import subprocess, time, multiprocessing

cpu_count = multiprocessing.cpu_count()
script_path = "rabbitmq-GLS.py"
end_number = "5000000000"

for i in range(cpu_count-1):
    subprocess.Popen(["python", script_path, "-e", end_number, "-p", "2"])
subprocess.Popen(["python", script_path, "-e", end_number, "-p", "1"])
time.sleep(2)
subprocess.Popen(["python", script_path, "-e", end_number, "-p", "0"])
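
A possible variation, offered only as a sketch: building the argument lists in a helper makes the launcher easier to inspect before anything is spawned, and `sys.executable` starts the children with the same interpreter that runs the launcher. The helper name `build_commands` is hypothetical, and the `-p` values simply mirror the ones used in the launcher above.

```python
import sys

def build_commands(script_path, end_number, worker_count):
    """Return (background, producer) argument lists for subprocess.Popen."""
    base = [sys.executable, script_path, "-e", str(end_number), "-p"]
    # worker_count processes with -p 2, then one process with -p 1
    background = [base + ["2"] for _ in range(worker_count)]
    background.append(base + ["1"])
    # the -p 0 process is kept separate so it can be started last
    producer = base + ["0"]
    return background, producer

background, producer = build_commands("rabbitmq-GLS.py", 5000000000, 3)
for command in background + [producer]:
    print(command)
```

Each list can be passed directly to `subprocess.Popen`, starting the background processes first and the producer after a short pause, as in the launcher above.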