# Ch. 5. Distributed Algorithms

## I. What is a Distributed Algorithm
A distributed algorithm is very similar in concept to a parallel algorithm, and since we discussed parallel algorithms at length in Ch. 3, this chapter will be fairly brief. However, we will try to apply some of the concepts we learned in Ch. 3 to distributed systems, and discuss the main principles of distributed systems and distributed computing, as opposed to parallel computing.

The first and most obvious thing to do is to define what we mean by distributed. In this case, distributed refers to a system, or pool of systems, made up of many interconnected computers. These computers could be interconnected by anything from the fastest InfiniBand interconnect to something as slow as dial-up. All that matters for our definition of a distributed system is that its components cannot natively access each other's memory, but still work together towards common tasks. By this definition, a cluster is a distributed system in its own right. Others would not consider a cluster a distributed system, and would reserve the term for more loosely connected systems, usually linked by the internet rather than by a specialized network interconnect. Arguing either side of this debate is not the point of this course, but giving you the tools to draw your own conclusions is. The image below represents a distributed system, by our definition.

![distributed system architecture](http://www.ejbtutorial.com/wp-content/uploads/2013/09/distributed-system-1024x441.png)

We define a cluster as a distributed system because when writing code for a cluster, you need to make the same considerations as you would when writing code for a "traditional" distributed system: you need to make sure your code is fault tolerant and scalable, and, most importantly, that it can make use of multiple nodes at once. Otherwise, your code is not really cluster-optimized.

So, now that we know briefly what a distributed system is, what is a distributed algorithm? Rather boringly, a distributed algorithm is an algorithm designed to run on a distributed system. As mentioned earlier, when writing a distributed algorithm, there are many new things to worry about that you wouldn't face with a single-system algorithm.

You need to worry about which parts of your algorithm can run concurrently, because parts that can run concurrently can often be offloaded to remote machines. You need to worry about which parts of your process need data from other parts, because you can't depend on every remote part of your code having access to the same memory as every other part. Because of this, you need to think about how the processes will communicate with each other. Since that communication usually relies on some mixture of files and inter-process communication, both of which are slow, you need to think about how to minimize how often it happens. As you can see in the graphic above, applications A and C are not truly distributed, because each uses only one computer at a time, while application B is a distributed application.
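Why does the number of communications matter so much? A common back-of-the-envelope model (often called the latency-bandwidth model) charges every message a fixed latency plus the time to push its bytes through the link. The sketch below uses made-up numbers chosen only for illustration (1 ms latency, 100 MB/s bandwidth, 1 MB of data) and the hypothetical helper name `transfer_time`; real networks vary widely.

```python
def transfer_time(n_messages, total_bytes, latency_s, bandwidth_bytes_per_s):
    """Latency-bandwidth model: each message pays a fixed latency, and the
    payload is limited by bandwidth no matter how it is split up."""
    return n_messages * latency_s + total_bytes / bandwidth_bytes_per_s

# Assumed numbers for illustration only: 1 ms per message, 100 MB/s link, 1 MB of data
one_at_a_time = transfer_time(1000, 1_000_000, 1e-3, 100e6)  # 1000 small messages
batched = transfer_time(10, 1_000_000, 1e-3, 100e6)          # the same data in 10 messages
print(f"{one_at_a_time:.2f} s vs {batched:.2f} s")  # 1.01 s vs 0.02 s
```

The same megabyte costs about fifty times more when it is sent as many tiny messages, because the per-message latency dominates. This is why distributed algorithms try to batch their communication.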

Writing good distributed algorithms is one of the hardest parts of all of computer science. The distributed algorithms that perform well at scale and accomplish interesting goals are often some of the most complex programs in the world. This complexity is yet another reason why workflow management systems are useful and helpful. We will be using Parsl to help us work with distributed algorithms, because it makes our lives much easier and more fun.

### Example 5.1 - SOMETHING


In [3]:
# Code me up

## II. Cluster Architecture
Remember how, in the first chapter, we talked about networks and I said there would be more network discussion? Well, here's more network discussion. First, recall what we talked about before. You have a bunch of machines and you want them all to talk to each other. The fastest way to do this is to plug every machine directly into every other machine. In the diagram below, this is the "fully connected" topology. The main downside of this topology is that it's expensive and impractical: with just six computers, it takes fifteen wires, and the number of wires grows quadratically with the number of machines. Most server units also do not have very many network ports, so you run into problems there as well.
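The cabling arithmetic is easy to check: in a fully connected network every pair of nodes needs its own link, so _n_ nodes need _n_(_n_ - 1)/2 links and each node needs _n_ - 1 ports. The small sketch below (helper names are hypothetical) compares that with connecting every node to one central switch, which needs only one link per node.

```python
def full_mesh_links(n):
    # Every pair of nodes needs its own cable: n choose 2
    return n * (n - 1) // 2

def full_mesh_ports_per_node(n):
    # Each node connects directly to every other node
    return n - 1

def switch_links(n):
    # One cable from each node to a central switch
    return n

for n in (6, 7, 100):
    print(n, full_mesh_links(n), full_mesh_ports_per_node(n), switch_links(n))
# 6 15 5 6
# 7 21 6 7
# 100 4950 99 100
```

Going from 6 to 100 machines multiplies the full-mesh cabling by more than 300, while the switched layout grows in step with the number of machines.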


A better way to network machines is to use some clever [Graph Theory](https://en.wikipedia.org/wiki/Graph_theory) to keep the number of hops between any two nodes small without wiring every pair together. Some popular layouts include star, mesh, and tree architectures. For small clusters, plugging every node into a shared network switch (strictly speaking a star, though often loosely called a bus) is a very good compromise. Talking through a switch takes longer than talking over a direct cable (call it about a hop and a half instead of one hop), but any machine can quickly reach any other machine on the same switch. Therefore, for small clusters, ones that fit on just a couple of switches, this architecture can rival the fully connected architecture in speed, without the cost or logistical challenges. The cluster this course is hosted on uses this switched, bus-style architecture, because it is quite a small cluster. There are also a number of more complex topologies, such as a torus, which is like a multi-dimensional version of a ring, and is illustrated by this image:
![torus architecture](https://upload.wikimedia.org/wikipedia/commons/thumb/3/3f/2x2x2torus.svg/485px-2x2x2torus.svg.png)

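In a torus, each dimension forms a loop: nodes are connected in a line along that dimension, and the last node is plugged back into the first. In general, each node of an _n_-dimensional torus is plugged into its _2n_ nearest neighbors, one in each direction along each dimension. The torus in the image is a small 3D torus with only two nodes per dimension, so the "next" and "previous" neighbors along each loop are the same node, and each node ends up with just three direct links. Tori are an attempt to keep the number of hops between two arbitrary nodes small while still using only a few links per node.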
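The wraparound is just modular arithmetic on node coordinates. Here is a minimal sketch, assuming nodes are labeled by integer coordinate tuples and the torus shape is given as a tuple of sizes per dimension (`torus_neighbors` is a hypothetical helper, not part of any cluster tool):

```python
def torus_neighbors(coord, dims):
    """Return the set of nodes directly wired to `coord` in a torus of shape `dims`.

    Along each dimension a node links to the next and previous node; the modulo
    wraps the last node around to the first, which is what makes each dimension a loop.
    """
    neighbors = set()
    for axis, size in enumerate(dims):
        for step in (-1, 1):
            neighbor = list(coord)
            neighbor[axis] = (neighbor[axis] + step) % size
            neighbors.add(tuple(neighbor))
    neighbors.discard(tuple(coord))  # a dimension of size 1 would wrap onto itself
    return neighbors

print(len(torus_neighbors((0, 0, 0), (2, 2, 2))))  # 3, as in the 2x2x2 image
print(len(torus_neighbors((0, 0, 0), (4, 4, 4))))  # 6, i.e. 2n for n = 3
```

With two nodes per dimension, stepping forward or backward lands on the same node, which is why the pictured torus shows three links per node rather than six.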

There are even some vendor-specific designs, like the dragonfly topology used in Cray's interconnects, which I do not know much about, other than that they power many of the largest and most powerful systems in the world.

Image for reference:
![Network Topologies](https://static9.depositphotos.com/1389325/1074/v/950/depositphotos_10743280-stock-illustration-illustration-of-network-topology-computer.jpg)

### Example 5.2 - Connecting to and Requesting Data From All Hosts
In this example, we're going to use Parsl to request information about the nodes we have access to. Recall that we can run (almost) arbitrary Python code from within Parsl apps, and recall that we can easily use Parsl to submit tasks through our resource manager to run lots of jobs on lots of nodes.

In [2]:
# Parsl config
from parsl import *
import logging

ipp_config = {
    "sites": [{
        "site": "LC_Cluster",
        "auth": {
            "channel": "local"
        },
        "execution": {
            "executor": "ipp",
            "provider": "sge",
            "script_dir": ".scripts",
            "scriptDir": ".scripts",
            "block": {
                "nodes": 1,
                "taskBlocks": 1,
                "walltime": "00:05:00",
                "initBlocks": 1,
                "minBlocks": 0,
                "maxBlocks": 10,
                "scriptDir": ".",
                "options": {
                    "partition": "debug"
                }
            }
        }
    }],
    "globals": {"lazyErrors": True},
    "controller": {"profile": "default"},
}

import os
os.environ['SGE_ROOT'] = '/local/cluster/sge'

dfk = DataFlowKernel(config=ipp_config)


In [3]:
# Define app for sys info
@App('python', dfk)
def sys_info():
    # Imports go inside the app because it runs on a remote worker
    import platform
    # Wrap the dict in a list so info_processor can iterate over each result
    return [{
        "machine": platform.machine(),
        "platform": platform.platform(),
        "uname": platform.uname(),
        "processor": platform.processor(),
        "system": platform.system(),
    }]


# Pretty-print every info dict returned by the sys_info tasks
def info_processor(inputs=[]):
    import json
    for result in inputs:
        for info in result:
            print(json.dumps(info, indent=4))

In [4]:
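# Call workflow: submit all 50 tasks first so they can run concurrently,
# then wait for each result (calling .result() inside the loop would run them one at a time)
futures = [sys_info() for _ in range(50)]
infos = [f.result() for f in futures]
info_processor(infos)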

{
    "machine": "x86_64",
    "platform": "Linux-3.10.0-327.el7.x86_64-x86_64-with-centos-7.2.1511-Core",
    "uname": [
        "Linux",
        "bacon.blt.lclark.local",
        "3.10.0-327.el7.x86_64",
        "#1 SMP Thu Nov 19 22:10:57 UTC 2015",
        "x86_64",
        "x86_64"
    ],
    "processor": "x86_64",
    "system": "Linux"
}
{
    "machine": "x86_64",
    "platform": "Linux-3.10.0-327.el7.x86_64-x86_64-with-centos-7.2.1511-Core",
    "uname": [
        "Linux",
        "bacon.blt.lclark.local",
        "3.10.0-327.el7.x86_64",
        "#1 SMP Thu Nov 19 22:10:57 UTC 2015",
        "x86_64",
        "x86_64"
    ],
    "processor": "x86_64",
    "system": "Linux"
}
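Every entry in the output above reports the same host, `bacon.blt.lclark.local`. That is likely because the config asks for a single node per block (`"nodes": 1`) and starts with one block (`"initBlocks": 1`), so 50 short tasks probably never triggered extra blocks. A quick way to check how tasks were spread across nodes is to tally the hostname, which `platform.uname()` reports as its second field (`node`). The helper `count_hosts` is a hypothetical name, and the sample data below is copied from the output above.

```python
from collections import Counter

def count_hosts(infos):
    """Count how many sys_info results came from each host.

    `infos` has the shape produced above: a list of results, each a list
    holding one dict whose "uname" entry is platform.uname(), i.e.
    (system, node, release, version, machine, processor).
    """
    counts = Counter()
    for result in infos:
        for info in result:
            counts[info["uname"][1]] += 1  # index 1 is the node (host) name
    return counts

# Sample data copied from the output above (only the "uname" field is needed)
sample_uname = ["Linux", "bacon.blt.lclark.local", "3.10.0-327.el7.x86_64",
                "#1 SMP Thu Nov 19 22:10:57 UTC 2015", "x86_64", "x86_64"]
sample_infos = [[{"uname": sample_uname}] for _ in range(3)]
print(count_hosts(sample_infos))  # Counter({'bacon.blt.lclark.local': 3})
```

In the notebook you would call `count_hosts(infos)` on the real results; raising `nodes` or `initBlocks` in the config should show more than one hostname.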

## III. Messages Between Systems
As we mentioned before, it is extremely important for processes to be able to communicate with each other. Without communication, parts of a workflow can end up running too many times or not enough times, which means the workflow is either broken or not performing as fast as it could. This image shows a distributed system in which processes communicate over the Internet. ![inter process communication grid](https://upload.wikimedia.org/wikipedia/commons/e/ef/ArchitectureCloudLinksSameSite.png)

So, what exactly is inter-process communication? Inter-process communication is a mechanism that allows processes to communicate with each other and synchronize their actions; you can think of it as a way for processes to cooperate. There are many situations where having processes communicate increases speed, convenience, and modularity. Processes can communicate in a number of ways, including shared memory, files, pipes, signals, network sockets, and, when programmed very specifically, an incredibly fast and powerful API called MPI, the Message Passing Interface. We don't get into MPI much in this course, but it is a very useful skill to learn if you want to be serious about HPC.

So, hopefully I've convinced you of the usefulness of inter-process communication, and now I can tell you a bit more about how it works. In the simplest case, shared memory, two (or more) processes simply use the same data. This only works when the processes are on the same computer with access to the same memory. Python's `multiprocessing` module, which we have been using, supports this through shared objects such as `multiprocessing.Value` and `multiprocessing.Array`, although most of its other tools, such as `Pool` and `Queue`, instead copy data between processes by sending it through pipes.

Processes can also write data to files. Then, with proper permissions and a filesystem shared across the network, any other compute core on the network can read that data. The main downside is that both network IO and file IO are slow compared to memory access, and will cause processes to sit idle while they wait.

Another option is a server-client relationship. This is how Parsl works. The Parsl host (the computer running the Parsl script) acts as a server, telling each client, or worker, what it needs to do, so all of the inter-process communication goes through one server. Handing out the work corresponds to a "scatter" (each worker gets a different piece of the work) or a "broadcast" (every worker gets the same data) in the image below. At the end of the work, Parsl requests the results back from all of the worker machines, which is a "gather"; if those results are then combined into a single value, such as a sum, that is a "reduction". Server-client communication can often combine, to an extent, the speed of message passing with the ease of other forms of inter-process communication. This is why we're going to primarily use server-client communication through Parsl in this course.
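To make shared memory concrete, here is a minimal sketch using `multiprocessing.Array`: several child processes square the numbers in one array in place, and the parent sees the changes without any data being sent back. The helper names are hypothetical, and the sketch assumes a Linux machine (like the course cluster) so it can use the `"fork"` start method.

```python
import multiprocessing

def square_slot(shared, index):
    # Runs in a child process: reads and writes the shared array in place.
    # Each index is touched by exactly one child, so there is no race here.
    shared[index] = shared[index] ** 2

def square_in_shared_memory(values):
    # "fork" assumes Linux; other start methods also work, but then the
    # calling code must sit behind an `if __name__ == "__main__":` guard.
    ctx = multiprocessing.get_context("fork")
    shared = ctx.Array("i", values)  # C ints placed in memory shared with the children
    workers = [ctx.Process(target=square_slot, args=(shared, i))
               for i in range(len(values))]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return shared[:]  # slicing copies the shared values into a normal list

if __name__ == "__main__":
    print(square_in_shared_memory([1, 2, 3, 4]))  # [1, 4, 9, 16]
```

Note that this only works because every process lives on the same machine; a worker on another node of the cluster could not see `shared` at all.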

![Message Passing Diagram](https://computing.llnl.gov/tutorials/mpi/images/collective_comm.gif)
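The patterns in the diagram can be imitated on a single machine. In this sketch, a thread pool stands in for remote workers (an assumption made to keep the example self-contained; Parsl or MPI would use separate processes, possibly on separate nodes), and the function names are hypothetical. Every worker receives the same `scale` (broadcast) and a different slice of the data (scatter); the per-worker results are collected (gather) and summed (reduction).

```python
from concurrent.futures import ThreadPoolExecutor

def work(chunk, scale):
    # Each worker gets its own chunk (scatter) plus the shared `scale` (broadcast)
    return scale * sum(chunk)

def broadcast_scatter_gather_reduce(data, scale, n_workers=4):
    # Scatter: worker i gets every n_workers-th item, starting at position i
    chunks = [data[i::n_workers] for i in range(n_workers)]
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        # Broadcast the same scale to every worker, then gather one result per worker
        partials = list(pool.map(work, chunks, [scale] * n_workers))
    # Reduce: combine the gathered results into a single value
    return partials, sum(partials)

partials, total = broadcast_scatter_gather_reduce(list(range(1, 101)), scale=2)
print(partials)  # [2450, 2500, 2550, 2600]
print(total)     # 10100
```

In Example 5.3 below, the same shape appears: the `generate` tasks are scattered to workers, `concat` gathers their files, and `total` reduces them to one number.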


### Sidenote on MPI

I mentioned MPI earlier, and it's a really important topic in HPC, so I should at least explain a little about what it is. MPI is a standardized and portable message-passing interface, designed by a group of researchers from academia and industry, that works on a wide variety of parallel computing architectures. It lets processes share data, metadata, and other related information with each other. Example uses include keeping processes synchronized, handing work to specific processes, and making sure the right data ends up in the right places. In MPI, each process is identified by a number called its rank within a group of processes (a communicator). A common pattern is for one process, often rank 0, to coordinate and hand out instructions to the others, although MPI itself does not force that structure on you. While MPI is out of the scope of this course, it's a really good skill to have if you want to do HPC in the real world.

### Example 5.3 - Parsl IPyParallel Tasks
Although Parsl supports MPI through one of its executors, the way we are using it follows the client-server communication model. Parsl uses IPyParallel as the backend for this client-server behavior. IPyParallel starts IPython kernels (called engines) on remote machines and runs Python code through them. Using IPyParallel, Parsl creates a virtual cluster within our physical cluster, pushing work and data to each virtual node and pulling results back. Each virtual node takes up one core of the physical machine, so we can make a large virtual cluster if we need to. In this example, we're going to do some distributed work across the cluster by running a basic parallel dataflow through Parsl.

In [6]:
# Parsl config
from parsl import *
import logging

ipp_config = {
    "sites": [{
        "site": "LC_Cluster",
        "auth": {
            "channel": "local"
        },
        "execution": {
            "executor": "ipp",
            "provider": "sge",
            "script_dir": ".scripts",
            "scriptDir": ".scripts",
            "block": {
                "nodes": 1,
                "taskBlocks": 1,
                "walltime": "00:05:00",
                "initBlocks": 1,
                "minBlocks": 0,
                "maxBlocks": 10,
                "scriptDir": ".",
                "options": {
                    "partition": "debug"
                }
            }
        }
    }],
    "globals": {"lazyErrors": True},
    "controller": {"profile": "default"},
}

import os
os.environ['SGE_ROOT'] = '/local/cluster/sge'

dfk = DataFlowKernel(config=ipp_config)


In [14]:
# App that generates a random number
@App('bash', dfk)
def generate(outputs=[]):
    return "echo $(( RANDOM )) &> {outputs[0]}"

# App that concatenates input files into a single output file
@App('bash', dfk)
def concat(inputs=[], outputs=[], stdout="data/stdout.txt", stderr='data/stderr.txt'):
    return "cat {0} > {1}".format(" ".join(inputs), outputs[0])

# App that calculates the sum of the values in the first input file
@App('python', dfk)
def total(inputs=[]):
    running_sum = 0
    with open(inputs[0], 'r') as f:
        for line in f:
            running_sum += int(line)
    return running_sum

# Create 5 files with random numbers
output_files = []
for i in range(5):
    output_files.append(generate(outputs=['data/random-%s.txt' % i]))

# Concatenate the files into a single file
cc = concat(inputs=[i.outputs[0] for i in output_files], outputs=["data/all.txt"])

# Calculate the sum of the random numbers
# (store the future under a new name so the `total` app isn't overwritten)
result = total(inputs=[cc.outputs[0]])
print(result.result())

79838


## IV. Distributed Dataflows
When we introduced dataflows before, we mentioned that they are often parallelizable, because different "black boxes" can run on different cores without interacting and without blocking each other. In general, though not always, the same independence also makes dataflows distributable. If a black box in a dataflow is completely independent of another black box, you can not only run them in parallel, but also distribute them and run them on different nodes of a cluster.

Consider the workflow we were working with in Ch. 4: ![hierarchical workflow](https://i.imgur.com/8s1CkQ2.png)

Because of the way the arrows are set up, you can see that any black box in the same "layer" as another can be run in parallel, as we discussed earlier. In addition, because they are not dependent on each other, they can be easily distributed. The rng_add phase must be run after the rng section, but it can be distributed as well.
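One way to find those layers programmatically is a layered topological sort. The sketch below uses Python's standard `graphlib.TopologicalSorter` (Python 3.9+) on a hypothetical dependency map that mirrors the intended order of the apps in Example 5.4: five `generate` tasks, then `concat`, then `rng_add`, then `total`. The task names are illustrative, not Parsl identifiers.

```python
from graphlib import TopologicalSorter

def layers(dependencies):
    """Group tasks into layers. Tasks in the same layer do not depend on each
    other, so they could run in parallel or on different nodes."""
    sorter = TopologicalSorter(dependencies)  # maps task -> set of prerequisites
    sorter.prepare()
    result = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every task whose prerequisites are done
        result.append(ready)
        sorter.done(*ready)  # finishing a whole layer unlocks the next one
    return result

workflow = {f"generate_{i}": set() for i in range(5)}
workflow["concat"] = {f"generate_{i}" for i in range(5)}
workflow["rng_add"] = {"concat"}
workflow["total"] = {"rng_add"}

print(layers(workflow))
# [['generate_0', 'generate_1', 'generate_2', 'generate_3', 'generate_4'], ['concat'], ['rng_add'], ['total']]
```

Only the first layer has more than one task, so in this workflow the five `generate` calls are the part that benefits from being distributed.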

### Example 5.4 - Doing Distributed Math With Parsl
In this example, we're going to use Parsl to distribute the workflow from Ch. 4. It is still true that this workflow is not really that useful for anything, but it's an interesting learning exercise, and as I said before, the hard part of writing HPC workflows is making the different bits of the workflow interact, and not designing each bit on its own. As an exercise, you could easily replace the randomization bits of this with any other application.

In [15]:
# Parsl config
from parsl import *
import logging

ipp_config = {
    "sites": [{
        "site": "LC_Cluster",
        "auth": {
            "channel": "local"
        },
        "execution": {
            "executor": "ipp",
            "provider": "sge",
            "script_dir": ".scripts",
            "scriptDir": ".scripts",
            "block": {
                "nodes": 1,
                "taskBlocks": 1,
                "walltime": "00:05:00",
                "initBlocks": 1,
                "minBlocks": 0,
                "maxBlocks": 10,
                "scriptDir": ".",
                "options": {
                    "partition": "debug"
                }
            }
        }
    }],
    "globals": {"lazyErrors": True},
    "controller": {"profile": "default"},
}

import os
os.environ['SGE_ROOT'] = '/local/cluster/sge'

dfk = DataFlowKernel(config=ipp_config)


In [16]:
# App that generates a random number
@App('bash', dfk)
def generate(outputs=[]):
    return "echo $(( RANDOM )) &> {outputs[0]}"

# App that concatenates input files into a single output file
@App('bash', dfk)
def concat(inputs=[], outputs=[], stdout="data/stdout.txt", stderr='data/stderr.txt'):
    return "cat {0} > {1}".format(" ".join(inputs), outputs[0])

# App that calculates the sum of the values in the first input file
@App('python', dfk)
def total(inputs=[]):
    running_sum = 0
    with open(inputs[0], 'r') as f:
        for line in f:
            running_sum += int(line)
    return running_sum

# App that adds a constant to every number in the first input file and
# writes the results to a new output file, so later apps can depend on it
@App('python', dfk)
def rng_add(add_const, inputs=[], outputs=[]):
    with open(inputs[0], 'r') as f:
        values = [int(line) for line in f if line.strip()]
    with open(outputs[0], 'w') as f:
        for value in values:
            f.write("{}\n".format(value + add_const))

# Create 5 files with random numbers
output_files = []
for i in range(5):
    output_files.append(generate(outputs=['data/random-%s.txt' % i]))

# Concatenate the files into a single file
cc = concat(inputs=[i.outputs[0] for i in output_files], outputs=["data/all.txt"])

# Add 200 to every random number; taking cc's output makes this wait for concat
added = rng_add(200, inputs=[cc.outputs[0]], outputs=["data/all-plus-200.txt"])

# Calculate the sum, reading rng_add's output so it runs only after rng_add finishes
result = total(inputs=[added.outputs[0]])
print(result.result())

86919


## V. Large Scale Problems
Large scale HPC problems often involve more than Python code. Luckily, that other code is usually callable from the UNIX command line. Parsl supports calling arbitrary "black box" applications over the command line through bash apps. Parsl’s Bash app allows you to wrap the execution of external applications from the command line, as you would in a Bash shell. It can also be used to execute Bash scripts directly. To define a Bash app, the wrapped Python function must return the command-line string to be executed. Any command-line invocation, represented by an arbitrarily long string, can be returned by a function decorated with an `@App` of type bash. Unlike Python apps, Bash apps communicate by passing files. The decorated bash function provides the same special keyword arguments as Python apps to manage input and output files.

### Example 5.5 - Basic HPC Dataflow
In this example, we're going to use bash apps to create a simulated HPC workflow that calls programs from the command line. In a Parsl bash app function, there are a few special reserved keyword arguments:

- inputs (List): A list of strings or DataFutures
- outputs (List): A list of output file paths
- stdout (str): Redirects STDOUT to the named file
- stderr (str): Redirects STDERR to the named file

In addition, if a list of output filenames is provided via `outputs=[]`, a list of futures corresponding to each filename in the outputs list is made available via the `outputs` attribute of the future the app returns.
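To see what "return the command-line string" means in practice, here is a local sketch of the idea. `run_like_bash_app` and `sort_numbers` are hypothetical helpers, not Parsl's implementation: the helper calls the wrapped function to get a command template, fills placeholders such as `{inputs[0]}` and `{outputs[0]}` from the call's arguments with `str.format` (the same kind of placeholder the `generate` app in Example 5.3 uses), runs the command with `subprocess.run`, and returns the output paths. It assumes a POSIX shell with the standard `sort` utility.

```python
import os
import subprocess
import tempfile

def run_like_bash_app(app_function, *args, **kwargs):
    """Hypothetical local stand-in for a bash app (not Parsl's implementation)."""
    # The wrapped function returns a command template...
    template = app_function(*args, **kwargs)
    # ...whose {placeholders} are filled from the call's arguments
    command = template.format(*args, **kwargs)
    # Run it through the shell; check=True raises if the command fails
    subprocess.run(command, shell=True, check=True)
    return kwargs.get("outputs", [])

def sort_numbers(inputs=[], outputs=[]):
    # Numeric sort of the first input file into the first output file
    return "sort -n {inputs[0]} > {outputs[0]}"

# Usage: sort a small file in a temporary directory
workdir = tempfile.mkdtemp()
unsorted_path = os.path.join(workdir, "unsorted.txt")
with open(unsorted_path, "w") as f:
    f.write("36\n4\n11\n")

(sorted_path,) = run_like_bash_app(
    sort_numbers,
    inputs=[unsorted_path],
    outputs=[os.path.join(workdir, "sorted.txt")],
)
with open(sorted_path) as f:
    print(f.read().split())  # ['4', '11', '36']
```

Because the data moves through files rather than return values, any program that reads and writes files can slot into a dataflow this way.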


In [None]:
# Parsl config
from parsl import *
import logging

ipp_config = {
    "sites": [{
        "site": "LC_Cluster",
        "auth": {
            "channel": "local"
        },
        "execution": {
            "executor": "ipp",
            "provider": "sge",
            "script_dir": ".scripts",
            "scriptDir": ".scripts",
            "block": {
                "nodes": 1,
                "taskBlocks": 1,
                "walltime": "00:05:00",
                "initBlocks": 1,
                "minBlocks": 0,
                "maxBlocks": 10,
                "scriptDir": ".",
                "options": {
                    "partition": "debug"
                }
            }
        }
    }],
    "globals": {"lazyErrors": True},
    "controller": {"profile": "default"},
}

import os
os.environ['SGE_ROOT'] = '/local/cluster/sge'

dfk = DataFlowKernel(config=ipp_config)


In [18]:
@App('bash', dfk)
def sort(unsorted, outputs=[]):
    """Call sort executable on file `unsorted`"""
    return "sort -g {} > {}".format(unsorted, outputs[0])

s = sort(os.path.abspath("data/unsorted.txt"), 
         outputs=[os.path.abspath("data/sorted_c.txt")])

output_file = s.outputs[0].result()

print("Contents of the unsorted.txt file:")
with open('data/unsorted.txt', 'r') as f:
    print(f.read().replace("\n",","))
    
print("\nContents of the sorted output file:")
with open(output_file, 'r') as f:
    print(f.read().replace("\n",","))

Contents of the unsorted.txt file:
36,40,87,34,97,50,39,11,11,57,64,49,76,67,89,20,75,80,82,98,10,73,50,70,8,41,75,1,7,55,4,9,6,99,69,75,81,67,58,47,32,77,13,32,64,28,89,78,27,68,11,2,65,70,75,47,31,96,7,32,20,84,3,92,33,96,81,61,43,10,13,16,64,56,89,39,81,4,31,6,93,14,34,47,27,23,56,49,71,85,56,65,68,16,11,54,10,28,64,97,

Contents of the sorted output file:
1,2,3,4,4,6,6,7,7,8,9,10,10,10,11,11,11,11,13,13,14,16,16,20,20,23,27,27,28,28,31,31,32,32,32,33,34,34,36,39,39,40,41,43,47,47,47,49,49,50,50,54,55,56,56,56,57,58,61,64,64,64,64,65,65,67,67,68,68,69,70,70,71,73,75,75,75,75,76,77,78,80,81,81,81,82,84,85,87,89,89,89,92,93,96,96,97,97,98,99,


## Exercise 5. Real Live Workflow!

In [4]:
# Your code goes here