In [1]:
from time import sleep, time, ctime
from threading import Thread, Lock, current_thread, BoundedSemaphore
import atexit
from random import randrange

In [2]:
def count(n):
    """Decrement ``n`` by one until it is no longer greater than zero.

    The loop does nothing else, so the call is pure interpreter work; the
    notebook uses it as the workload for its timing cells. Nothing is
    returned.
    """
    while n > 0:
        n -= 1
    # Disabled progress trace (prints every ten million steps when enabled):
#         if n % 10000000 == 0:
#             print(n)

In [3]:
%%time
# Single-threaded baseline: count down from fifty million in the main thread.
# `n` is reused by the threaded cells that follow.
n = 10000000*5
count(n)

CPU times: user 4.19 s, sys: 0 ns, total: 4.19 s
Wall time: 4.19 s


Try threads

In [4]:
%%time
t0 = time()
t1 = Thread(target = count, args = (n/2,))
t1.start()
t2 = Thread(target = count, args = (n/2,))
t2.start()
t1.join()
t2.join()

CPU times: user 4.31 s, sys: 32 ms, total: 4.34 s
Wall time: 4.34 s


In [5]:
class Func:
    """Callable that stores a function together with the arguments to call it with.

    ``args`` is the sequence of positional arguments; any extra keyword
    arguments given to the constructor are forwarded as keyword arguments.
    Calling the instance takes no arguments and returns whatever the wrapped
    function returns.
    """

    def __init__(self, func, args, **kwargs):
        self.func, self.args, self.kwargs = func, args, kwargs

    def __call__(self):
        positional, named = self.args, self.kwargs
        return self.func(*positional, **named)

In [6]:
%%time
f = Func(count, (n/2,))
t1 = Thread(target = f)
t2 = Thread(target = f)
t1.start()
t2.start()
t1.join()
t2.join()

CPU times: user 4.33 s, sys: 12 ms, total: 4.34 s
Wall time: 4.35 s


In [7]:
class MyThread(Thread):
    """Thread that runs ``func(*args)``, announces start/finish, and keeps the result.

    Attributes:
        func: callable executed in the thread.
        args: positional arguments passed to ``func``.
        rez:  return value of ``func``; ``None`` until ``run`` has completed.
    """

    # Shared by all instances so one instance's status line (text + newline)
    # cannot be split by another instance printing at the same moment.
    _print_lock = Lock()

    def __init__(self, name, func, args):
        super().__init__(name=name)
        self.func = func
        self.args = args
        # Defined up front so reading `rez` before the thread finishes
        # yields None instead of raising AttributeError.
        self.rez = None

    def run(self):
        """Print a start line, call the function, store its result, print a finish line."""
        with self._print_lock:
            print('Thread `{name}` starting at: {t}'.format(name=self.name,t=ctime()))
        self.rez = self.func(*self.args)
        with self._print_lock:
            print('Thread `{name}` finished at: {t}'.format(name=self.name,t=ctime()))

In [8]:
%%time
t1 = MyThread('t1', count, (n/2,))
t2 = MyThread('t2', count, (n/2,))
t1.start()
t2.start()
print(current_thread().name)
t1.join()
t2.join()

Thread `t1` starting at: Sat Sep 23 13:13:31 2017Thread `t2` starting at: Sat Sep 23 13:13:31 2017MainThread


Thread `t2` finished at: Sat Sep 23 13:13:35 2017
Thread `t1` finished at: Sat Sep 23 13:13:35 2017
CPU times: user 4.31 s, sys: 16 ms, total: 4.33 s
Wall time: 4.34 s


Using Sync Primitives

In [12]:
class CleanOutputSet(set):
    """A ``set`` whose ``str()`` is its members joined by ``', '``.

    Members must be strings, because they are passed straight to ``str.join``.
    """

    def __str__(self):
        separator = ', '
        return separator.join(self)

# Serialises the bookkeeping and printing done inside loop().
lock = Lock()
# One sleep duration (2-4 s) per worker thread; 3-6 workers in total.
loops = [randrange(2, 5) for _ in range(randrange(3, 7))]
# Names of the threads that have started but not yet completed.
remaining = CleanOutputSet()

def loop(nsec):
    """Worker body: register the current thread, sleep ``nsec`` seconds, deregister.

    The thread name is added to the module-level ``remaining`` set before the
    sleep and removed afterwards. Each update and its accompanying prints
    happen while holding the module-level ``lock``.
    """
    worker = current_thread().name

    with lock:
        remaining.add(worker)
        started_msg = '[{0}] Started {1}'.format(ctime(), worker)
        print(started_msg)

    sleep(nsec)

    with lock:
        remaining.remove(worker)
        done_msg = '[{0}] Completed {1} ({2} secs)'.format(ctime(), worker, nsec)
        print(done_msg)
        # An empty set is falsy, so 'NONE' is shown once nobody is left.
        left_msg = '   (remaining: {0})'.format(remaining or 'NONE')
        print(left_msg)

def main():
    """Start one ``loop`` thread per entry in the module-level ``loops`` list.

    The threads are started but not joined; this function returns right away.
    """
    for nsec in loops:
        worker = Thread(target=loop, args=(nsec,))
        worker.start()
    
@atexit.register
def _atexit():
    """Print a completion banner with the current time.

    Registered through ``atexit.register``, so it is invoked when the
    interpreter exits rather than when the worker threads finish.
    NOTE(review): in a notebook that presumably means kernel shutdown, which
    would explain why the banner is absent from the recorded output - confirm.
    """
    print('all DONE at:', ctime())

# Show the per-thread sleep durations (already a list), then launch the workers.
print(loops)
main()

[4, 3, 2, 3]
[Sat Sep 23 15:18:34 2017] Started Thread-17
[Sat Sep 23 15:18:34 2017] Started Thread-18
[Sat Sep 23 15:18:34 2017] Started Thread-19
[Sat Sep 23 15:18:34 2017] Started Thread-20
[Sat Sep 23 15:18:36 2017] Completed Thread-19 (2 secs)
   (remaining: Thread-17, Thread-20, Thread-18)
[Sat Sep 23 15:18:37 2017] Completed Thread-18 (3 secs)
   (remaining: Thread-17, Thread-20)
[Sat Sep 23 15:18:37 2017] Completed Thread-20 (3 secs)
   (remaining: Thread-17)
[Sat Sep 23 15:18:38 2017] Completed Thread-17 (4 secs)
   (remaining: NONE)


Using Semaphore

In [190]:
# Held by add_candy() and buy_candy() around their print/semaphore sequences.
lock = Lock()
# Upper bound of the semaphore counter.
MAX = 10
# Counter starts at MAX; add_candy() catches the ValueError raised when a
# release would push it above that bound.
service = BoundedSemaphore(MAX)

def add_candy():
    """Try to release the ``service`` semaphore once and report the outcome.

    Prints 'OK' on success, or 'Sorry, it is full' when the release raises
    ValueError, followed by the semaphore's current counter. Everything runs
    while holding the module-level ``lock``.
    """
    with lock:
        print('Refilling candy...', end = '')
        try:
            service.release()
            outcome = 'OK'
        except ValueError:
            outcome = 'Sorry, it is full'
        print(outcome)
        # `_value` is the semaphore's private counter; read for display only.
        print('[{0}]'.format(service._value))

def buy_candy():
    """Try to acquire the ``service`` semaphore without blocking and report the outcome.

    Prints 'OK' when the acquire succeeds and 'Sorry, it is empty' otherwise,
    followed by the semaphore's current counter. Everything runs while holding
    the module-level ``lock``.
    """
    with lock:
        print('Buying candy...', end='')
        # False -> non-blocking: returns immediately with True/False.
        got_one = service.acquire(False)
        print('OK' if got_one else 'Sorry, it is empty')
        # `_value` is the semaphore's private counter; read for display only.
        print('[{0}]'.format(service._value))
            
def producer(loops):
    """Call ``add_candy`` ``loops`` times, sleeping 0 or 1 second after each call."""
    for _ in range(loops):
        add_candy()
        pause = randrange(2)
        sleep(pause)

def consumer(loops):
    """Call ``buy_candy`` ``loops`` times, sleeping 0 or 1 second after each call."""
    for _ in range(loops):
        buy_candy()
        pause = randrange(2)
        sleep(pause)
        
def main():
    """Start one consumer thread and one producer thread with random loop counts.

    The producer runs 3-4 refills; the consumer runs at least as many
    purchases, up to ``MAX + 3`` more. The threads are started but not
    joined, so this function returns right away.
    """
    refills = randrange(3, 5)
    purchases = randrange(refills, refills + MAX + 4)
    print('Buy: {0}, Sell: {1}\n'.format(purchases, refills))

    workers = (
        Thread(target=consumer, args=(purchases,)),
        Thread(target=producer, args=(refills,)),
    )
    # Consumer first, then producer.
    for worker in workers:
        worker.start()
    
# Launch the consumer/producer threads of the semaphore demo.
main()

Buy: 16, Sell: 3
Buying candy...
OK
[9]
Buying candy...OK
[8]
Buying candy...OK
[7]
Buying candy...OK
[6]
Refilling candy...OK
[7]
Buying candy...OK
[6]
Refilling candy...OK
[7]
Buying candy...OK
[6]
Buying candy...OK
[5]
Refilling candy...OK
[6]
Buying candy...OK
[5]
Buying candy...OK
[4]
Buying candy...OK
[3]
Buying candy...OK
[2]
Buying candy...OK
[1]
Buying candy...OK
[0]
Buying candy...Sorry, it is empty
[0]
Buying candy...Sorry, it is empty
[0]
Buying candy...Sorry, it is empty
[0]


Queue

In [18]:
from queue import Queue

In [20]:
def consume(q):
    """Take items from ``q`` forever, pausing 0-2 seconds per item.

    Prints a line before each ``get`` and after each simulated processing
    pause, then calls ``q.task_done()`` so that ``q.join()`` can account for
    the item. The loop never exits on its own.

    Args:
        q: a ``queue.Queue`` (or compatible) to read from.
    """
    # The thread running this function does not change, so look the name up once.
    name = current_thread().name
    while True:
        print('[{0}] Thread {1} started to get, size = {2}'.format(ctime(), name, q.qsize()))
        # The value itself is not used; only the act of consuming matters here.
        q.get()
        sleep(randrange(3))
        print('[{0}] Thread {1} consumed, size = {2}'.format(ctime(), name, q.qsize()))
        q.task_done()
  

def produce(q, loops):
    """Put the integers ``0 .. loops-1`` on ``q``, printing before and after each put.

    Args:
        q:     a ``queue.Queue`` (or compatible) to write to.
        loops: number of items to enqueue.
    """
    # The thread running this function does not change, so look the name up once.
    name = current_thread().name
    for k in range(loops):
        print('[{0}] Thread {1} started to put, size = {2}'.format(ctime(), name, q.qsize()))
        q.put(k)
        # NOTE(review): randrange(1) is always 0, so this never actually pauses -
        # confirm whether a random delay was intended.
        sleep(randrange(1))
        print('[{0}] Thread {1} put, size = {2}'.format(ctime(), name, q.qsize()))

        
def main():
    """Run the queue demo: one consumer, one producer, wait until all items are processed.

    Returns once the producer has enqueued its 3-4 items and every one of them
    has been marked done by the consumer.
    """
    loops = randrange(3,5)
    q = Queue(maxsize=20)
    # consume() loops forever, so the consumer must be a daemon thread;
    # otherwise it would stay blocked on q.get() and keep the interpreter alive.
    c = Thread(name='ConsumerThread', target=consume, args=(q,), daemon=True)
    p = Thread(name='ProducerThread', target=produce, args=(q, loops))
    c.start()
    p.start()

    # Wait for the producer first: q.join() returns immediately while nothing
    # has been enqueued yet, so joining the queue alone is a race.
    p.join()
    q.join()
    
# Run the producer/consumer queue demo.
main()

[Sat Sep 23 15:27:52 2017] Thread ConsumerThread started to get, size = 0
[Sat Sep 23 15:27:52 2017] Thread ProducerThread started to put, size = 0[Sat Sep 23 15:27:52 2017] Thread ConsumerThread consumed, size = 0

[Sat Sep 23 15:27:52 2017] Thread ProducerThread put, size = 1[Sat Sep 23 15:27:52 2017] Thread ConsumerThread started to get, size = 1
[Sat Sep 23 15:27:52 2017] Thread ProducerThread started to put, size = 0

[Sat Sep 23 15:27:52 2017] Thread ProducerThread put, size = 1[Sat Sep 23 15:27:52 2017] Thread ConsumerThread consumed, size = 1
[Sat Sep 23 15:27:52 2017] Thread ProducerThread started to put, size = 1
[Sat Sep 23 15:27:52 2017] Thread ProducerThread put, size = 2

[Sat Sep 23 15:27:52 2017] Thread ConsumerThread started to get, size = 1
[Sat Sep 23 15:27:54 2017] Thread ConsumerThread consumed, size = 0
[Sat Sep 23 15:27:54 2017] Thread ConsumerThread started to get, size = 0


##### Thread and Process Pools

In [10]:
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, Executor
from multiprocessing import cpu_count
from functools import partial
from itertools import chain
import os
import time
import xml.etree.ElementTree as ET

In [2]:
# Report the CPU count returned by multiprocessing.cpu_count().
print(f'Total cores: {cpu_count()}')

Total cores: 4


In [2]:
# NOTE(review): absolute, machine-specific path - adjust before running elsewhere.
data = '/home/karimlulu/repos/Multithreading/data'
# Collect every file below `data`. The list is sorted because os.walk yields
# entries in arbitrary order, and a later cell takes the first `n` paths as
# its sample; sorting makes that sample the same on every run.
filenames = sorted(
    os.path.join(dirpath, file)
    for dirpath, dirnames, files in os.walk(data)
    for file in files
)

print(f'Total files: {len(filenames)}')

Total files: 16149


In [3]:
def job(filename):
    """Parse one XML file and return the distinct element tags it contains.

    Args:
        filename: path of the XML file to parse.

    Returns:
        A list of the unique tag names (in no particular order), or - when the
        document is not well-formed - the parser's error message as a string.
    """
    try:
        # Pass the path itself so ElementTree reads the file as bytes and
        # honours the encoding declared in the XML prolog. Opening the file in
        # text mode would decode it with the locale's default encoding first.
        tree = ET.parse(filename)
    except ET.ParseError as error:
        return str(error)
    return list({elem.tag for elem in tree.iter()})

In [4]:
# Accumulator for the per-file results of the single-threaded run.
files = []
# Size of the benchmark sample (this rebinds the `n` used by the counting cells).
n = 5000
# The first `n` collected paths are the workload for the timing cells below.
sample = filenames[:n]

In [5]:
t0 = time.time()
# Build the result list from scratch instead of appending to a list created
# in another cell, so re-running this cell does not accumulate duplicates.
files = [job(filename) for filename in sample]

print(f'Single thread: {time.time()-t0:.2f} sec')

Single thread: 33.35 sec


In [144]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import cpu_count
from functools import partial

import itertools


def _thread_worker(batch, job):
    """Apply ``job`` to every object in ``batch`` sequentially; return the results as a list."""
    outputs = []
    for obj in batch:
        outputs.append(job(obj))
    return outputs


def _process_worker(batch, job, n_threads):
    """Apply ``job`` to ``batch`` using ``n_threads`` threads, preserving input order.

    Args:
        batch:     sliceable sequence of inputs.
        job:       callable applied to each element.
        n_threads: number of worker threads (and of chunks).

    Returns:
        A list with ``job(batch[k])`` at position ``k``.
    """
    # Chunk i holds the elements at positions i, i + n_threads, i + 2*n_threads, ...
    chunks = [batch[i::n_threads] for i in range(n_threads)]
    worker = partial(_thread_worker, job = job)

    with ThreadPoolExecutor(max_workers = n_threads) as executor:
        futures = [executor.submit(worker, c) for c in chunks]

    # Leaving the `with` block waits for all futures. Write each chunk's
    # results back to the strided positions it was taken from; concatenating
    # the chunks would return the results in a shuffled order.
    ordered = [None] * len(batch)
    for offset, future in enumerate(futures):
        ordered[offset::n_threads] = future.result()
    return ordered


def _master_worker(batch, job, n_processes, n_threads):
    """Distribute ``batch`` over ``n_processes`` worker processes, preserving input order.

    Each process receives one strided chunk and handles it with
    ``process_worker`` using ``n_threads`` threads.

    Args:
        batch:       sliceable sequence of inputs.
        job:         callable applied to each element.
        n_processes: number of worker processes (and of chunks).
        n_threads:   thread count forwarded to ``process_worker``.

    Returns:
        A list of the per-element results, re-assembled so that each chunk's
        results occupy the positions its inputs were taken from.
    """
    # Chunk i holds the elements at positions i, i + n_processes, ...
    chunks = [batch[i::n_processes] for i in range(n_processes)]
    worker = partial(process_worker, job = job, n_threads = n_threads)

    with ProcessPoolExecutor(max_workers = n_processes) as executor:
        futures = [executor.submit(worker, c) for c in chunks]

    # Leaving the `with` block waits for all futures. Write each chunk's
    # results back to its strided positions; concatenating the chunks would
    # return the results in a shuffled order.
    ordered = [None] * len(batch)
    for offset, future in enumerate(futures):
        ordered[offset::n_processes] = future.result()
    return ordered


def process_worker(batch, job, n_threads):
    """Process ``batch`` inside one process, with or without a thread pool.

    With exactly one thread the batch is handled inline by ``_thread_worker``;
    any other thread count goes through ``_process_worker``.
    """
    if n_threads != 1:
        return _process_worker(batch, job, n_threads)
    return _thread_worker(batch, job)


def mp_map(data, job, n_processes = -1, n_threads = -1):
    """Apply ``job`` to every element of ``data`` using processes and/or threads.

    Args:
        data:        sized, sliceable sequence of inputs.
        job:         callable applied to each element.
        n_processes: number of worker processes; values below 1 mean
                     ``cpu_count()``. Never more than ``len(data)`` are used.
        n_threads:   threads per process; values below 1 mean a single thread.
                     Forced to 1 when every process gets exactly one element.

    Returns:
        A list with the results of ``job``; an empty list for empty ``data``.
    """
    # Nothing to do. Without this guard the process count would be clamped
    # to 0 and the process pool would reject max_workers=0 with ValueError.
    if len(data) == 0:
        return []

    n_processes = cpu_count() if n_processes < 1 else n_processes
    n_processes = min(n_processes, len(data))
    n_threads = 1 if n_threads < 1 or n_processes == len(data) else n_threads
    if n_processes == 1:
        # A single process: run in the current process, no pool needed.
        result = process_worker(data, job, n_threads)
    else:
        result = _master_worker(data, job, n_processes, n_threads)
    return result

In [185]:
# Time the same sample with mp_map: six worker processes, one thread each.
t0 = time.time()
r = mp_map(sample, job, n_processes=6, n_threads=1)
print(f'Spent {time.time()-t0:.2f} sec')

Spent 15.84 sec
