In [22]:
import time
import os
from multiprocessing import Process, Pool, Pipe, Queue, cpu_count
import concurrent.futures
import threading

In [2]:
print(f'Number of CPU cores: {cpu_count()}')

Number of CPU cores: 16


# Multi-Processing

In [3]:
def long_time_task():
    print(f'Current child process: {os.getpid()}')
    time.sleep(2)
    print(f'Result: {8 ** 20}')

In [4]:
print(f'Current mother process: {os.getpid()}')
start = time.perf_counter()
for i in range(2):
    long_time_task()
end = time.perf_counter()
print(f'Time consumed: {end - start} seconds')

Current mother process: 30799
Current child process: 30799
Result: 1152921504606846976
Current child process: 30799
Result: 1152921504606846976
Time consumed: 4.0084696829999995 seconds


In [34]:
def long_time_task(i):
    print(f'Child process: {os.getpid()} - Task: {i}')
    time.sleep(i)
    print(f'Result: {i}')

In [35]:
print(f'Current mother process: {os.getpid()}')
start = time.perf_counter()
p1 = Process(target=long_time_task, args=(3, ))
p2 = Process(target=long_time_task, args=(1, ))
print('Wait for all child processes to be finished...')
p1.start()
p2.start()
p1.join()
p2.join()
end = time.perf_counter()
print(f'Time consumed: {end - start} seconds')

Current mother process: 30799
Wait for all child processes to be finished...
Child process: 43682 - Task: 3
Child process: 43683 - Task: 1
Result: 1
Result: 3
Time consumed: 3.0165687019998586 seconds


In [7]:
# Use Pool
print(f'Current mother process: {os.getpid()}')
start = time.perf_counter()
p = Pool(4)
for i in range(5):
    p.apply_async(long_time_task, args=(i, ))
print("Wait for all child processes to be finished...")
p.close()
p.join()
end = time.perf_counter()
print(f'Time consumed: {end - start} seconds')

Current mother process: 30799
Child process: 30811 - Task: 0
Child process: 30812 - Task: 1
Child process: 30813 - Task: 2
Child process: 30814 - Task: 3
Wait for all child processes to be finished...
Result: 1152921504606846976
Result: 1152921504606846976
Result: 1152921504606846976
Result: 1152921504606846976
Child process: 30812 - Task: 4
Result: 1152921504606846976
Time consumed: 4.041476960999999 seconds


In [8]:
# Data transfer between processes using Pipe
def receiver(x, pipe):
    _out_pipe, _in_pipe = pipe
    # Close in, only use out
    _in_pipe.close()
    while True:
        try:
            msg = _out_pipe.recv()
            print(msg + x)
        except EOFError:
            print('No input')
            break

In [9]:
out_pipe, in_pipe = Pipe()
child_p = Process(target=receiver, args=(100, (out_pipe, in_pipe)))
child_p.start()
out_pipe.close()
for i in range(10):
    in_pipe.send(i)
in_pipe.close()
child_p.join()
print('Main process ends')

100
101
102
103
104
105
106
107
108
109
No input
Main process ends


In [10]:
# Data transfer between processes using Queue
def write(q):
    print(f'Process to write: {os.getpid()}')
    for value in ['A', 'B', 'C']:
        print(f'Put {value} into queue...')
        q.put(value)
        time.sleep(2)

def read(q):
    print(f'Process to read: {os.getpid()}')
    while True:
        value = q.get()
        print(f'Get {value} from queue')

In [11]:
q = Queue()
pw = Process(target=write, args=(q, ))
pr = Process(target=read, args=(q, ))
pw.start()
pr.start()
pw.join()
pr.terminate()
print('Write and read processes finished...')

Process to write: 30818
Put A into queue...
Process to read: 30819
Get A from queue
Put B into queue...
Get B from queue
Put C into queue...
Get C from queue
Write and read processes finished...


In [12]:
def func(msg):
    print(f'msg: {msg}')
    time.sleep(1)
    print("*" * 10)
    return f'func_returns: {msg}'

In [13]:
pool = Pool(4)
results = []
for i in range(10):
    msg = f'Hello World! {i}'
    result = pool.apply_async(func, (msg, ))
    results.append(result)
pool.close()
pool.join()
print('All finished')

msg: Hello World! 0
msg: Hello World! 2
msg: Hello World! 1
msg: Hello World! 3
**********
**********
**********
**********
msg: Hello World! 4
msg: Hello World! 5
msg: Hello World! 6
msg: Hello World! 7
**********
**********
**********
**********
msg: Hello World! 8
msg: Hello World! 9
**********
**********
All finished


In [55]:
def do_something(seconds):
    print(f'Sleeping {seconds} seconds...')
    time.sleep(seconds)
    return f'Done sleeping {seconds} seconds'

In [20]:
with concurrent.futures.ProcessPoolExecutor() as executor:
    f1 = executor.submit(do_something, 3)
    f2 = executor.submit(do_something, 1)
    print(f1.result())
    print(f2.result())

Sleeping 3 seconds...
Sleeping 1 seconds...
Done sleeping 3 seconds
Done sleeping 1 seconds


In [19]:
with concurrent.futures.ProcessPoolExecutor() as executor:
    results = [executor.submit(do_something, i) for i in [5, 4, 3, 2, 1]]
    
    for f in concurrent.futures.as_completed(results): # in the sequence of finishing time
        print(f.result())

Sleeping 5 seconds...
Sleeping 3 seconds...
Sleeping 4 seconds...
Sleeping 1 seconds...
Sleeping 2 seconds...
Done sleeping 1 seconds
Done sleeping 2 seconds
Done sleeping 3 seconds
Done sleeping 4 seconds
Done sleeping 5 seconds


In [21]:
with concurrent.futures.ProcessPoolExecutor() as executor:
    secs = [5, 4, 3, 2, 1]
    results = executor.map(do_something, secs) # directly return results
    
    for result in results:
        print(result)

Sleeping 5 seconds...
Sleeping 3 seconds...
Sleeping 4 seconds...
Sleeping 2 seconds...
Sleeping 1 seconds...
Done sleeping 5 seconds
Done sleeping 4 seconds
Done sleeping 3 seconds
Done sleeping 2 seconds
Done sleeping 1 seconds


# Multi-Threading

In [53]:
def long_time_task(i):
    print(f'Current child thread: {threading.current_thread().name} - task: {i}')
    time.sleep(i)
    print(f'Result: {i}')

In [33]:
start = time.perf_counter()
print(f'This is the main thread: {threading.current_thread().name}')
t1 = threading.Thread(target=long_time_task, args=(3, ))
t2 = threading.Thread(target=long_time_task, args=(1, ))
t1.start()
t2.start()
t1.join()
t2.join()
end = time.perf_counter()
print(f'Total time consumed: {end - start} seconds')

This is the main thread: MainThread
Current child thread: Thread-20 - task: 3Current child thread: Thread-21 - task: 1

Result: 1
Result: 3
Total time consumed: 3.0045041060002404 seconds


In [54]:
start = time.perf_counter()
print(f'This is the main thread: {threading.current_thread().name}')
thread_list = []
for i in range(1, 3):
    t = threading.Thread(target=long_time_task, args=(i, ))
    t.start()
    thread_list.append(t)

for t in thread_list:
    t.join()

end = time.perf_counter()
print(f'Total time consumed: {end - start} seconds')

This is the main thread: MainThread
Current child thread: Thread-28 - task: 1
Current child thread: Thread-29 - task: 2
Result: 1
Result: 2
Total time consumed: 2.0047859309997875 seconds


In [38]:
# setDaemon, means that thread is not important, don't need to wait
start = time.perf_counter()
print(f'This is the main thread: {threading.current_thread().name}')
for i in range(1, 5):
    t = threading.Thread(target=long_time_task, args=(i, ))
    t.setDaemon(True)
    t.start()

end = time.perf_counter()
print(f'Total time consumed: {end - start} seconds')

This is the main thread: MainThread
Current child thread: Thread-24 - task: 1Current child thread: Thread-25 - task: 2

Current child thread: Thread-26 - task: 3
Current child thread: Thread-27 - task: 4
Total time consumed: 0.002207265999459196 seconds
Result: 1
Result: 2
Result: 3
Result: 4


In [41]:
# Lock
class Account:
    def __init__(self):
        self.balance = 0
    
    def save(self, lock):
        lock.acquire()
        for _ in range(10000):
            self.balance += 1
        lock.release()
    
    def withdraw(self, lock):
        lock.acquire()
        for _ in range(10000):
            self.balance -= 1
        lock.release()

In [45]:
account = Account()
lock = threading.Lock()
t_save = threading.Thread(target=account.save, args=(lock, ), name='Save')
t_withdraw = threading.Thread(target=account.withdraw, args=(lock, ), name='Withdraw')
t_save.start()
t_withdraw.start()
t_save.join()
t_withdraw.join()
print(f'Current balance: {account.balance}')

Current balance: 0


In [46]:
# Queue, thread safe
from queue import Queue

In [48]:
class Producer(threading.Thread):
    def __init__(self, name, queue):
        threading.Thread.__init__(self, name=name)
        self.queue = queue
    
    def run(self):
        for i in range(1, 5):
            print(f'{self.getName()} is producing {i} to queue!')
            self.queue.put(i)
            time.sleep(2)
        print(f'{self.getName()} finished!')

In [49]:
class Consumer(threading.Thread):
    def __init__(self, name, queue):
        threading.Thread.__init__(self, name=name)
        self.queue = queue
    
    def run(self):
        for _ in range(1, 5):
            val = self.queue.get()
            print(f'{self.getName()} is consuming {val} in the queue!')
            time.sleep(3)
        print(f'{self.getName()} finished!')

In [50]:
def main():
    queue = Queue()
    producer = Producer('Producer', queue)
    consumer = Consumer('Consumer', queue)
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
    print('All threads finished')

In [52]:
main()

Producer is producing 1 to queue!
Consumer is consuming 1 in the queue!
Producer is producing 2 to queue!
Consumer is consuming 2 in the queue!
Producer is producing 3 to queue!
Consumer is consuming 3 in the queue!Producer is producing 4 to queue!

Producer finished!
Consumer is consuming 4 in the queue!
Consumer finished!
All threads finished


In [56]:
with concurrent.futures.ThreadPoolExecutor() as executor:
    f1 = executor.submit(do_something, 3)
    f2 = executor.submit(do_something, 1)
    print(f1.result())
    print(f2.result())

Sleeping 3 seconds...
Sleeping 1 seconds...
Done sleeping 3 seconds
Done sleeping 1 seconds
