# Python Parallel

Stuff explanation...

## Testing Serial Implementation

In [4]:
import time
from do_something import *

tic = time.time()
size = 10000000   
n_exec = 10
for i in range(0, n_exec):
    out_list = list()
    do_something(size, out_list)

print('List processing complete.')
toc = time.time()
print('serial time=', toc - tic)

List processing complete.
serial time= 13.68494963645935


## Testing Multithreading Implementation.

In [5]:
from do_something import *
import time
import threading #crea una concurrecnia dentro de un CPU 

tic = time.time()
size = 10000000
threads = 10  
jobs = []

for i in range(0, threads):
    out_list = list()
    thread = threading.Thread(target=do_something(size, out_list))
    jobs.append(thread) #se crea objeto de tipo hilo y se concatenan a una lista 

for j in jobs:
    j.start()


for j in jobs:
    j.join()

print ('List processing complete.')
toc = time.time()
print('multithreading time=', toc - tic)

List processing complete.
multithreading time= 13.773424863815308


## Testing Multiprocessing Implementation

In [6]:
from do_something import *
import time
import multiprocessing

tic = time.time()
size = 10000000   
procs = 10   
jobs = []

for i in range(0, procs):
    out_list = list()
    process = multiprocessing.Process(target=do_something,args=(size,out_list))
    jobs.append(process)

for j in jobs:
    j.start()

for j in jobs:
    j.join()

print ('List processing complete.')
toc = time.time()
print('multiprocesses time=', toc - tic)

List processing complete.
multiprocesses time= 8.072590827941895


# Threads

Python threading module, state, semaphores...

In [7]:
import threading


def my_func(thread_number):
    return print('my_func called by thread N°{}'.format(thread_number))


def main():
    threads = []
    for i in range(10):
        t = threading.Thread(target=my_func, args=(i,))
        threads.append(t)
        t.start()
        t.join()

main()

my_func called by thread N°0
my_func called by thread N°1
my_func called by thread N°2
my_func called by thread N°3
my_func called by thread N°4
my_func called by thread N°5
my_func called by thread N°6
my_func called by thread N°7
my_func called by thread N°8
my_func called by thread N°9


## Get thread name (id)

In [8]:
import threading
import time

def function_A():
    print (threading.currentThread().getName() + str('--> starting \n'))
    time.sleep(2)
    print (threading.currentThread().getName() + str( '--> exiting \n'))
    return

def function_B():
    print (threading.currentThread().getName() + str('--> starting \n'))
    time.sleep(2)
    print (threading.currentThread().getName() + str('--> exiting \n'))
    return

def function_C():
    print (threading.currentThread().getName() + str('--> starting \n'))
    time.sleep(2)
    print (threading.currentThread().getName() + str('--> exiting \n'))
    return

t1 = threading.Thread(name='function_A', target=function_A)
t2 = threading.Thread(name='function_B', target=function_B)
t3 = threading.Thread(name='function_C', target=function_C) 

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

function_A--> starting 

function_B--> starting 

function_C--> starting 

function_A--> exiting 

function_B--> exiting 

function_C--> exiting 



## Subclassing threads

In [19]:
import time
import os
from random import randint
from threading import Thread


class MyThreadClass(Thread):
    
    def __init__(self, name, duration):
        Thread.__init__(self)
        self.name = name
        self.duration = duration
    
    def run(self):
        print ('---> {} running, belonging to process ID {}\n'.format(self.name, os.getpid()))
        time.sleep(self.duration)
        print ('---> {} over\n'.format(self.name))


def main():
    tic = time.time()
    
    # Thread Creation
    thread1 = MyThreadClass('Thread#1 ', randint(1, 10))
    thread2 = MyThreadClass('Thread#2 ', randint(1, 10))
    thread3 = MyThreadClass('Thread#3 ', randint(1, 10))
    thread4 = MyThreadClass('Thread#4 ', randint(1, 10))
    thread5 = MyThreadClass('Thread#5 ', randint(1, 10))
    thread6 = MyThreadClass('Thread#6 ', randint(1, 10))
    thread7 = MyThreadClass('Thread#7 ', randint(1, 10))
    thread8 = MyThreadClass('Thread#8 ', randint(1, 10))
    thread9 = MyThreadClass('Thread#9 ', randint(1, 10))

    # Thread Running, da de alta el hilo en el sistema operativo
    thread1.start()
    thread2.start()
    thread3.start()
    thread4.start()
    thread5.start()
    thread6.start()
    thread7.start()
    thread8.start()
    thread9.start()

    # Thread joining
    thread1.join()
    thread2.join()
    thread3.join()
    thread4.join()
    thread5.join()
    thread6.join()
    thread7.join()
    thread8.join()
    thread9.join()

    # End 
    print("End")

    #Execution Time
    toc = time.time()
    print("--- {} seconds ---".format(toc - tic))


main()

---> Thread#1  running, belonging to process ID 2600

---> Thread#2  running, belonging to process ID 2600
---> Thread#3  running, belonging to process ID 2600


---> Thread#4  running, belonging to process ID 2600

---> Thread#5  running, belonging to process ID 2600
---> Thread#6  running, belonging to process ID 2600


---> Thread#7  running, belonging to process ID 2600

---> Thread#8  running, belonging to process ID 2600

---> Thread#9  running, belonging to process ID 2600

---> Thread#7  over

---> Thread#1  over

---> Thread#9  over

---> Thread#3  over

---> Thread#8  over

---> Thread#2  over

---> Thread#5  over

---> Thread#6  over

---> Thread#4  over

End
--- 9.01617980003357 seconds ---


## Lock

In [16]:
import threading
import time
import os
from threading import Thread
from random import randint

# Lock Definition
threadLock = threading.Lock()

class MyThreadClass (Thread):
    
    def __init__(self, name, duration):
        Thread.__init__(self)
        self.name = name
        self.duration = duration
        
    def run(self):
        #Acquire the Lock
        threadLock.acquire()      
        
        print ('---> {} running, belonging to process ID {}\n'.format(self.name, os.getpid()))
        time.sleep(self.duration)
        print ('---> {} over\n'.format(self.name))
        
        #Release the Lock
        threadLock.release()


def main():
    tic = time.time()
    # Thread Creation
    thread1 = MyThreadClass('Thread#1 ', randint(1, 10))
    thread2 = MyThreadClass('Thread#2 ', randint(1, 10))
    thread3 = MyThreadClass('Thread#3 ', randint(1, 10))
    thread4 = MyThreadClass('Thread#4 ', randint(1, 10))
    thread5 = MyThreadClass('Thread#5 ', randint(1, 10))
    thread6 = MyThreadClass('Thread#6 ', randint(1, 10))
    thread7 = MyThreadClass('Thread#7 ', randint(1, 10))
    thread8 = MyThreadClass('Thread#8 ', randint(1, 10))
    thread9 = MyThreadClass('Thread#9 ', randint(1, 10))

    # Thread Running
    thread1.start()
    thread2.start()
    thread3.start()
    thread4.start()
    thread5.start()
    thread6.start()
    thread7.start()
    thread8.start()
    thread9.start()

    # Thread joining
    thread1.join()
    thread2.join()
    thread3.join()
    thread4.join()
    thread5.join()
    thread6.join()
    thread7.join()
    thread8.join()
    thread9.join()

    # End 
    print("End")

    #Execution Time
    toc = time.time()
    print("--- {} seconds ---".format(toc - tic))


main()

---> Thread#1  running, belonging to process ID 2600

---> Thread#1  over

---> Thread#2  running, belonging to process ID 2600

---> Thread#2  over

---> Thread#3  running, belonging to process ID 2600

---> Thread#3  over

---> Thread#4  running, belonging to process ID 2600

---> Thread#4  over

---> Thread#5  running, belonging to process ID 2600

---> Thread#5  over

---> Thread#6  running, belonging to process ID 2600

---> Thread#6  over

---> Thread#7  running, belonging to process ID 2600

---> Thread#7  over

---> Thread#8  running, belonging to process ID 2600

---> Thread#8  over

---> Thread#9  running, belonging to process ID 2600

---> Thread#9  over

End
--- 56.053263664245605 seconds ---


In [12]:
import threadingmy_func called by thread N°0
my_func called by thread N°1
my_func called by thread N°2
my_func called by thread N°3
my_func called by thre
import time
import os
from threading import Thread
from random import randint

# Lock Definition
threadLock = threading.Lock()

class MyThreadClass (Thread):
    
    def __init__(self, name, duration):
        Thread.__init__(self)
        self.name = name
        self.duration = duration
        
    def run(self):
        #Acquire the Lock
        threadLock.acquire()      
        
        print ('---> {} running, belonging to process ID {}\n'.format(self.name, os.getpid()))
        
        #Release the Lock
        threadLock.release()
        time.sleep(self.duration)
        print ('---> {} over\n'.format(self.name))


def main():
    tic = time.time()
    # Thread Creation
    thread1 = MyThreadClass('Thread#1 ', randint(1, 10))
    thread2 = MyThreadClass('Thread#2 ', randint(1, 10))
    thread3 = MyThreadClass('Thread#3 ', randint(1, 10))
    thread4 = MyThreadClass('Thread#4 ', randint(1, 10))
    thread5 = MyThreadClass('Thread#5 ', randint(1, 10))
    thread6 = MyThreadClass('Thread#6 ', randint(1, 10))
    thread7 = MyThreadClass('Thread#7 ', randint(1, 10))
    thread8 = MyThreadClass('Thread#8 ', randint(1, 10))
    thread9 = MyThreadClass('Thread#9 ', randint(1, 10))

    # Thread Running
    thread1.start()
    thread2.start()
    thread3.start()
    thread4.start()
    thread5.start()
    thread6.start()
    thread7.start()
    thread8.start()
    thread9.start()

    # Thread joining
    thread1.join()
    thread2.join()
    thread3.join()
    thread4.join()
    thread5.join()
    thread6.join()
    thread7.join()
    thread8.join()
    thread9.join()

    # End 
    print("End")

    #Execution Time
    toc = time.time()
    print("--- {} seconds ---".format(toc - tic))


main()

---> Thread#1  running, belonging to process ID 2600

---> Thread#2  running, belonging to process ID 2600

---> Thread#3  running, belonging to process ID 2600

---> Thread#4  running, belonging to process ID 2600

---> Thread#5  running, belonging to process ID 2600

---> Thread#6  running, belonging to process ID 2600

---> Thread#7  running, belonging to process ID 2600

---> Thread#8  running, belonging to process ID 2600

---> Thread#9  running, belonging to process ID 2600

---> Thread#2  over

---> Thread#8  over

---> Thread#3  over

---> Thread#1  over

---> Thread#7  over

---> Thread#4  over

---> Thread#5  over

---> Thread#9  over

---> Thread#6  over

End
--- 10.013882637023926 seconds ---


## Semaphore

In [17]:
import logging
import threading
import time
import random

LOG_FORMAT = '%(asctime)s %(threadName)-17s %(levelname)-8s %(message)s'
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)

semaphore = threading.Semaphore(0)
item = 0

def consumer():
    logging.info('Consumer is waiting')
    semaphore.acquire()
    logging.info('Consumer notify: item number {}'.format(item))


def producer():
    global item
    time.sleep(3)
    item = random.randint(0, 1000)
    logging.info('Producer notify: item number {}'.format(item))
    semaphore.release()


def main():
    for i in range(10):
        t1 = threading.Thread(target=consumer)
        t2 = threading.Thread(target=producer)

        t1.start()
        t2.start()

        t1.join()
        t2.join()


main()

2020-02-28 11:54:18,814 Thread-105        INFO     Consumer is waiting
2020-02-28 11:54:21,815 Thread-106        INFO     Producer notify: item number 821
2020-02-28 11:54:21,817 Thread-105        INFO     Consumer notify: item number 821
2020-02-28 11:54:21,818 Thread-107        INFO     Consumer is waiting
2020-02-28 11:54:24,819 Thread-108        INFO     Producer notify: item number 559
2020-02-28 11:54:24,822 Thread-107        INFO     Consumer notify: item number 559
2020-02-28 11:54:24,827 Thread-109        INFO     Consumer is waiting
2020-02-28 11:54:27,831 Thread-110        INFO     Producer notify: item number 912
2020-02-28 11:54:27,834 Thread-109        INFO     Consumer notify: item number 912
2020-02-28 11:54:27,837 Thread-111        INFO     Consumer is waiting
2020-02-28 11:54:30,839 Thread-112        INFO     Producer notify: item number 756
2020-02-28 11:54:30,842 Thread-111        INFO     Consumer notify: item number 756
2020-02-28 11:54:30,847 Thread-113        IN

## Conditional

In [18]:
import logging
import threading
import time
#En los condicionales se crea una condición de espera
LOG_FORMAT = '%(asctime)s %(threadName)-17s %(levelname)-8s %(message)s'
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)

items = []
condition = threading.Condition()


class Consumer(threading.Thread):
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def consume(self):

        with condition:
            if len(items) == 0:
                logging.info('no items to consume')
                condition.wait()

            items.pop()
            logging.info('consumed 1 item')

            condition.notify()

    def run(self):
        for i in range(20):
            time.sleep(2)
            self.consume()


class Producer(threading.Thread):
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def produce(self):

        with condition:
            if len(items) == 10:
                logging.info('items produced {}. Stopped'.format(len(items)))
                condition.wait()

            items.append(1)
            logging.info('total items {}'.format(len(items)))

            condition.notify()

    def run(self):
        for i in range(20):
            time.sleep(0.5)
            self.produce()


def main():
    producer = Producer(name='Producer')
    consumer = Consumer(name='Consumer')

    producer.start()
    consumer.start()

    producer.join()
    consumer.join()


main()

2020-02-28 11:55:06,399 Producer          INFO     total items 1
2020-02-28 11:55:06,903 Producer          INFO     total items 2
2020-02-28 11:55:07,407 Producer          INFO     total items 3
2020-02-28 11:55:07,899 Consumer          INFO     consumed 1 item
2020-02-28 11:55:07,911 Producer          INFO     total items 3
2020-02-28 11:55:08,411 Producer          INFO     total items 4
2020-02-28 11:55:08,914 Producer          INFO     total items 5
2020-02-28 11:55:09,417 Producer          INFO     total items 6
2020-02-28 11:55:09,903 Consumer          INFO     consumed 1 item
2020-02-28 11:55:09,920 Producer          INFO     total items 6
2020-02-28 11:55:10,421 Producer          INFO     total items 7
2020-02-28 11:55:10,924 Producer          INFO     total items 8
2020-02-28 11:55:11,427 Producer          INFO     total items 9
2020-02-28 11:55:11,906 Consumer          INFO     consumed 1 item
2020-02-28 11:55:11,929 Producer          INFO     total items 9
2020-02-28 11:55:12

## Barrier

In [15]:
from random import randrange
from threading import Barrier, Thread
from time import ctime, sleep

num_runners = 3
finish_line = Barrier(num_runners)
runners = ['Huey', 'Dewey', 'Louie']

def runner():
    name = runners.pop()
    sleep(randrange(2, 5))
    print('%s reached the barrier at: %s \n' % (name, ctime()))
    finish_line.wait()

def main():
    threads = []
    print('START RACE!!!!')
    for i in range(num_runners):
        threads.append(Thread(target=runner))
        threads[-1].start()
    for thread in threads:
        thread.join()
    print('Race over!')

main()

START RACE!!!!
Huey reached the barrier at: Fri Feb 28 11:47:53 2020 

Louie reached the barrier at: Fri Feb 28 11:47:53 2020 

Dewey reached the barrier at: Fri Feb 28 11:47:54 2020 

Race over!


## Queue for catching return

In [21]:
from queue import Queue
import threading
import time

def function_A(message):
    time.sleep(3)
    return threading.currentThread().getName() + ' ' + message

def function_B(integer):
    time.sleep(2)
    return threading.currentThread().getName() + ' ' + str(integer)

def function_C(real):
    time.sleep(1)
    return threading.currentThread().getName() + ' ' + str(real)

que = Queue()

t1 = threading.Thread(target=lambda q, arg: q.put(function_A(arg)), args=(que, '1'))
t2 = threading.Thread(target=lambda q, arg: q.put(function_B(arg)), args=(que, 2))
t3 = threading.Thread(target=lambda q, arg: q.put(function_C(arg)), args=(que, 3.)) #lambda es una función cuealquiera 
#Thread, y como argumento se pasa una cola y el dato como tal
t1.start()
t2.start()
t3.start()

threads_list = [t1, t2, t3]

for t in threads_list:
    t.join()

while not que.empty():
    result = que.get()
    print(result)

Thread-139 3.0
Thread-138 2
Thread-137 1


# Multiprocessing

In [22]:
import multiprocessing

def myFunc(i):
    print ('calling myFunc from process n°: {}'.format(i))
    for j in range (0, i):
        print('output from myFunc is : {}'.format(j))

for i in range(6):
    process = multiprocessing.Process(target=myFunc, args=(i,))
    process.start()
    process.join()

calling myFunc from process n°: 0
calling myFunc from process n°: 1
output from myFunc is : 0
calling myFunc from process n°: 2
output from myFunc is : 0
output from myFunc is : 1
calling myFunc from process n°: 3
output from myFunc is : 0
output from myFunc is : 1
output from myFunc is : 2
calling myFunc from process n°: 4
output from myFunc is : 0
output from myFunc is : 1
output from myFunc is : 2
output from myFunc is : 3
calling myFunc from process n°: 5
output from myFunc is : 0
output from myFunc is : 1
output from myFunc is : 2
output from myFunc is : 3
output from myFunc is : 4


In [23]:
import multiprocessing
import time

def myFunc():
    name = multiprocessing.current_process().name
    print ('Starting process name = {} \n'.format(name))
    time.sleep(3)
    print ('Exiting process name = {} \n'.format(name))

    
process_with_name = multiprocessing.Process(name='myFunc process', target=myFunc)

# process_with_name.daemon = True

process_with_default_name = multiprocessing.Process(target=myFunc)
process_with_name.start()
process_with_default_name.start()

process_with_name.join()
process_with_default_name.join()

Starting process name = myFunc process 

Starting process name = Process-28 

Exiting process name = myFunc process 

Exiting process name = Process-28 



In [26]:
import multiprocessing
import queue as que
import random
import time

class producer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        
    def run(self) :
        for i in range(10):
            item = random.randint(0, 256)
            self.queue.put(item) 
            print ("Process Producer : item {} appended to queue {}".format(item, self.name))
            time.sleep(1)
            print ("The size of queue is {}".format(self.queue.qsize()))
       

class consumer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            if (self.queue.empty()):
                print("the queue is empty")
                break
            else :
                time.sleep(2)
                item = self.queue.get()
                print ('Process Consumer : item {} popped from by {} \n'.format(item, self.name))
                time.sleep(1)



queue = que.Queue()
process_producer = producer(queue)
process_consumer = consumer(queue)
process_producer.start()
process_consumer.start()
process_producer.join()
process_consumer.join()

the queue is empty
Process Producer : item 198 appended to queue producer-33
The size of queue is 1
Process Producer : item 81 appended to queue producer-33
The size of queue is 2
Process Producer : item 253 appended to queue producer-33
The size of queue is 3
Process Producer : item 163 appended to queue producer-33
The size of queue is 4
Process Producer : item 190 appended to queue producer-33
The size of queue is 5
Process Producer : item 64 appended to queue producer-33
The size of queue is 6
Process Producer : item 158 appended to queue producer-33
The size of queue is 7
Process Producer : item 32 appended to queue producer-33
The size of queue is 8
Process Producer : item 204 appended to queue producer-33
The size of queue is 9
Process Producer : item 256 appended to queue producer-33
The size of queue is 10
