Borrowed from https://github.com/cyk1337/PyTips

# Queue

In [2]:
from queue import Queue # queue, FIFO
from queue import LifoQueue # stack, LIFO
from queue import PriorityQueue # priority queue (min heap)

# Unbounded FIFO queue: no maxsize is passed to the constructor.
q = Queue()
print(q.empty())  # nothing has been enqueued yet


# Items go in with .put() and come back out, in the same order, with .get().
for item in ("t1", "t2", "t3"):
    q.put(item)

print(q.empty())
print(q.full())


True
False
False


In [3]:
q.queue


deque(['t1', 't2', 't3'])

In [4]:
q.get() # FIFO: the oldest item ('t1') comes out first


't1'

In [5]:
# Bounded queue: the constructor argument is maxsize, so it holds three items.
q1 = Queue(maxsize=3)
for i in range(3):
    q1.put(i)  # fill every available slot
q1.full()

True

In [6]:
q1.put(4, block=False) # q1 is full: with block=False this raises queue.Full instead of waiting


Full: 

# Priority Queue


In [18]:

# Entries are (priority, data) tuples; the smaller the priority value,
# the sooner the entry is retrieved.
pq = PriorityQueue()
for entry in [(1, 'c'), (2, 'b'), (3, 'a')]:
    pq.put(entry)

In [9]:
pq

<queue.PriorityQueue at 0x1099796a0>

In [10]:
help(pq)

Help on PriorityQueue in module queue object:

class PriorityQueue(Queue)
 |  PriorityQueue(maxsize=0)
 |  
 |  Variant of Queue that retrieves open entries in priority order (lowest first).
 |  
 |  Entries are typically tuples of the form:  (priority number, data).
 |  
 |  Method resolution order:
 |      PriorityQueue
 |      Queue
 |      builtins.object
 |  
 |  Methods inherited from Queue:
 |  
 |  __init__(self, maxsize=0)
 |      Initialize self.  See help(type(self)) for accurate signature.
 |  
 |  empty(self)
 |      Return True if the queue is empty, False otherwise (not reliable!).
 |      
 |      This method is likely to be removed at some point.  Use qsize() == 0
 |      as a direct substitute, but be aware that either approach risks a race
 |      condition where a queue can grow before the result of empty() or
 |      qsize() can be used.
 |      
 |      To create code that needs to wait for all queued tasks to be
 |      completed, the preferred technique is to us

In [19]:
pq.queue

[(1, 'c'), (2, 'b'), (3, 'a')]

In [20]:
pq.get()

(1, 'c')

# Multiprocessing

In [24]:
import os

# Report the pid of the process that is about to fork.
print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
# after fork() both the parent and the new child continue from this point.
pid = os.fork()
if pid == 0:
    # fork() returned 0: this branch runs in the child process.
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    # Non-zero return value: this branch runs in the parent, and pid is the child's id.
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

Process (23007) start...
I (23007) just created a child process (23079).
I am child process (23079) and my parent is 23007.


# importing the multiprocessing module 
import multiprocessing 
import os 
  
def worker1():
    """Print the id of the process that is executing worker1."""
    current_pid = os.getpid()
    print("ID of process running worker1: {}".format(current_pid))
  
def worker2():
    """Print the id of the process that is executing worker2."""
    current_pid = os.getpid()
    print("ID of process running worker2: {}".format(current_pid))
  
if __name__ == "__main__":
    # Show the pid of the parent (main) process first.
    print("ID of main process: {}".format(os.getpid()))

    # One child process per worker function.
    p1 = multiprocessing.Process(target=worker1)
    p2 = multiprocessing.Process(target=worker2)

    # Launch both children before inspecting either of them.
    for proc in (p1, p2):
        proc.start()

    # Pids of the two children.
    print("ID of process p1: {}".format(p1.pid))
    print("ID of process p2: {}".format(p2.pid))

    # Liveness right after starting.
    print("Process p1 is alive: {}".format(p1.is_alive()))
    print("Process p2 is alive: {}".format(p2.is_alive()))

    # Block until both children have exited.
    for proc in (p1, p2):
        proc.join()

    print("Both processes finished execution!")

    # Liveness again, now that both have been joined.
    print("Process p1 is alive: {}".format(p1.is_alive()))
    print("Process p2 is alive: {}".format(p2.is_alive()))

In multiprocessing, worker processes:

- run independently
- have their own memory space

In [26]:
import multiprocessing

result = []


def square_list(mylist):
    """Append the square of every number in mylist to the module-level
    result list, then print that list.
    """
    # result is mutated in place (never rebound), so the module-level
    # list of the process running this function is the one that grows.
    result.extend(num * num for num in mylist)
    print("Result(in process p1): {}".format(result))

if __name__ == "__main__":
    # Guarded so that a child created with the "spawn" or "forkserver" start
    # method can re-import this module without launching another process.
    l = [1, 2, 3, 4]
    p1 = multiprocessing.Process(target=square_list, args=(l,))
    p1.start()
    p1.join()

    # The child appended to its own copy of result; the parent cannot see
    # that data, so the list printed here is still empty.
    print("Result(in main program): {}".format(result))

Result(in process p1): [1, 4, 9, 16]
Result(in main program): []


# Solution 1: shared memory

- Array: a ctypes array allocated from shared memory
- Value: a ctypes object allocated from shared memory

In [27]:
import multiprocessing

def square_list(mylist, result, square_sum):
    """Write the squares of mylist into result and their total into square_sum.

    Slot i of result receives mylist[i] squared, and square_sum.value is
    set to the sum of everything held in result. Both are printed afterwards.
    """
    for slot, number in enumerate(mylist):
        result[slot] = number * number

    total = sum(result)
    square_sum.value = total

    # result[:] takes a slice so the contents are what gets formatted.
    print("Result(in process p1): {}".format(result[:]))
    print("Sum of squares(in process p1): {}".format(square_sum.value))
    
if __name__ == "__main__":
    # Guarded so that a child created with the "spawn" or "forkserver" start
    # method can re-import this module without launching another process.
    l = [1, 2, 3, 4]

    # Shared int array with one slot per input number (instead of a
    # hard-coded 4), plus a single shared int for the total.
    result_array = multiprocessing.Array('i', len(l))
    square_sum_value = multiprocessing.Value('i')

    p1 = multiprocessing.Process(target=square_list, args=(l, result_array, square_sum_value))

    p1.start()
    p1.join()

    # Both objects were passed to the child, and what it wrote is read back here.
    print("Result(in main program): {}".format(result_array[:]))
    print("Sum of squares(in main program): {}".format(square_sum_value.value))

Result(in process p1): [1, 4, 9, 16]
Sum of squares(in process p1): 30
Result(in main program): [1, 4, 9, 16]
Sum of squares(in main program): 30


# Solution 2: server process
    Use multiprocessing.Manager
    - support arbitrary object types like lists, dictionaries, Queue, Value, Array, etc.
    - slower than shared memory

In [28]:
import multiprocessing 
  
def print_records(records):
    """Print the first two fields of every record in records as a
    name line followed by a score line.
    """
    template = "Name: {0}\nScore: {1}\n"
    for entry in records:
        name, score = entry[0], entry[1]
        print(template.format(name, score))

def insert_record(record, records):
    """Append record to the end of records and print a confirmation."""
    records.append(record)
    confirmation = "New record added!\n"
    print(confirmation)

if __name__ == "__main__":
    # Guarded so that a child created with the "spawn" or "forkserver" start
    # method can re-import this module without starting another manager.
    with multiprocessing.Manager() as manager:
        # creating a list in server process memory
        records = manager.list([('a', 10), ('b', 9), ('k', 8)])
        new_record = ("J", 7)
        p1 = multiprocessing.Process(target=insert_record, args=(new_record, records))
        p2 = multiprocessing.Process(target=print_records, args=(records,))

        # Finish the insert before the printer starts, so the new record
        # is already in the list when it is printed.
        p1.start()
        p1.join()

        p2.start()
        p2.join()

New record added!

Name: a
Score: 10

Name: b
Score: 9

Name: k
Score: 8

Name: J
Score: 7



multiprocessing supports two kinds of communication channel between processes

- Queue
- Pipe


# 1. Queue
    - A simple way to communicate between processes with multiprocessing is to use a Queue to pass messages.
    - Any (picklable) Python object can pass through a queue
    - multiprocessing.Queue class is a near clone of queue.Queue

In [29]:
  
def square_list(mylist, q):
    """Put the square of each number in mylist onto q, in input order."""
    squares = (value * value for value in mylist)
    for squared in squares:
        q.put(squared)

def print_queue(q):
    """Print a header, then every item taken from q until q.empty()
    reports True, then a closing line.
    """
    print("Queue elements:")
    while True:
        if q.empty():
            break
        print(q.get())
    print("Queue is now empty!")

if __name__ == "__main__":
    # Guarded so that a child created with the "spawn" or "forkserver" start
    # method can re-import this module without launching more processes.
    l = [1, 2, 3, 4]
    q = multiprocessing.Queue()

    p1 = multiprocessing.Process(target=square_list, args=(l, q))
    p2 = multiprocessing.Process(target=print_queue, args=(q,))

    # Each process is joined right after it is started, so the producer (p1)
    # has exited before the consumer (p2) begins reading from the queue.
    for p in [p1, p2]:
        p.start()
        p.join()

Queue elements:
1
4
9
16
Queue is now empty!



# 2. Pipes

    - A Pipe is preferred over a Queue when only two endpoints need to communicate (the connection is two-way by default).
    - The two connection objects returned by Pipe() represent the two ends of the pipe.
    - Each connection object has send() and recv() methods (among others).

In [30]:
import multiprocessing

import multiprocessing 
  
def sender(conn, msgs):
    """Send every message in msgs through conn, then close conn.

    Each message is echoed to stdout after it has been sent. The
    connection is closed even if a send fails part-way through, so this
    end of the pipe is never left open by an error.
    """
    try:
        for msg in msgs:
            conn.send(msg)
            print("Sent the message: {}".format(msg))
    finally:
        conn.close()

def receiver(conn):
    """Print each message received on conn until the "END" marker arrives.

    The "END" message itself is not printed.
    """
    # iter(callable, sentinel) keeps calling conn.recv() and stops as soon
    # as the value received equals "END".
    for incoming in iter(conn.recv, "END"):
        print("Received the message: {}".format(incoming))
        
if __name__ == "__main__":
    # Guarded so that a child created with the "spawn" or "forkserver" start
    # method can re-import this module without launching more processes.

    # messages to be sent; "END" tells the receiver to stop
    msgs = ["hello", "hey", "hru?", "END"]
    # creating a pipe: one connection object for each end
    parent_conn, child_conn = multiprocessing.Pipe()

    # creating new processes
    p1 = multiprocessing.Process(target=sender, args=(parent_conn, msgs))
    p2 = multiprocessing.Process(target=receiver, args=(child_conn,))

    # Each process is joined right after it is started, so every message
    # has been sent before the receiver begins reading.
    for p in [p1, p2]:
        p.start()
        p.join()

Sent the message: hello
Sent the message: hey
Sent the message: hru?
Sent the message: END
Received the message: hello
Received the message: hey
Received the message: hru?


# Synchronization between processes
    - Process synchronization is defined as a mechanism which ensures that two or more concurrent processes do not simultaneously execute some particular program segment known as critical section.
    - Critical section refers to the parts of the program where the shared resource is accessed.
    - Concurrent accesses to shared resource can lead to race condition.

In [32]:
# race condition
import multiprocessing 
  
# function to withdraw from account 
def withdraw(balance):
    """Take 1 off balance.value, 10000 times, without any locking.

    Every step reads the current value and writes back value - 1 as two
    separate operations, which is what the race-condition demo relies on.
    """
    for _ in range(10000):
        balance.value -= 1

# function to deposit to account 
def deposit(balance):
    """Add 1 to balance.value, 10000 times, without any locking.

    Every step reads the current value and writes back value + 1 as two
    separate operations, which is what the race-condition demo relies on.
    """
    for _ in range(10000):
        balance.value += 1

        
def perform_transactions():
    """Run withdraw and deposit in two processes against one shared
    balance that starts at 100, then print the final balance.
    """
    # initial balance (in shared memory)
    balance = multiprocessing.Value('i', 100)

    # One process per operation, both working on the same balance object.
    workers = [
        multiprocessing.Process(target=operation, args=(balance,))
        for operation in (withdraw, deposit)
    ]

    # Start both before waiting on either, so they run at the same time.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    print("Final balance = {}".format(balance.value))

if __name__ == "__main__":
    # Guarded so that a child created with the "spawn" or "forkserver" start
    # method can re-import this module without re-running the demo.

    # race condition: perform the same transactions 10 times and compare
    # the final balances that get printed
    for _ in range(10):
        perform_transactions()

Final balance = 1082
Final balance = 3392
Final balance = -503
Final balance = -1373
Final balance = 4853
Final balance = 3189
Final balance = -1631
Final balance = -3064
Final balance = -459
Final balance = -544


- multiprocessing module provides a Lock class to deal with the race conditions. Lock is implemented using a Semaphore object provided by the Operating System.
- As soon as a lock is acquired, no other process can access its critical section until the lock is released using lock.release() method.

In [36]:
# deal with race condition, use LOCK()
import multiprocessing 
  
# function to withdraw from account 
def withdraw(balance, lock):
    """Take 1 off balance.value, 10000 times, holding lock for each step.

    The lock is used as a context manager, so it is released even if the
    update raises; a bare acquire()/release() pair would leave it held.
    """
    for _ in range(10000):
        with lock:
            balance.value = balance.value - 1
        
# function to deposit to account 
def deposit(balance, lock):
    """Add 1 to balance.value, 10000 times, holding lock for each step.

    The lock is used as a context manager, so it is released even if the
    update raises; a bare acquire()/release() pair would leave it held.
    """
    for _ in range(10000):
        with lock:
            balance.value = balance.value + 1
        
def perform_transactions():
    """Run withdraw and deposit in two processes against one shared
    balance that starts at 100, both using the same lock, then print the
    final balance.
    """
    # initial balance (in shared memory)
    balance = multiprocessing.Value('i', 100)

    # A single lock object handed to both processes.
    lock = multiprocessing.Lock()

    workers = [
        multiprocessing.Process(target=operation, args=(balance, lock))
        for operation in (withdraw, deposit)
    ]

    # Start both before waiting on either, so they run at the same time.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    print("Final balance = {}".format(balance.value))

if __name__ == "__main__":
    # Guarded so that a child created with the "spawn" or "forkserver" start
    # method can re-import this module without re-running the demo.

    # Lock-protected version: perform the same transactions 10 times and
    # compare the final balances that get printed.
    for _ in range(10):
        perform_transactions()

Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100


In [37]:
# Pooling between processes


In [35]:
# Pooling between processes
# Python program to understand  
# the concept of pool 
import multiprocessing 
import os 
  
def square(n):
    """Print which process is handling n, then return n squared."""
    worker_pid = os.getpid()
    print("Worker process id for {0}: {1}".format(n, worker_pid))
    squared = n * n
    return squared

if __name__ == "__main__":
    # Guarded so that pool workers created with the "spawn" or "forkserver"
    # start method can re-import this module without creating another pool.
    l = [1, 2, 3, 4]

    # Using the pool as a context manager shuts its worker processes down
    # when the block exits, instead of leaving them running. map() has
    # already returned every result by then.
    with multiprocessing.Pool() as p:
        result = p.map(square, l)

    print(result)


Worker process id for 3: 23816
Worker process id for 4: 23817
Worker process id for 2: 23815
Worker process id for 1: 23814
[1, 4, 9, 16]
