In [1]:
import threading

In [2]:
import time

In [3]:
def countdown(n):
    for i in range(n):
        print(n - i - 1, "left")
        time.sleep(1)

In [4]:
t = threading.Thread(target=countdown, args=(5, ))

In [5]:
t.start()

4 left
3 left
2 left
1 left
0 left


In [7]:
class CountdownThread(threading.Thread):
    
    def __init__(self, n):
        super().__init__()
        self.n = n
        
    def run(self):  # called by method start
        for i in range(self.n):
            print(self.n - i - 1, "left")
            time.sleep(1)

In [8]:
t = CountdownThread(3)

In [9]:
t.start()

2 left
1 left
0 left


Use class (no inheritance from `Thread`) with call method to run it as function in `threading.Thread` and keep its states within

In [10]:
threading.Thread().name

'Thread-6'

In [11]:
threading.Thread(name="NumberCruncher").name

'NumberCruncher'

In [12]:
t = threading.Thread()

In [13]:
t.start()

In [14]:
t.ident

140618249824000

In [15]:
threading.enumerate()

[<_MainThread(MainThread, started 140618361730880)>,
 <Thread(Thread-2, started daemon 140618300704512)>,
 <Heartbeat(Thread-3, started daemon 140618292311808)>,
 <HistorySavingThread(IPythonHistorySavingThread, started 140618266871552)>,
 <ParentPollerUnix(Thread-1, started daemon 140618258216704)>]

In [16]:
t = threading.Thread(target=time.sleep, args=(5, ))

In [17]:
t.start()

In [18]:
t.join()  # blocked for 5 seconds

Calling thread (main thread) is blocked until `t` is completed

In [19]:
t.join()  # executed the same moment

In [20]:
t = threading.Thread(target=time.sleep, args=(5, ))

In [21]:
t.start()

In [22]:
t.is_alive()

True

In [23]:
t.is_alive()  # after 5 seconds

True

In [24]:
threading.enumerate()

[<_MainThread(MainThread, started 140618361730880)>,
 <Thread(Thread-2, started daemon 140618300704512)>,
 <Heartbeat(Thread-3, started daemon 140618292311808)>,
 <HistorySavingThread(IPythonHistorySavingThread, started 140618266871552)>,
 <ParentPollerUnix(Thread-1, started daemon 140618258216704)>,
 <Thread(Thread-9, started 140618249824000)>]

In [25]:
t = threading.Thread(target=time.sleep, args=(5, ), daemon=True)

In [26]:
t.start()

In [27]:
t.daemon

True

In [28]:
threading.enumerate()

[<_MainThread(MainThread, started 140618361730880)>,
 <Thread(Thread-2, started daemon 140618300704512)>,
 <Heartbeat(Thread-3, started daemon 140618292311808)>,
 <HistorySavingThread(IPythonHistorySavingThread, started 140618266871552)>,
 <ParentPollerUnix(Thread-1, started daemon 140618258216704)>,
 <Thread(Thread-10, started daemon 140617766139648)>]

Flags are used to finalize threads:

In [29]:
class Task:
    
    def __init__(self):
        self._running = True
        
    def terminate(self):
        self._running = False
        
    def run(self, n):
        while self._running:
            pass

In [30]:
from threading import Lock


class SharedCounter:
    
    def __init__(self, value):
        self._value = value
        self._lock = Lock()
        
    def increment(self, delta=1):
        self._lock.acquire()
        self._value += delta
        self._lock.release()
        
    def get(self):
        return self._value

In [31]:
done = Lock()
def idle_release():
    print("Running!")
    time.sleep(15)
    done.release()

In [32]:
done.acquire()

True

In [33]:
threading.Thread(target=idle_release).start()

Running!


In [34]:
done.acquire() and print("WAT?")

WAT?


In [35]:
from collections import deque
from threading import Condition

In [36]:
q = deque()
is_empty = Condition()

In [37]:
def producer():
    while True:
        is_empty.acquire()
        q.append(...)
        is_empty.notify()
        is_empty.release()

In [38]:
def consumer():
    while True:
        is_empty.acquire()
        while not q:
            is_empty.wait()
        _ = q.popleft()
        is_empty.release()

Function follow reads messages from connection in args and puts them in queue for processing:

In [39]:
def follow(connection, connection_lock, q):
    try:
        while True:
            connection_lock.acquire()
            message = connection.read_message()
            connection_lock.release()
            q.put(message)
    except InvalidMessage:
        follow(connection, connection_lock, q)

In [40]:
follower = threading.Thread(target=follow, args=(25, 25, 25))
follower.start()

Exception in thread Thread-12:
Traceback (most recent call last):
  File "<ipython-input-39-787a34223687>", line 4, in follow
    connection_lock.acquire()
AttributeError: 'int' object has no attribute 'acquire'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/leo/Conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/home/leo/Conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-39-787a34223687>", line 8, in follow
    except InvalidMessage:
NameError: name 'InvalidMessage' is not defined



This function is indalid. Better to use context managers:

In [41]:
def follow(connect, connection_lock, q):
    try:
        while True:
            with connection_lock:
                message = connection.read_message()
            q.put(message)
    except IOError:
        follow(connection, connection_lock, q)

Example of working with queue:

In [42]:
def worker(q):
    while True:
        item = q.get()  # awaits next element blockingly
        do_something(item)  
        q.task_done()  # notifies queue of finishing the task
        
def master(q):
    for item in source():
        q.put(item)
    
    # blockingly awaits until all elements are processed
    q.join()

In [43]:
from concurrent.futures import ThreadPoolExecutor

In [44]:
executor = ThreadPoolExecutor(max_workers=4)

In [45]:
executor.submit(print, "Hello, world!")

Hello, world!


<Future at 0x7fe43c4b7750 state=finished returned NoneType>

In [46]:
list(executor.map(print, ["Knock?", "Knock!"]))

Knock?
Knock!


[None, None]

In [47]:
executor.shutdown()

In [48]:
with ThreadPoolExecutor(max_workers=4) as executor:
    f = executor.submit(sorted, [4, 3, 2, 1])

In [49]:
f.running(), f.done(), f.cancelled()

(False, True, False)

In [50]:
print(f.result())

[1, 2, 3, 4]


In [51]:
print(f.exception())

None


In [52]:
import math

In [53]:
def integrate(f, a, b, *, n_iter=1000):
    acc = 0
    step = (b - a) / n_iter
    for i in range(n_iter):
        acc += f(a + i * step) * step
    return acc

In [54]:
integrate(math.cos, 0, math.pi / 2)

1.0007851925466296

In [55]:
integrate(math.sin, 0, math.pi)

1.9999983550656637

In [56]:
from functools import partial

In [57]:
def integrate_async(f, a, b, *, n_jobs, n_iter=1000):
    with ThreadPoolExecutor(max_workers=n_jobs) as executor:
        spawn = partial(executor.submit, integrate, f, n_iter=n_iter)
        step = (b - a) / n_jobs
        fs = [spawn(a + i * step, a + (i + 1) * step) for i in range(n_jobs)]
        return sum(f.result() for f in fs)

In [58]:
integrate_async(math.cos, 0, math.pi / 2, n_jobs=16)

1.0000490865820222

In [59]:
integrate_async(math.sin, 0, math.pi, n_jobs=2)

1.9999995887664657

In [60]:
%%timeit
integrate(math.cos, 0, math.pi / 2, n_iter=10**6)

110 ms ± 3.18 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [61]:
%%timeit
integrate_async(math.cos, 0, math.pi / 2, n_iter=10**6, n_jobs=16)

3.57 s ± 66.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


`as_completed` needed for `fs` in `integrate_async`

In [62]:
from urllib.request import urlretrieve

In [64]:
with ThreadPoolExecutor(max_workers=4) as executor:
    with open("urls.txt", "r") as handle:
        for url in handle:
            executor.submit(urlretrieve, url)

In [65]:
import asyncio

In [66]:
async def echo(source, target):
    while True:
        line = await source.readline()
        if not line:
            break
        target.write(line)

In [67]:
loop = asyncio.get_event_loop()
server = asyncio.start_server(echo, port=8080)
loop.create_task(server)

<Task pending coro=<start_server() running at /home/leo/Conda/lib/python3.7/asyncio/streams.py:82>>

In [68]:
try:
    loop.run_forever()
finally:
    server.close()
    loop.close()

RuntimeError: Cannot close a running event loop

In [69]:
%load_ext cython

In [71]:
%%cython
from libc.math cimport cos
def integrate(f, double a, double b, long n_iter):
    cdef double acc = 0
    cdef double step = (b - a) / n_iter
    cdef long i
    with nogil:
        for i in range(n_iter):
            acc += cos(a + i * step) * step
    return acc

In [72]:
%%timeit 
integrate_async(math.cos, 0, math.pi / 2, n_iter=10**6, n_jobs=16)

19 ms ± 165 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [73]:
import multiprocessing as mp

In [74]:
p = mp.Process(target=countdown, args=(5, ))

In [75]:
p.start()

4 left
3 left
2 left
1 left
0 left


In [76]:
p.name, p.pid

('Process-1', 72103)

In [77]:
p.daemon

False

In [78]:
p.join()

In [79]:
p.exitcode

0

In [83]:
def ponger(conn):
    conn.send("pong")

In [84]:
parent_conn, child_conn = mp.Pipe()

In [85]:
p = mp.Process(target=ponger, args=(child_conn))

TypeError: 'Connection' object is not iterable

In [86]:
p.start()

AssertionError: cannot start a process twice

In [87]:
from concurrent.futures import ProcessPoolExecutor

In [88]:
def integrate_async(f, a, b, *, n_jobs, n_iter=1000):
    executor = ProcessPoolExecutor(max_workers=n_jobs)
    spawn = partial(executor.submit, integrate, f, n_iter=n_iter)
    step = (b - a) / n_jobs
    fs = [spawn(a + i * step, a + (i + 1) * step) for i in range(n_jobs)]
    return sum(f.result() for f in as_completed(fs))

In [89]:
from joblib import Parallel, delayed

In [92]:
def integrate_async(f, a, b, *, n_jobs, n_iter=1000, backend=None):
    step = (b - a) / n_jobs
    with Parallel(n_jobs=n_jobs, backend=backend) as parallel:
        fs = (delayed(integrate)(f, a + i * step, a + (i + 1) * step, n_iter=n_iter) for i in range(n_jobs))
        return sum(parallel(fs))

In [94]:
%%timeit
integrate_async(math.cos, 0, math.pi / 2, n_iter=10**6, n_jobs=16, backend="threading")

103 ms ± 135 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [95]:
%%timeit
integrate_async(math.cos, 0, math.pi / 2, n_iter=10**6, n_jobs=16, backend="multiprocessing")

219 ms ± 3.51 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
