In [3]:
%run ../../python-seminar/DataFiles_and_Notebooks/talktools.py

# Parallelism

Python Computing for Data Science (AY250)

## Outline for Today

- Motivation

- Single-machine
    - threading
    - multiprocessing
    - joblib, jax
- Multi-machine
    - dask
    - ray

## Motivation

Generally, the goal of your computing task is to finish as quickly as possible. The **speed of your processor** at executing instructions and the **speed at which data can be read from disk and from RAM** are major contributors to the execution time. The **choice of algorithm(s)** is critical too: an $N \log N$ algorithm that gets the same answer as an $N^2$ one is almost always preferred for any sizeable $N$.

### Types of Bottlenecks

If you think of your running program as a stream of data and computation on that data, it should be clear that **bottlenecks are inevitable**. Your job (as you begin to optimize for execution time) is to understand where those bottlenecks are and to use the tools we have in Python to minimize them. (Ultimately, it's a never-ending game of whack-a-mole.)

#### I/O Bound

*"a condition in which the time it takes to complete a computation is determined principally by the period spent waiting for input/output operations to be completed."* -- wikipedia

This can be because we're waiting for a response from an external source (e.g. loading a webpage) or because data needs to be moved around on your bus and we're waiting for it to show up in the right place to compute on. If you have very fast CPUs, you're more likely to be I/O bound.

#### CPU Bound

*"when the time for it to complete a task is determined principally by the speed of the central processor: processor utilization is high, perhaps at 100% usage for many seconds or minutes."* -- wikipedia

If you're doing algorithmic computations where the amount of input data is small and the amount of output data is also small (e.g. a Fourier transform), you'll typically be CPU bound. Slower CPUs lead to more CPU-bound bottlenecks. If you have a lot of data ("big" data), you're moving it between disk, RAM, and cache, and you're likely I/O bound.
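One rough way to tell the two cases apart is to compare wall-clock time with CPU time for the same task. The sketch below is only an illustration: `measure`, `io_like_task`, and `cpu_task` are hypothetical names, and `time.sleep` stands in for real I/O waiting.

```python
import time

def measure(func):
    """Return (wall_seconds, cpu_seconds) spent running func()."""
    wall_start = time.perf_counter()
    cpu_start = time.process_time()
    func()
    cpu = time.process_time() - cpu_start
    wall = time.perf_counter() - wall_start
    return wall, cpu

def io_like_task():
    # stand-in for waiting on a disk read or a network response
    time.sleep(0.3)

def cpu_task():
    # pure computation: small input, small output
    total = 0
    for i in range(2_000_000):
        total += i * i
    return total

io_wall, io_cpu = measure(io_like_task)
cpu_wall, cpu_cpu = measure(cpu_task)

print(f"I/O-like: wall={io_wall:.3f}s cpu={io_cpu:.3f}s")
print(f"CPU-like: wall={cpu_wall:.3f}s cpu={cpu_cpu:.3f}s")
```

When the CPU time is a small fraction of the wall time, the task spent most of its life waiting, which is the situation where threads can help.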

#### (Memory Bound)

*"time to complete a given computational problem is decided primarily by the amount of memory required to hold data"* -- wikipedia

<img src="https://www.evernote.com/l/AUUzntxvU9BHWJMZSH_CL3S7YRUjThJTrPEB/image.png">

Source: http://www.slideshare.net/ManojitNandi/parallel-programming-in-python-speeding-up-your-analysis

# Processes & Threads

Each Python interpreter runs in a `process`, which contains the program code, stack, and its current activity.

In [4]:
import os
os.getpid()

66181

Within a process one can create a set of `threads`, which share everything with the process in which they were spawned (memory, data, state). Most generally, they are little programs (each with its own stack) that execute `concurrently` (independently of each other). Because they share things like memory, the programmer must "lock" any shared state that could be modified by more than one thread at a time. The way we make many threads in Python is with the `threading` module.
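As a minimal sketch of what "locking" shared state looks like, the example below has four threads increment one shared counter. The names `counter`, `counter_lock`, and `add_many` are hypothetical; the point is only that the read-modify-write in `counter += 1` is wrapped in a `threading.Lock`, so updates from different threads cannot interleave.

```python
import threading

counter = 0
counter_lock = threading.Lock()

def add_many(n):
    """Increment the shared counter n times, taking the lock for each update."""
    global counter
    for _ in range(n):
        with counter_lock:      # only one thread at a time may update
            counter += 1

threads = [threading.Thread(target=add_many, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 4 threads x 10,000 increments
```

Taking the lock on every increment is slow; in real code you would usually accumulate locally and lock once per thread.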

<div class="alert alert-info">The Global Interpreter Lock (GIL) in Python stops threads from truly running in parallel. That is, the interpreter can only execute one thread at a time. This is an implementation detail of how CPython was programmed. Many libraries you use push their work down into the C layer and "avoid the GIL". </div>

You can also make many processes, which are copies of the original parent process (memory, data, state) and act independently of each other. To share data between them you have to explicitly do that within each process. The Pythonic way we do multiprocessing (creation of new processes, communication between processes) is with `multiprocessing`.

The goal of computing with `threading` and `multiprocessing` is to not wait around: the CPU should not be idle if it doesn't have to be. And since we almost always have multiple cores, we should be able to let the work we want to do happen in parallel over those cores.

`You can start way more processes than the number of cores, but that doesn't mean they'll all be running simultaneously. E.g., if you have a 4-core machine and you run 4,000 processes, 3,996 processes will be hanging out in memory and the other four will be working. Once one process is done, that core will throw the process back up into memory and pull down a new one to work on, and so on until all are done.`
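To see how many cores your own machine reports, a quick check with the standard library is enough. This is just a sketch; note that `os.cpu_count()` reports logical CPUs and may return `None` if the count cannot be determined.

```python
import os

n_cores = os.cpu_count()   # logical CPUs; None if it cannot be determined
print(f"logical cores reported: {n_cores}")
```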

### Threading

`threading.Thread(target=f, args=(...))` is the basic way to use function `f` with arguments in a thread.

`.start()`: Starts the thread's activity by arranging for the thread object's `.run()` method to be invoked in a separate thread of control. This method will raise a `RuntimeError` if called more than once on the same thread object.
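A short sketch of the "only once" rule, using a hypothetical do-nothing target `noop`: the second call to `.start()` on the same object raises `RuntimeError`, so a finished thread has to be replaced with a new `Thread` object rather than restarted.

```python
import threading

def noop():
    pass

t = threading.Thread(target=noop)
t.start()
t.join()

try:
    t.start()            # a Thread object can only be started once
    restarted = True
except RuntimeError as err:
    restarted = False
    print("second start failed:", err)
```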

`Threading is only helpful when you're not CPU-bound. If a thread is waiting around (e.g. because it's waiting on a download from the internet or, as in the examples below, because it's blocked in time.sleep()), that thread will release the GIL and the CPU can be used by the next thread.`

In [5]:
import threading

def worker(num):
    """thread worker function"""
    print('Worker: %s' % num)
    return

threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()
threads

Worker: 0
Worker: 1Worker: 2

Worker: 3
Worker: 4


[<Thread(Thread-8, stopped 13196681216)>,
 <Thread(Thread-9, stopped 13213470720)>,
 <Thread(Thread-10, stopped 13196681216)>,
 <Thread(Thread-11, stopped 13196681216)>,
 <Thread(Thread-12, stopped 13196681216)>]

In [6]:
%%time
threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

Worker: 0
Worker: 1
Worker: 2
Worker: 3
Worker: 4CPU times: user 4.11 ms, sys: 3.69 ms, total: 7.8 ms

Wall time: 9.49 ms


Despite the GIL, threads won't get in each other's way if they are idle.

In [7]:
#%%time
import logging
import random
import time

root = logging.getLogger()
root.handlers = []
logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-9s) %(message)s',)

import threading

def worker(num):
    """thread worker function"""
    
    sleep_time = random.randint(1,5)
    logging.debug('worker: {0} sleeping for {1} s, name: {2}'
                   .format(num,sleep_time,threading.current_thread().name))
    time.sleep(sleep_time)
    logging.debug('done')
    return

threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

(Thread-18) worker: 0 sleeping for 3 s, name: Thread-18
(Thread-19) worker: 1 sleeping for 5 s, name: Thread-19
(Thread-20) worker: 2 sleeping for 3 s, name: Thread-20
(Thread-21) worker: 3 sleeping for 5 s, name: Thread-21
(Thread-22) worker: 4 sleeping for 3 s, name: Thread-22
(Thread-18) done
(Thread-20) done
(Thread-22) done
(Thread-19) done
(Thread-21) done


More on logging: https://docs.python.org/3/howto/logging-cookbook.html


`.join()`: Blocks the calling thread until the thread whose `.join()` method is called has finished.

In [8]:
t.is_alive()

False
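`.join()` also accepts an optional `timeout` in seconds. It always returns `None`, so the way to tell whether the wait timed out is to check `.is_alive()` afterwards. A minimal sketch, with a hypothetical `slow` function standing in for real work:

```python
import threading
import time

def slow():
    time.sleep(0.5)

t = threading.Thread(target=slow)
t.start()

t.join(timeout=0.05)            # give up waiting after 0.05 s; returns None
still_running = t.is_alive()    # True here means the join timed out

t.join()                        # no timeout: block until the thread finishes
finished = not t.is_alive()

print(still_running, finished)
```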

In [9]:
%%time
# not very parallel ... 
threads = []
for i in range(2):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()
    t.join() # this waits for the thread to finish

(Thread-23) worker: 0 sleeping for 5 s, name: Thread-23
(Thread-23) done
(Thread-24) worker: 1 sleeping for 4 s, name: Thread-24
(Thread-24) done


CPU times: user 14.9 ms, sys: 5.11 ms, total: 20 ms
Wall time: 9.02 s


In [11]:
%%time
threads = []
for i in range(2):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)

print("waiting around a bit, then starting threads",flush=True)
# time.sleep(2)

# you don't have to start threads immediately after creating them
for t in threads:
    t.start() 

for t in threads:
    t.join() # this waits for the thread to finish

print("I'm really done with all the threads.")

waiting around a bit, then starting threads


(Thread-27) worker: 0 sleeping for 4 s, name: Thread-27
(Thread-28) worker: 1 sleeping for 5 s, name: Thread-28
(Thread-27) done
(Thread-28) done


I'm really done with all the threads.
CPU times: user 15.6 ms, sys: 5.69 ms, total: 21.3 ms
Wall time: 5.01 s


A few things:

- `logging` is "thread-safe" -- so different threads can write to the log file without causing issues
- you can always get a handle to the current thread with `threading.current_thread()`

You can delay the start of the execution of a thread with `Timer`

```python
threading.Timer(interval, function, args=None, kwargs=None)
```

In [12]:
threads = []
for i in range(2):
    r = random.randint(1,5)
    t = threading.Timer(r, worker, args=(i,))
    threads.append(t)
    logging.debug(f"starting {t.name} with delay {r}")
    threads[-1].start()

(MainThread) starting Thread-29 with delay 2
(MainThread) starting Thread-30 with delay 3
(Thread-29) worker: 0 sleeping for 2 s, name: Thread-29
(Thread-30) worker: 1 sleeping for 4 s, name: Thread-30
(Thread-29) done
(Thread-30) done
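A `Timer` that has not fired yet can be stopped with `.cancel()`. The sketch below is an illustration only (the `calls` list is a hypothetical stand-in for real work): the timer is cancelled while it is still waiting, so its function never runs.

```python
import threading
import time

calls = []

# would call calls.append("fired") after 0.2 s
timer = threading.Timer(0.2, calls.append, args=("fired",))
timer.start()
timer.cancel()        # stop the timer while it is still waiting
timer.join()          # the timer thread exits once cancelled

time.sleep(0.3)       # wait past the original deadline to be sure
print("calls:", calls)
```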


You can share variables (safely) between threads with a `queue`:

In [13]:
from queue import Queue

q = Queue()

def worker2(num):
    sleep_time = random.randint(1,5)
    
    logging.debug('worker: {0} sleeping for {1} s, name: {2}'
                   .format(num,sleep_time,threading.current_thread().name))
    # do some real work
    time.sleep(sleep_time)

    if q.empty():
        q.put(sleep_time)
        logging.debug(f"initiated q = {sleep_time}")
    else:
        var = q.get()
        logging.debug(f"var {var}")
        q.put(sleep_time + var)
        logging.debug(f"added {sleep_time} to the q")
        
    logging.debug('done')
    return

threads = []
for i in range(2):
    t = threading.Thread(target=worker2, args=(i,))
    threads.append(t)
    t.start()

(Thread-31) worker: 0 sleeping for 4 s, name: Thread-31
(Thread-32) worker: 1 sleeping for 1 s, name: Thread-32
(Thread-32) initiated q = 1
(Thread-32) done
(Thread-31) var 1
(Thread-31) added 4 to the q
(Thread-31) done


In [14]:
q.get()

5
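One caveat about `worker2`: each `Queue` method is thread-safe on its own, but the sequence "check `q.empty()`, then `put` or `get`" is not atomic, so two threads finishing at the same moment could both find the queue empty and each put its own value. A simpler pattern, sketched here with a hypothetical `worker3`, is to let every worker put its own result and have the main thread combine them after all the workers have been joined.

```python
import threading
from queue import Queue

results = Queue()

def worker3(num):
    # stand-in for real work: each worker produces one number
    results.put(num * 10)

threads = [threading.Thread(target=worker3, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

total = 0
while not results.empty():      # safe here: all producers have finished
    total += results.get()

print(total)  # 0 + 10 + 20 + 30 + 40
```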

Threads can also signal each other with `Event`, and a threshold on the number of threads that must reach a given point before any of them continue can be set with `Barrier`. There are low-level primitives (pushed down to the UNIX \_pthreads level) called `locks` and `semaphores` that we'll not bother with here.
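A minimal sketch of both ideas, with hypothetical names (`ready`, `barrier`, `waiter`, `log`): the workers first block on an `Event` until the main thread sets it, then block on a `Barrier` until all three of them have arrived.

```python
import threading

ready = threading.Event()
barrier = threading.Barrier(3)     # releases once 3 threads are waiting on it
log = []                           # list.append is thread-safe in CPython

def waiter(num):
    ready.wait()                   # block until the main thread calls ready.set()
    log.append(f"worker {num} released")
    barrier.wait()                 # block until all 3 workers have arrived
    log.append(f"worker {num} passed barrier")

threads = [threading.Thread(target=waiter, args=(i,)) for i in range(3)]
for t in threads:
    t.start()

log.append("main: setting event")  # the workers are still blocked at this point
ready.set()
for t in threads:
    t.join()

print("\n".join(log))
```

The order of the workers within each group can vary from run to run, but every "released" line appears before any "passed barrier" line.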

Threading can be done with objects. You can subclass `threading.Thread` and create your own threads that know how to run.

In [15]:
os.popen("ping -q -c2 google.com","r").readlines()

['PING google.com (142.250.191.78): 56 data bytes\n',
 '\n',
 '--- google.com ping statistics ---\n',
 '2 packets transmitted, 2 packets received, 0.0% packet loss\n',
 'round-trip min/avg/max/stddev = 7.411/7.483/7.556/0.073 ms\n']

In [23]:
# adapted from http://www.python-course.eu/threads.php
import os, re, threading

# match the count in "N packets received" (macOS) or "N received" (Linux)
received_packages = re.compile(r"(\d+)\s+(?:packets\s+)?received")


class ip_check(threading.Thread):
  
    def __init__ (self, ip):
        threading.Thread.__init__(self)
        self.ip = ip
        self._successful_pings = -1
   
    def run(self):
        # -q: quiet (summary only), -c2: send two packets
        ping_out = os.popen("ping -q -c2 " + self.ip, "r")
        lines = ping_out.readlines()
        ping_out.close()
        # the packet summary is on the fourth line of the output
        if len(lines) >= 4:
            n_received = re.findall(received_packages, lines[3])
            if n_received:
                self._successful_pings = int(n_received[0])

    def status(self):
        if self._successful_pings == 0:
             return "has no response"
        elif self._successful_pings == 1:
             return "is alive, but 50 % packet loss"
        elif self._successful_pings == 2:
             return "is alive"
        else:
             return "not reachable"

check_results = []
for ip in ["google.com", "slashdot.com", "berkeley.edu", "usa-curling.gov"]:
    current = ip_check(ip)
    check_results.append(current)
    current.start()

for el in check_results:
    el.join()
    print("Status of", el.ip,el.status())

ping: cannot resolve usa-curling.gov: Unknown host


Status of google.com is alive
Status of slashdot.com is alive
Status of berkeley.edu is alive
Status of usa-curling.gov not reachable


In [24]:
check_results[0]._successful_pings

2

### Breakout

Using threading, grab the titles of 10 random Wikipedia pages using https://en.wikipedia.org/wiki/Special:Random. Count the total number of characters returned over all 10 pages.

Hint: Use `requests` and `BeautifulSoup` to get and parse the data:

```python
import requests
from bs4 import BeautifulSoup
...
resp = requests.get(url).text
title = BeautifulSoup(resp, 'html.parser').title.string
```

In [50]:
%%time
import requests
from bs4 import BeautifulSoup
import logging
import random
import time
import threading
import numpy as np

root = logging.getLogger()
root.handlers = []
logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-9s) %(message)s',)


url = 'https://en.wikipedia.org/wiki/Special:Random'
lengths = []
def worker():
    """thread worker function"""
    
    resp = requests.get(url).text
    title = BeautifulSoup(resp, 'html.parser').title.string.split('- Wikipedia')[0]
    print(title)
    lengths.append(len(title))

threads = []
for i in range(10):
    t = threading.Thread(target=worker)
    threads.append(t)
    t.start()
    
for t in threads:
    t.join()
    
# time.sleep(1)
    
    
cumlength = np.sum(np.array(lengths))
print(f'Total lengths = {cumlength}')

(Thread-177) Starting new HTTPS connection (1): en.wikipedia.org:443
(Thread-178) Starting new HTTPS connection (1): en.wikipedia.org:443
(Thread-176) Starting new HTTPS connection (1): en.wikipedia.org:443
(Thread-179) Starting new HTTPS connection (1): en.wikipedia.org:443
(Thread-180) Starting new HTTPS connection (1): en.wikipedia.org:443
(Thread-185) Starting new HTTPS connection (1): en.wikipedia.org:443
(Thread-184) Starting new HTTPS connection (1): en.wikipedia.org:443
(Thread-181) Starting new HTTPS connection (1): en.wikipedia.org:443
(Thread-182) Starting new HTTPS connection (1): en.wikipedia.org:443
(Thread-183) Starting new HTTPS connection (1): en.wikipedia.org:443
(Thread-177) https://en.wikipedia.org:443 "GET /wiki/Special:Random HTTP/1.1" 302 0
(Thread-176) https://en.wikipedia.org:443 "GET /wiki/Special:Random HTTP/1.1" 302 0
(Thread-179) https://en.wikipedia.org:443 "GET /wiki/Special:Random HTTP/1.1" 302 0
(Thread-180) https://en.wikipedia.org:443 "GET /wiki/Speci

Aliculastrum cylindricum 


(Thread-185) https://en.wikipedia.org:443 "GET /wiki/Ripley_County_Courthouse_(Missouri) HTTP/1.1" 200 None
(Thread-182) https://en.wikipedia.org:443 "GET /wiki/Edward_Clarke_Cabot HTTP/1.1" 200 None


1961 Windward Islands Tournament Sasha (German singer) discography 

Zhuchengtitan 
Video-based reflection 
Ripley County Courthouse (Missouri) 
Edward Clarke Cabot 


(Thread-184) https://en.wikipedia.org:443 "GET /wiki/Brian_Law HTTP/1.1" 200 None
(Thread-179) https://en.wikipedia.org:443 "GET /wiki/Sebasti%C3%A1n_Silva_(entertainer) HTTP/1.1" 200 None
(Thread-181) https://en.wikipedia.org:443 "GET /wiki/Chminianske_Jakubovany HTTP/1.1" 200 None


Brian Law 
Sebastián Silva (entertainer) 
Chminianske Jakubovany 
Total lengths = 248
CPU times: user 558 ms, sys: 65 ms, total: 623 ms
Wall time: 1.22 s


Aside: for asynchronous I/O tasks you might consider using an event loop. Use the built-in `asyncio` and, for gathering webpages, `aiohttp` (http://aiohttp.readthedocs.io/en/stable/).

In [None]:
#!pip install aiohttp

In [None]:
%%writefile async_fetch.py
import asyncio
from aiohttp import ClientSession
from bs4 import BeautifulSoup

async def fetch(url, session):
    async with session.get(url) as response:
        return await response.read()

async def run(r):
    url = "https://en.wikipedia.org/wiki/Special:Random"

    # Fetch all responses within one client session,
    # keeping the connection alive for all requests.
    async with ClientSession() as session:
        # wrap each coroutine in a task so the requests run concurrently
        tasks = [asyncio.ensure_future(fetch(url, session)) for _ in range(r)]
        responses = await asyncio.gather(*tasks)
        # all response bodies are now in `responses`

    for resp in responses:
        print("title=", BeautifulSoup(resp, 'html.parser')
              .title.string.split("- Wikipedia")[0], "len=", len(resp))

# asyncio.run creates an event loop, runs the coroutine, and closes the loop
asyncio.run(run(4))

In [None]:
!python async_fetch.py

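The code in this script will not work if run directly in a Jupyter notebook cell, because the notebook is already running its own event loop! That is why it is written to a file and run as a separate process.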
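If you want to see the event-loop pattern without any network access or third-party packages, here is a minimal sketch that uses only `asyncio`. The names `fake_fetch` and `main` are hypothetical, and `asyncio.sleep` stands in for the network wait. Like `async_fetch.py`, it is meant to be run as a script rather than in a notebook cell.

```python
import asyncio
import time

async def fake_fetch(i):
    # stand-in for a network request; awaiting yields control to the event loop
    await asyncio.sleep(0.2)
    return f"page {i}"

async def main(n):
    # schedule all coroutines at once and wait for every result
    return await asyncio.gather(*(fake_fetch(i) for i in range(n)))

start = time.perf_counter()
pages = asyncio.run(main(4))
elapsed = time.perf_counter() - start

print(pages, f"{elapsed:.2f}s")
```

All four waits overlap on a single thread, so the total time should be close to one wait rather than four.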

In [None]:
asyncio.get_event_loop()