# 17. Concurrent Execution

In [None]:
'''
Concurrent execution of code:
    The appropriate choice of tool will depend on the task to be executed
    and on the preferred style of development:
        CPU bound vs. I/O bound
        event-driven cooperative multitasking vs. preemptive multitasking
'''

# 17.1 threading --- Thread-based parallelism

In [24]:
'''
    threading.active_count(): Return the number of Thread objects currently alive.
    threading.current_thread(): Return the current Thread object (the caller's thread of control).
    threading.get_ident(): Return the "thread identifier" of the current thread (a nonzero integer).
    threading.main_thread(): Return the main Thread object, i.e. the thread from which the Python interpreter was started.
    threading.settrace(func): Set a trace function for all threads started from the threading module.
    threading.setprofile(func): Set a profile function for all threads started from the threading module;
        it is passed to sys.setprofile() for each thread before its run() method is called.
    threading.stack_size([size]): Return the thread stack size used when creating new threads (0 = platform default).
    threading.TIMEOUT_MAX: The maximum value allowed for the timeout parameter of blocking functions
        (Lock.acquire(), RLock.acquire(), Condition.wait(), ...).
'''
import threading
from threading import Thread


def info(*_trace_args):
    """Print a greeting.

    Any positional arguments are accepted and ignored. This lets the same
    function be installed with ``threading.settrace(info)``: a trace function
    is called with ``(frame, event, arg)``, and a zero-argument version fails
    with ``TypeError: info() takes 0 positional arguments but 3 were given``.
    Returning None means no per-frame (local) trace function is installed.
    """
    print('Hello everybody')


#threading.settrace(info)

if __name__ == '__main__':
    # Number of Thread objects currently alive. Inside a Jupyter kernel this
    # includes the kernel's own helper threads, not just the main thread.
    print(threading.active_count())
    # Print every Thread object that is currently alive.
    for thread in threading.enumerate():
        print(thread)
    print(threading.current_thread())
    print('Return the "thread identifier : {}" of the current thread'.format(threading.get_ident()))
    print('the main Thread Object:{}'.format(threading.main_thread()))
    print(threading.stack_size())
    print(threading.TIMEOUT_MAX)

    # Thread-local data: attributes stored on a threading.local() instance
    # hold values that are specific to the thread that set them.
    mydata = threading.local()
    mydata.x = 1
    print(mydata.x)
    '''
    Main class for threads:
        The Thread class represents an activity that is run in a separate thread of control.
        There are two ways to specify the activity:
            1. pass a callable object to the constructor (target=...)
            2. subclass Thread and override the run() method (and __init__() if needed)
        Once a thread object is created, its activity must be started by calling the thread's start() method.
        The is_alive() method tests whether the thread is alive.
        The main thread is not a daemon thread, and therefore all threads created in the main thread default to daemon = False.
        In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once.

        If you want your application to make better use of the computational resources of multi-core machines,
        you are advised to use multiprocessing or concurrent.futures.ProcessPoolExecutor.

        However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.

    Lock objects:
        A primitive lock is in one of two states: locked or unlocked.
        Two basic methods: acquire() and release().
        Locks also support the context management protocol (with lock: ...).
        Lock.release() can be called from any thread, not only the thread which acquired the lock.

    RLock objects (reentrant locks):
        May be acquired several times by the same thread. The owning thread must call release()
        once per acquire(), and only the owning thread may release it.

    Condition objects:
        For example, the following code is a generic producer-consumer situation with unlimited buffer capacity.

        # Consume one item
        with cv:
            while not an_item_is_available():
                # The loop is required: wait() can return after an arbitrarily long time,
                # and the condition that prompted notify() may no longer hold.
                cv.wait()
            get_an_available_item()

        # Equivalent consumer using wait_for(), which performs that loop itself
        with cv:
            cv.wait_for(an_item_is_available)
            get_an_available_item()

        # Produce one item
        with cv:
            make_an_item_available()
            cv.notify()

    Semaphore objects:
        A semaphore manages an internal counter which is decremented by each acquire() call and
        incremented by each release() call. The counter can never go below zero; when acquire()
        finds it zero, it blocks until another thread calls release().
        Semaphores are often used to guard resources with limited capacity.

    Event objects:
        An event object manages an internal flag that can be set to true with the set() method and
        reset to false with the clear() method. The wait() method blocks until the flag is true.

    Timer objects:
        Timers are started, as with threads, by calling their start() method. The timer can be
        stopped (before its action has begun) by calling the cancel() method.

    Barrier objects:
        This class provides a simple synchronization primitive for use by a fixed number of threads
        that need to wait for each other.
        Here is a simple way to synchronize a client and server thread (server() / client() below).
    '''
    # Timer example (disabled): run info() once, 2 seconds after start().
    # t = threading.Timer(2.0, info)
    # t.start()

    # Barrier for two parties (server and client). If the other party does not
    # arrive within 5 seconds, wait() breaks the barrier and raises
    # threading.BrokenBarrierError.
    b = threading.Barrier(2, timeout=5)

    # Illustrative sketch only: start_server(), accept_connection(),
    # process_server_connection(), make_connection() and
    # process_client_connection() are placeholders that are not defined in this
    # notebook. server() and client() are never started here; each would need
    # its own Thread.
    def server():
        start_server()
        b.wait()
        while True:
            connection = accept_connection()
            process_server_connection(connection)

    def client():
        b.wait()
        while True:
            connection = make_connection()
            process_client_connection(connection)
    

5
<_MainThread(MainThread, started 140653582182144)>
<Thread(Thread-2, started daemon 140653444228864)>
<Heartbeat(Thread-3, started daemon 140653435836160)>
<ParentPollerUnix(Thread-1, started daemon 140653065402112)>
<HistorySavingThread(IPythonHistorySavingThread, started 140653410395904)>
<_MainThread(MainThread, started 140653582182144)>
Return the "thread identifier : 140653582182144" of the current thread
the main Thread Object:<_MainThread(MainThread, started 140653582182144)>
0
9223372036.0
1


Exception in thread Thread-8:
Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.5/threading.py", line 1177, in run
    def run(self):
TypeError: info() takes 0 positional arguments but 3 were given



# 17.2. Multiprocessing -- Process-based parallelism

In [9]:
'''
multiprocessing offers both local and remote concurrency. Because it uses
subprocesses instead of threads, it effectively side-steps the Global
Interpreter Lock.
'''
from multiprocessing import Pool


def f(x):
    """Return x multiplied by itself."""
    squared = x * x
    return squared


if __name__ == '__main__':
    # Data parallelism: spread the calls f(1), f(2), f(3) over a pool of four
    # worker processes; map() returns the results in input order.
    with Pool(processes=4) as workers:
        squares = workers.map(f, [1, 2, 3])
        print(squares)

[1, 4, 9]


In [10]:
from multiprocessing import Process
import os


def info(title):
    """Print *title*, then the module name, parent PID and PID of the calling process."""
    print(title)
    details = (
        ('module name:', __name__),
        ('parent process:', os.getppid()),
        ('process id:', os.getpid()),
    )
    for label, value in details:
        print(label, value)


def f(name):
    """Child-process target: report process details, then greet *name*."""
    info('function f')
    print('hello', name)


if __name__ == '__main__':
    # Report from the parent, then from a child, so the IDs can be compared:
    # the child's "parent process" is the parent's "process id".
    info('main line')
    child = Process(target=f, args=('bob',))
    child.start()
    child.join()

main line
module name: __main__
parent process: 69234
process id: 69609
function f
module name: __main__
parent process: 69609
process id: 71853
hello bob


In [None]:
# Contexts and start methods
'''
Depending on the platform, multiprocessing supports three ways to start a process.
    spawn: [available on POSIX and Windows; the default on Windows and macOS]
        -- The parent process starts a fresh Python interpreter process. Starting a process
           using this method is rather slow compared to using fork or forkserver.
    fork: [available on POSIX only; was the default on POSIX other than macOS up to Python 3.13]
        -- The parent process uses os.fork() to fork the Python interpreter. All resources of
           the parent are inherited by the child process. Forking a multithreaded process is problematic.
    forkserver: [available on POSIX platforms which support passing file descriptors over Unix pipes;
                 the default on POSIX other than macOS since Python 3.14]
        -- A server process is started. Whenever a new process is needed, the parent asks the
           server to fork one. The fork server process is single threaded, so it is safe for it
           to use os.fork().
'''
import multiprocessing as mp


def foo(q):
    """Put the string 'hello' on queue *q*; used as the child-process target."""
    q.put('hello')


if __name__ == '__main__':
    # get_context() returns an object with the same API as the multiprocessing
    # module, bound to one start method, so there is no need to call the global
    # set_start_method(). 'fork' exists only on POSIX; on Windows this call
    # raises ValueError.
    ctx = mp.get_context('fork')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    # Read before join(): joining a child that still has buffered items
    # queued for the parent can deadlock.
    print(q.get())
    p.join()


In [None]:
'''
Exchanging objects between processes
    Queue: a FIFO that is safe to share between threads and processes.
    Pipe:  returns a pair of Connection objects joined by a pipe.
'''
from multiprocessing import Process, Queue, Pipe


def f(q):
    """Put a sample list on queue *q*."""
    payload = [42, None, 'hello']
    q.put(payload)


def f2(conn):
    """Send a sample list through Connection *conn*, then close that end."""
    payload = [42, None, 'hello']
    conn.send(payload)
    conn.close()


def _run_child(target, channel, read):
    """Start target(channel) in a child process, print read()'s result, then join the child."""
    worker = Process(target=target, args=(channel,))
    worker.start()
    # Receive before joining, so the child is never left blocked on a full channel.
    print(read())
    worker.join()
    return worker


if __name__ == '__main__':
    q = Queue()
    p = _run_child(f, q, q.get)

    # Each Connection object returned by Pipe() has send() and recv() methods.
    parent_conn, child_conn = Pipe()
    p = _run_child(f2, child_conn, parent_conn.recv)
'''
Note that data in a pipe may become corrupted if two processes(or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
'''
    

In [None]:
# Synchronization between processes
from multiprocessing import Process, Lock


def f(l, i):
    """Print 'hello world <i>' while holding lock *l*.

    Holding the lock stops lines printed by different processes from
    getting mixed up.
    """
    # `with` acquires the lock and guarantees its release even if print()
    # raises, which is the same as the acquire()/try/finally/release() pattern.
    with l:
        print('hello world', i)


if __name__ == '__main__':
    lock = Lock()

    processes = [Process(target=f, args=(lock, num)) for num in range(10)]
    for proc in processes:
        proc.start()
    # Wait for every child. Otherwise the cell returns while they are still
    # running, their output can show up after later cells have started, and
    # finished children are not reaped.
    for proc in processes:
        proc.join()

In [None]:
'''
Sharing state between processes
    Data can be stored in a shared memory map using Value or Array.
'''
from multiprocessing import Process, Value, Array


def f(n, a):
    """Set n.value to 3.1415927 and negate every element of *a* in place."""
    n.value = 3.1415927
    for idx, item in enumerate(a):
        a[idx] = -item


if __name__ == '__main__':
    # 'd' and 'i' are array-module typecodes: double-precision float and
    # signed int. These shared objects are process- and thread-safe.
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    worker = Process(target=f, args=(num, arr))
    worker.start()
    worker.join()

    print(num.value)
    print(arr[:])

In [None]:
# Server process
from multiprocessing import Process, Manager
# A Manager keeps Python objects in a server process, and other processes use
# them through proxies. More flexible than shared memory, but slower.


def f(d, l):
    """Mutate the (proxied) dict *d* and reverse the (proxied) list *l* in place."""
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()


if __name__ == '__main__':
    # A manager returned by Manager() supports the types list, dict, Namespace,
    # Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier,
    # Queue, Value and Array.
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        # Pass the list proxy `l` (lowercase letter L), not the digit 1.
        # With the int 1, the child fails on `1.reverse()` and the list is
        # never reversed.
        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

In [None]:
from multiprocessing import Pool, TimeoutError
import time
import os


def f(x):
    """Return x multiplied by itself."""
    product = x * x
    return product


if __name__ == '__main__':
    # Four worker processes. Leaving the `with` block terminates the pool.
    with Pool(processes=4) as pool:

        # map() keeps input order: prints "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # imap_unordered() yields each result as soon as it is ready (arbitrary order)
        for square in pool.imap_unordered(f, range(10)):
            print(square)

        # apply_async() runs a single call in exactly one worker
        pending = pool.apply_async(f, (20,))
        print(pending.get(timeout=1))             # 400

        pending = pool.apply_async(os.getpid, ())
        print(pending.get(timeout=1))             # PID of whichever worker ran it

        # several independent async calls *may* land on different workers
        pid_results = [pool.apply_async(os.getpid, ()) for _ in range(4)]
        print([pending.get(timeout=1) for pending in pid_results])

        # one worker sleeps for 10 s; get() with a 1 s timeout gives up early
        pending = pool.apply_async(time.sleep, (10,))
        try:
            print(pending.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # the pool was stopped when the 'with' block exited
    print("Now the pool is closed and no longer available")

In [None]:
import multiprocessing, time, signal

if __name__ == '__main__':
    # A child whose only job is to sleep, so it is still running when terminated.
    p = multiprocessing.Process(target=time.sleep, args=(1000,))
    print(p, p.is_alive())   # not started yet
    p.start()
    print(p, p.is_alive())   # running
    p.terminate()            # SIGTERM on POSIX, TerminateProcess() on Windows
    # Wait for the child to actually exit rather than sleeping a fixed 0.1 s.
    # A fixed sleep races with process teardown: on a busy machine is_alive()
    # could still be True and exitcode still None. The timeout keeps the cell
    # from hanging forever if the child somehow does not exit.
    p.join(timeout=5)
    print(p, p.is_alive())
    # A child killed by signal N reports exitcode -N.
    print(p.exitcode == -signal.SIGTERM)