# Distributed Computation

## Quick Recap - Multiprocessing vs. Multithreading

Multiprocessing can utilize multiple CPU cores, achieving parallel computation in the truest sense. However, it suffers when the processes need to share a common memory space.

On the other hand, threads within a single process share a common memory space, and achieve parallel computation in a looser sense. In CPython, multithreading is more like hyper-looping through multiple queues while each thread waits for its turn. When a thread's turn comes, it acquires the Global Interpreter Lock (GIL), so that it alone executes Python code within the process, until it finishes its designated computation or reaches a blocking operation such as an I/O call. At that point the thread releases the GIL, and a context switch lets the next thread proceed.
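As a minimal sketch of the behaviour described above, the snippet below uses `time.sleep()` as a stand-in for a blocking I/O call (the `fake_io` and `run_threaded` names are hypothetical, introduced only for illustration). Since sleeping releases the GIL, the three waits overlap and the total elapsed time should land close to the longest single wait rather than the sum.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(seconds: float) -> float:
    # time.sleep() releases the GIL, much like a blocking socket or file read
    time.sleep(seconds)
    return seconds


def run_threaded(waits: list) -> tuple:
    # one thread per wait, so every "I/O call" can block at the same time
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(waits)) as pool:
        results = list(pool.map(fake_io, waits))
    return results, time.perf_counter() - start


results, elapsed = run_threaded([0.2, 0.2, 0.2])
print(results, round(elapsed, 2))
```

Replacing `fake_io` with a CPU-bound loop would likely remove the benefit, since only one thread can hold the GIL while executing Python code.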

### Asynchronous I/O Loops, and Multithreading's Little Brother - Coroutines

Both multiprocessing and multithreading require dedicated hardware and operating system coordination. As software technologies matured, engineers started to explore capabilities within the application layer itself. The same conceptual model of multithreading (again, the hyper-looping) gets a new interpretation within the programming stack (such as Python, or more precisely the CPython runtime), giving the program more direct control within the runtime instead of relying on the OS to switch context between threads. The _coroutine_, a term coined in 1958, is one such materialization of the concept. Coroutines can be seen as lightweight threads and have been implemented in many languages, including Python. Implementations usually involve a dedicated asynchronous I/O loop (or event loop) that switches between coroutines, with more direct control than threading over when each task is suspended and resumed.

Both multithreading and coroutine techniques are suitable for I/O-focused tasks, such as reading files from a disk or making HTTP requests. Coroutines tend to require less computing resource overhead than their more OS-native sibling.
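To make the suspend-and-resume control more concrete, here is a small sketch using only the standard library, with `asyncio.sleep()` standing in for real I/O. The `worker` and `run_workers` names are hypothetical. Each `await` is an explicit point where the coroutine hands control back to the event loop, so the order of events is decided by the program rather than by the OS scheduler.

```python
import asyncio


async def worker(name: str, delay: float, log: list) -> str:
    log.append(f'{name} start')
    # `await` suspends this coroutine and lets the event loop run another one
    await asyncio.sleep(delay)
    log.append(f'{name} done')
    return name


async def run_workers() -> list:
    log = []
    await asyncio.gather(
        worker('a', 0.2, log),
        worker('b', 0.1, log),
    )
    return log


# in a plain script; inside a Jupyter Notebook use `await run_workers()` instead
events = asyncio.run(run_workers())
print(events)
```

Both workers start before either finishes, and `b` completes first because its wait is shorter, all within a single thread.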

In [13]:
import random
import requests


# 10 requests, each with a random delay of 1 to 3 seconds
reqs = [
    f'https://httpbin.org/delay/{random.randint(1, 3)}'
    for _ in range(10)
]

def get_sync():
    all_data = []
    for req in reqs:
        res = requests.get(req)
        all_data.append(res.json())

    return all_data

In [14]:
%time res = get_sync()
print(res[-1], len(res))

CPU times: user 163 ms, sys: 12.9 ms, total: 176 ms
Wall time: 23.6 s
{'args': {}, 'data': '', 'files': {}, 'form': {}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'User-Agent': 'python-requests/2.25.1', 'X-Amzn-Trace-Id': 'Root=1-60d10ac9-550eb0d6203262b369bd3576'}, 'origin': '107.179.188.69', 'url': 'https://httpbin.org/delay/2'} 10


In [15]:
import asyncio

import aiohttp


async def get(session, url):
    res = await session.request('GET', url=url)
    data = await res.json()
    return data


async def main():
    async with aiohttp.ClientSession() as session:
        tasks = []
        for req in reqs:
            tasks.append(get(session, url=req))

        all_data = await asyncio.gather(*tasks, return_exceptions=True)
        return all_data

In [16]:
'''
Two things about Jupyter Notebook environment
1. Cannot use %time magic for async functions
2. No explicit event loop initiation (it's already in one)
'''
import time

# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())
start = time.time()
res = await main()
print(round(time.time() - start, 1), 'seconds')
print(res[-1], len(res))

3.2 seconds
{'args': {}, 'data': '', 'files': {}, 'form': {}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'User-Agent': 'Python/3.8 aiohttp/3.7.4.post0', 'X-Amzn-Trace-Id': 'Root=1-60d10acc-6bdccf035163f7cc14d45527'}, 'origin': '107.179.188.69', 'url': 'https://httpbin.org/delay/2'} 10


Let's use the request URLs to verify the delays involved in the API calls, using the functional techniques `map()` and `reduce()` to extract the delays (in seconds) and sum them up.

In [17]:
delays = [*map(lambda url: int(url.split('/')[-1]), reqs)]
delays

[1, 1, 1, 2, 3, 3, 2, 3, 2, 2]

In [18]:
from functools import reduce

total_delay = reduce(lambda left, right: left + right, [*delays], 0)
total_delay

20

The theoretical total delay (20 seconds) roughly matches our observation from the synchronous process (23.6 seconds), while the _maximum_ of the individual delays (3 seconds) roughly matches our observation from the asynchronous process (3.2 seconds).
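The arithmetic behind this comparison can be sketched as follows, using the delays extracted in the run above. The variable names are introduced here for illustration, and the gap between these floors and the measured wall times is presumably network and connection overhead.

```python
# delays (in seconds) parsed from the request URLs in the run above
delays = [1, 1, 1, 2, 3, 3, 2, 3, 2, 2]

# synchronous: each request waits for the previous one to finish
sequential_floor = sum(delays)

# asynchronous: all requests wait at the same time
concurrent_floor = max(delays)

print(sequential_floor, concurrent_floor)
```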

## MapReduce - Distributing Computing Resources

Recall one of the most critical disadvantages of multiprocessing: the lack of a shared memory space. This particular constraint often makes attempts to perform parallel computation across multiple cores on a single machine less desirable. One workaround is to leverage a datastore (such as files or databases backed by hard drives) as an inter-process data pool that multiple processes can simultaneously read from and write to, such as a local SQLite database, as demonstrated in [a previous part](./11-work-with-sql.ipynb).

If we extend this problem to a larger scale, where the dataset we want to work with not only exceeds the available memory space but is also impossible to store efficiently (or at all) on the disk of a single machine, then we need to revisit viable solutions.

The MapReduce model is a _divide and conquer_ strategy that applies and extends the functional programming concepts of `map()` and `reduce()`. A large dataset is dissected and distributed through a mapping procedure onto a multitude of "commodity" server nodes to parallelize the computation on the smaller portions, then the resulting subsets are reduced onto fewer and fewer nodes until the final outcome is complete.
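The model can be illustrated on a single process with plain `map()` and `reduce()`. This is only a sketch of the idea, not of how any particular framework implements it: the `split_into_chunks`, `mapper`, and `reducer` helpers are hypothetical, and in a real cluster each chunk would be shipped to a different node.

```python
from collections import Counter
from functools import reduce


def split_into_chunks(items: list, n_chunks: int) -> list:
    # "divide": slice the dataset into roughly equal portions
    size = -(-len(items) // n_chunks)  # ceiling division
    return [items[i:i + size] for i in range(0, len(items), size)]


def mapper(chunk: list) -> Counter:
    # "map": each portion is processed independently of the others
    return Counter(word[0] for word in chunk)


def reducer(left: Counter, right: Counter) -> Counter:
    # "reduce": merge two partial results into one
    return left + right


words = ['spark', 'map', 'reduce', 'shuffle', 'merge', 'sort']
partials = map(mapper, split_into_chunks(words, n_chunks=3))
totals = reduce(reducer, partials, Counter())
print(dict(totals))
```

Because each `mapper` call only sees its own chunk and `reducer` is associative, the partial results can be computed and merged in any order, which is what makes the work distributable.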

This model allows batch data processing to scale out by adding more nodes, rather than being capped by the capacity of a single machine, and offers a relatively cost-effective way to speed up the process.

It is worth knowing that the recent development of cheaper and faster hard drives, especially the more widespread adoption of SSDs (solid-state drives), plays a vital role in enabling distributed computation.

The MapReduce model was pioneered [by Google](https://research.google/pubs/pub62/) in 2004 to resolve the practical problem of exponentially growing datasets for computing their search indexes. Many have since adopted the technology and contributed toward its development and evolution through the open-source community.

### Apache Spark™

Apache Spark is one such open-source framework. It came around 2014 (10 years after the initial MapReduce research paper) and provides an elegant and unified abstraction for large-scale data processing that can efficiently utilize anything from the multiple cores of a single machine to "multiple clouds".

For the sake of simplicity, we will demonstrate through its usage on a single machine. First, obtain the number of CPU cores:

In [1]:
import multiprocessing

CORES = multiprocessing.cpu_count()
CORES

12

Borrowing from [another previous part](./12-generate-data.ipynb), where we attempted to generate random and hashed device IDs:

In [2]:
# mostly from Part 12 - Generate data
from uuid import uuid4
from hashlib import sha1


def gen_device_ids(count: int = 20_000) -> list:
    device_ids = []
    for _ in range(count):
        device_ids.append(str(uuid4()))
    # hash
    return [sha1(x.encode()).hexdigest() for x in device_ids]

Let's generate a relatively large batch of device IDs, say 1 million times the number of `CORES`:

In [8]:
%time device_ids = gen_device_ids(count=1_000_000 * CORES)

CPU times: user 52.3 s, sys: 5.25 s, total: 57.5 s
Wall time: 57.7 s


Because a single process generates the IDs iteratively, it takes quite a long time.

In [9]:
%time device_ids[:10]

CPU times: user 4 µs, sys: 6 µs, total: 10 µs
Wall time: 12.9 µs


['79e0a59c3636da4fc3dfbe1856224cf46e76a3bf',
 '26342eaf9e59e8ccf94e272d2bc37ace8e8fa603',
 '7698ecca8a088d35cc87f059bba69eaa2639b486',
 'af785b16f54542341b591fab8fc73283b481dff4',
 '0756f3791e74466205d241a8f5fcdba67a746bc8',
 '55e152016fe246f7d158428025d2a179ff44167f',
 'a7eefe3a2abbd91cf8fa68d587f340b91184c6d4',
 'a4d234f1f9fa2b3994f9160b762f5e7edadb8604',
 '529baede08b092c9496eb301052da9819419418f',
 '2dc7f022908dad5f2c717fad0d4c27cc141caef3']

But since the list is already in memory, it takes very little time to read them out.

Let's try the same device ID generation with Spark in a distributed manner. We will initiate a Spark session with a local "master" that takes advantage of the number of `CORES` we have obtained.

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master(f'local[{CORES}]').getOrCreate()
spark

The Spark UI which runs on the `localhost` is handy for monitoring and more.

![Spark UI](https://user-images.githubusercontent.com/2837532/122995953-46a9c700-d378-11eb-84a7-50917d34b7be.png)

The first approach involves a core concept and building block of Spark which is known as Resilient Distributed Datasets (`RDD`s).

In [4]:
%%time

def mapper(count):
    return [(d,) for d in gen_device_ids(count)]

# a list of [1_000_000, ..., 1_000_000], where the length is the number of CORES
counts = [1_000_000] * CORES

rdd = spark.sparkContext.parallelize(counts).flatMap(mapper)

CPU times: user 1.84 ms, sys: 1.99 ms, total: 3.82 ms
Wall time: 238 ms


The snippet takes a context from the available Spark session and schedules it to parallelize over a list of counts of 1 million each, where the list length is the number of `CORES`. Then we instruct the parallelization to map the list over a `mapper` function that takes an individual count and generates a list of device IDs (each in the form of a single-element tuple). The `.flatMap` method is a convenient layer to ensure the resulting dataset is flattened into a single-tier list of device IDs, instead of a list of (number of `CORES`) lists.
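The difference between mapping and flat-mapping can be sketched in plain Python, with `itertools.chain.from_iterable` playing the role of the flattening step. The `mapper` below is a simplified stand-in that fabricates placeholder IDs instead of calling `gen_device_ids`, and the small counts are chosen only to keep the output readable.

```python
from itertools import chain


def mapper(count: int) -> list:
    # stand-in for the real mapper: one single-element tuple per generated ID
    return [(f'id-{i}',) for i in range(count)]


counts = [2, 2, 2]

# a plain map yields one list per count
nested = list(map(mapper, counts))

# flattening yields a single-tier list of tuples
flat = list(chain.from_iterable(nested))

print(len(nested), len(flat))
```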

Notice the time it takes to schedule is negligible, as Spark takes a _lazy_ approach to preserve computing resources until the computation is really needed.
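Python's built-in `map()` is lazy in a comparable way, which makes for a convenient single-machine analogy. In this sketch (the `expensive` function and the `calls` log are hypothetical), building the pipeline does no work at all, and the computation only happens when the result is consumed.

```python
calls = []


def expensive(x: int) -> int:
    calls.append(x)  # record every time real work happens
    return x * x


# "transformation": building the pipeline performs no work
pipeline = map(expensive, range(5))
calls_after_scheduling = len(calls)

# "action": consuming the pipeline triggers the computation
result = list(pipeline)
calls_after_action = len(calls)

print(calls_after_scheduling, calls_after_action, result)
```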

In [6]:
%time rdd.take(10)

CPU times: user 3.76 ms, sys: 2.12 ms, total: 5.88 ms
Wall time: 5.08 s


[('79993454cc645c0c78569b8fe98b80cd9df4828f',),
 ('b164eaec7921780e336661d70215649de2a4eb09',),
 ('63d1760e28667a727e502b7a86c98bcfe25f7714',),
 ('54154a58b3d557dca36f9970d5d158ccc74a155f',),
 ('bb2e375ce4ffa9e7caa5ccf641e0bbc05147cb33',),
 ('8e2a7429109c13451ce9bfe55f6ce6878cf584b1',),
 ('d6d74a1717aa4dff0e7dd29976182c4b3d3d689e',),
 ('5e1010d089f18701dd96bccaae9d764290bc4a6b',),
 ('b9e5d165e2011bac5f848282e017c7c2cfcd59fc',),
 ('d6118fe0b844013c57416360c6418822a139a4ad',)]

Indeed, when we instruct Spark to take the first 10 values from the RDD, it actually starts the computation (and more). If we go back to the Spark UI, it may paint a better picture of what Spark has done under the hood.

![RDD take](https://user-images.githubusercontent.com/2837532/122997519-16fbbe80-d37a-11eb-8704-5c520019e161.png)

Conceptually, the workflow is as follows:
1. Generate the device IDs leveraging multiple `CORES` in parallel.
2. Persist device IDs as RDD into distributed blocks on disk. They are also replicated automatically across available distributions to support further parallelized processes. Spark also records other necessary metadata, such as the order of values, to ensure that computations that require strict ordering are not affected.
3. Draw the first 10 values from the RDD files and populate them into the Python process that runs this Notebook.

This means that while it is significantly faster to generate the device IDs compared to the single-process iterative approach, it is also much slower to read them out, as it involves a more complex trip to read from files while ensuring the order of values is intact.

Spark also has a built-in DataFrame implementation on top of the RDDs, providing neat abstractions such as SQL queries and more, similar to Pandas DataFrame.

In [7]:
%time df = rdd.toDF(['device_id'])

CPU times: user 192 ms, sys: 53.9 ms, total: 246 ms
Wall time: 7.04 s


This step reveals the trade-off more prominently: operating on a distributed dataset can be much more time-consuming than operating on its in-memory counterpart. For comparison, we can take the in-memory `list` of `device_ids` and convert it into a Pandas DataFrame:

In [11]:
import pandas as pd

%time pdf = pd.DataFrame(device_ids, columns=['device_id'])

pdf

CPU times: user 435 ms, sys: 26.8 ms, total: 462 ms
Wall time: 460 ms


|  | device_id |
| --- | --- |
| 0 | 79e0a59c3636da4fc3dfbe1856224cf46e76a3bf |
| 1 | 26342eaf9e59e8ccf94e272d2bc37ace8e8fa603 |
| 2 | 7698ecca8a088d35cc87f059bba69eaa2639b486 |
| 3 | af785b16f54542341b591fab8fc73283b481dff4 |
| 4 | 0756f3791e74466205d241a8f5fcdba67a746bc8 |
| ... | ... |
| 11999995 | e2e668530153491480fd72d202b7c1cfc67eef9d |
| 11999996 | e5cbb4ffaf8df7c646dea909adcecdaf3683ea3d |
| 11999997 | 5ef67c9422a65985d5b15e838fe7898dc1f638c6 |
| 11999998 | 3171ae0274118bd36060f6d69be2beba6283d9f4 |


By the same reasoning, the Spark DataFrame is slower to read, due to the round-trip it takes to the distributed disk blocks.

In [12]:
%time df.show()

+--------------------+
|           device_id|
+--------------------+
|ecfb6dcdf0bd44198...|
|5a1d3bfcb6cfc790a...|
|5438b4a97af6e1145...|
|5835e3a3659e2b7a8...|
|ed8503d8c602f0186...|
|65d960080f7fc74a4...|
|6ae28893e34a562bc...|
|5d3c7e681d93779f1...|
|c56a2de5130910ea1...|
|aea15eedf19160578...|
|c2a80b65a38f343a7...|
|f215008be624bb4b8...|
|7b03ce84ed2b2e7f1...|
|c9c7ed1893d923738...|
|7255f6af353da3b1b...|
|2cbcf97407d407a96...|
|e980b98cc073b8162...|
|7ae965b3d9e6f3671...|
|4794302bc6369338d...|
|eb31369d58bd79439...|
+--------------------+
only showing top 20 rows

CPU times: user 1.17 ms, sys: 1.15 ms, total: 2.32 ms
Wall time: 5.72 s


In [14]:
%time pdf.count()

CPU times: user 371 ms, sys: 8.01 ms, total: 379 ms
Wall time: 378 ms


device_id    12000000
dtype: int64

In [13]:
%time df.count()

CPU times: user 2.28 ms, sys: 1.8 ms, total: 4.08 ms
Wall time: 16.2 s


12000000

Taking the full count reveals an even more significant difference. Spark attempts to optimize under the hood so that partial reads such as `rdd.take()` or `df.show()` do not scan the full dataset, but it has to perform a full scan when it needs an accurate count of items, hence the much longer duration.
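A loose single-machine analogy for partial versus full reads can be drawn with a Python generator. This is not how Spark works internally (Spark evaluates partition by partition); the `records` generator and the `stats` counter are hypothetical and only illustrate why taking a few items can stop early while counting cannot.

```python
from itertools import islice

stats = {'produced': 0}


def records(total: int):
    # generator standing in for a dataset that is expensive to materialize
    for i in range(total):
        stats['produced'] += 1
        yield i


# partial read: stops as soon as 10 items have been produced
first_ten = list(islice(records(100_000), 10))
produced_for_take = stats['produced']

# full count: every item has to be produced to be counted
stats['produced'] = 0
full_count = sum(1 for _ in records(100_000))
produced_for_count = stats['produced']

print(produced_for_take, produced_for_count)
```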

Spark DataFrames can also be created directly, or with the help of concatenating existing Pandas DataFrames.

Below is an alternative approach to the device ID generation task that leverages Spark's native support for interfacing with Pandas DataFrames.

In [32]:
df = spark.createDataFrame([(i,) for i in range(CORES)], ['cluster'])

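def _gen(cluster):
    # generate 1 million device IDs for a single cluster
    device_ids = gen_device_ids(count=1_000_000)
    pdf = pd.DataFrame(device_ids, columns=['device_id'])
    # assign the scalar cluster ID so that every row receives it
    pdf['cluster'] = cluster
    return pdf.reset_index()

def gen_device_ids_udf(df):
    # each group holds the row(s) of one cluster; expand every row into a batch
    output = []
    for _, row in df.iterrows():
        pdf = _gen(row['cluster'])
        output.append(pdf)

    return pd.concat(output)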


schema = 'index long, cluster long, device_id string'
%time df = df.groupby('cluster').applyInPandas(gen_device_ids_udf, schema=schema).drop('cluster', 'index')

CPU times: user 47.9 ms, sys: 9.64 ms, total: 57.6 ms
Wall time: 286 ms


A parallelization principle similar to the RDD approach applies here, with a _hack_ around the mechanism of `.groupby()`, which enables the parallelization across the number of `CORES`.

Similarly, scheduling the task does not take much time, as we have yet to request actual access to the data.
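The idea behind the hack, one seed row per group with each group expanded independently, can be sketched without Spark. In the sketch below a thread pool is only a stand-in for Spark's task scheduling, and the `expand` helper, the small `count`, and the fixed `CORES` value are all assumptions made for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from uuid import uuid4

CORES = 4  # hypothetical; the notebook reads this from multiprocessing.cpu_count()


def expand(cluster: int, count: int = 3) -> list:
    # each group holds a single seed row and expands it into `count` rows
    return [(cluster, str(uuid4())) for _ in range(count)]


# one seed row per group, like the `cluster` column of the seed DataFrame
seeds = list(range(CORES))

with ThreadPoolExecutor(max_workers=CORES) as pool:
    groups = list(pool.map(expand, seeds))

# flatten the per-group results into one table of rows
rows = [row for group in groups for row in group]
print(len(groups), len(rows))
```

Grouping by a column with exactly one distinct value per desired task is what turns a generic group-wise operation into a way of fanning work out.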

In [15]:
%time df.show()

+--------------------+
|           device_id|
+--------------------+
|5638653346e9ce267...|
|07f8429ab6091e2c9...|
|a2da510584b0458ed...|
|208c9da15db43f49f...|
|372146c8becb3d98e...|
|fe12fb491578ede7c...|
|11b48aa52f8270b31...|
|a49a66f75b9f9a337...|
|5ad8e84bd2eb1f078...|
|4d90b6f6b2464b593...|
|0b6776512b4375db1...|
|6b052b60b09248814...|
|b76fe3337f457ce6c...|
|786be5ba9c10f7505...|
|7e81fcc65b76b9fa9...|
|2c8ec661fce7d2923...|
|2ca0c05a27bc9de66...|
|7bf4786e3241d37ed...|
|c252efa35cf5b4045...|
|2bafd46820109a91f...|
+--------------------+
only showing top 20 rows

CPU times: user 1.14 ms, sys: 1.25 ms, total: 2.39 ms
Wall time: 5.79 s


Since this approach involves a more pronounced mapping process abstracted by the `.groupby` method, the underlying workflow can be a bit more interesting to observe:

![group by](https://user-images.githubusercontent.com/2837532/123001100-5b895900-d37e-11eb-8b2a-db3fa40caaba.png)

In [16]:
%time df.count()

CPU times: user 2.35 ms, sys: 2.02 ms, total: 4.38 ms
Wall time: 18.1 s


12000000

Read access and time consumption behave as expected.

Let's attempt to perform some actual analysis of the overall dataset, such as counting the number of device IDs by their first characters.

We'll start with the Pandas DataFrame, which is in-memory.

In [23]:
pdf['first'] = pdf.device_id.astype(str).str[0]
pdf

|  | device_id | first |
| --- | --- | --- |
| 0 | 79e0a59c3636da4fc3dfbe1856224cf46e76a3bf | 7 |
| 1 | 26342eaf9e59e8ccf94e272d2bc37ace8e8fa603 | 2 |
| 2 | 7698ecca8a088d35cc87f059bba69eaa2639b486 | 7 |
| 3 | af785b16f54542341b591fab8fc73283b481dff4 | a |
| 4 | 0756f3791e74466205d241a8f5fcdba67a746bc8 | 0 |
| ... | ... | ... |
| 11999995 | e2e668530153491480fd72d202b7c1cfc67eef9d | e |
| 11999996 | e5cbb4ffaf8df7c646dea909adcecdaf3683ea3d | e |
| 11999997 | 5ef67c9422a65985d5b15e838fe7898dc1f638c6 | 5 |
| 11999998 | 3171ae0274118bd36060f6d69be2beba6283d9f4 | 3 |


In [27]:
import time

size_time = []
size = len(pdf)

while size > 1:
    sample = pdf.sample(size)
    start = time.time()
    sample.groupby('first').agg({'device_id': 'count'})
    end = time.time()
    size_time.append({
        'size': size,
        'time': end - start,
    })
    size = size // 100

pdf_size_time = pd.DataFrame(size_time)
pdf_size_time

|  | size | time |
| --- | --- | --- |
| 0 | 12000000 | 2.448123 |
| 1 | 120000 | 0.029682 |
| 2 | 1200 | 0.002271 |
| 3 | 12 | 0.004389 |


Even in-memory, the time it takes to aggregate the counts is not negligible, and it would obviously get worse with larger sizes.

This may vary depending on the available computing resources, but after a certain threshold, the time needed to perform the operation grows linearly.

Now let's try with Spark.

In [24]:
df = df.withColumn('first', df.device_id.substr(0, 1))
df.show()

+--------------------+-----+
|           device_id|first|
+--------------------+-----+
|bcdee560470ae56a1...|    b|
|956f42d5039912ed8...|    9|
|19aeae4388b84b560...|    1|
|dbeaf78e828f91e91...|    d|
|5c034a5a889a23c33...|    5|
|b8da1f734f39e111b...|    b|
|b4ec1b6bb3f4f6e99...|    b|
|1d00c643689f121a3...|    1|
|e0b305ca7695c3639...|    e|
|c722e4d6182051df9...|    c|
|aef11a579141779b8...|    a|
|e0e317020994c8e66...|    e|
|46e7a05a28877f956...|    4|
|e38d8fbeca350bb14...|    e|
|d895f5ad34b04d20c...|    d|
|15db6050985fbbe2b...|    1|
|d9dc4114fe8e772a8...|    d|
|4021cb2d3c4bb893b...|    4|
|fb55cd99222542f5f...|    f|
|7dd4d2a488cc0127e...|    7|
+--------------------+-----+
only showing top 20 rows



From the examples above, we already know that Spark DataFrames employ an entirely different and more complex mechanism than the more direct in-memory model of Pandas, so the sampling process may be more time-consuming. Therefore the entire while loop may take much longer to run, but the aggregation portion is timed in exactly the same way as its Pandas counterpart.

In [29]:
%%time

size_time = []
count = df.count()
size = count

while size > 1:
    sample = df.sample(size / count)
    _count = sample.count()
    start = time.time()
    sample.groupby('first').agg({'device_id': 'count'})
    end = time.time()
    size_time.append({
        'size': _count,
        'time': end - start,
    })
    size = size // 100

df_size_time = pd.DataFrame(size_time)
df_size_time

CPU times: user 14.5 ms, sys: 6.14 ms, total: 20.6 ms
Wall time: 1min 21s


|  | size | time |
| --- | --- | --- |
| 0 | 12000000 | 0.008463 |
| 1 | 119897 | 0.008105 |
| 2 | 1212 | 0.007846 |
| 3 | 19 | 0.008244 |


While it is significantly slower to bootstrap the samples, the aggregation shows a glimpse of Spark's true strength (or the distributed filesystem and the MapReduce model behind it).

The time captured is not only much shorter than its Pandas counterpart on larger sample sizes, but also nearly uniform regardless of the given sample size. Keep in mind that `.groupby().agg()` is a lazy transformation like the ones scheduled earlier, so the captured time covers scheduling the aggregation, and the computation itself only runs once an action such as `.show()` or `.collect()` requests the result. The scalability characteristic becomes more prominent, and also more essential as a tool, when the data to process goes beyond a single machine's capacity, much like the situation that Google encountered in the early 2000s.


## Remarks

On a single machine, depending on the task, we can use techniques such as multiprocessing, multithreading, or coroutines to leverage the underlying computing resources and shorten processing time.

When the data size is much more than a single machine can handle, tools such as Apache Spark and Modin (and its underlying parallelization abstractions Dask and Ray, all mentioned in [the previous part](./13-data-processing.ipynb)) become more essential to get the job done in a timely and cost-effective manner.

For instance, at EQ Works, we leverage a Spark service managed by a cloud provider to run through terabytes to petabytes of data with varying computational complexity in hours or minutes, which would otherwise take days or even months on a single machine, even if said machine _can_ hold such an amount of data reliably.

![EMR](https://user-images.githubusercontent.com/2837532/123005848-c89fed00-d384-11eb-8967-2e78faf718f2.png)

## References

* [Apache Spark](https://spark.apache.org/) and its [PySpark interface](https://spark.apache.org/docs/latest/api/python/index.html)
* [Python Coroutines and Tasks](https://docs.python.org/3/library/asyncio-task.html)
* [Part 11 - Work with SQL](./11-work-with-sql.ipynb)
* [Part 12 - Generate Data](./12-generate-data.ipynb)
* [Part 13 - The Power of Parallel Processing feat. multithreading and multiprocessing](./13-data-processing.ipynb)