In [2]:
import threading

# threading — Thread-based parallelism

https://docs.python.org/3/library/threading.html

## GIL (CPython)

[Global Interpreter Lock](https://en.wikipedia.org/wiki/Global_interpreter_lock)

[Inside the Python GIL](http://www.dabeaz.com/python/GIL.pdf)

<img src="https://upload.wikimedia.org/wikipedia/commons/0/09/GIL_description.gif" align="left"/>

In [2]:
# Number of Thread objects that are currently alive
threading.active_count()

# Note: Jupyter creates its own threads, so the count is greater than 1

5

In [3]:
# Thread object for the thread that is executing this cell
threading.current_thread()

<_MainThread(MainThread, started 140475845437248)>

In [4]:
# List every attribute and method name available on the current Thread object
print(dir(threading.current_thread()))

['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_args', '_bootstrap', '_bootstrap_inner', '_daemonic', '_delete', '_ident', '_initialized', '_invoke_excepthook', '_is_stopped', '_kwargs', '_name', '_native_id', '_reset_internal_locks', '_set_ident', '_set_native_id', '_set_tstate_lock', '_started', '_stderr', '_stop', '_target', '_tstate_lock', '_wait_for_tstate_lock', 'daemon', 'getName', 'ident', 'isAlive', 'isDaemon', 'is_alive', 'join', 'name', 'native_id', 'run', 'setDaemon', 'setName', 'start']


In [12]:
threading.current_thread().name

# When the interpreter starts, it creates a thread named ``MainThread``

'MainThread'

In [10]:
# A thread's name is a plain writable attribute.
main_thread = threading.current_thread()
original_name = main_thread.name
main_thread.name = "WTF"
renamed = main_thread.name

# Put the original name back so that later cells still report it;
# otherwise the rename would leak into every later cell of the notebook.
main_thread.name = original_name

renamed

'WTF'

In [13]:
# Identifier of the calling thread (the same number appears in the thread's repr)
threading.get_ident()

140475845437248

In [15]:
# Thread ID assigned by the operating system
threading.get_native_id()  # Python 3.8+

5202

In [18]:
# All Thread objects currently alive, including the ones Jupyter started
threading.enumerate()

[<_MainThread(MainThread, started 140475845437248)>,
 <Thread(Thread-2, started daemon 140475761223424)>,
 <Heartbeat(Thread-3, started daemon 140475752830720)>,
 <HistorySavingThread(IPythonHistorySavingThread, started 140475727390464)>,
 <ParentPollerUnix(Thread-1, started daemon 140475718735616)>]

In [None]:
# Print the name of every thread that is currently alive.
for tr in threading.enumerate():
    # The `name` attribute replaces the deprecated `getName()` accessor.
    print(tr.name)

## Thread Objects

`threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)`

In [1]:
from threading import Thread

In [20]:
# Show the built-in documentation of the Thread class
help(threading.Thread)

Help on class Thread in module threading:

class Thread(builtins.object)
 |  Thread(group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)
 |  
 |  A class that represents a thread of control.
 |  
 |  This class can be safely subclassed in a limited fashion. There are two ways
 |  to specify the activity: by passing a callable object to the constructor, or
 |  by overriding the run() method in a subclass.
 |  
 |  Methods defined here:
 |  
 |  __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)
 |      This constructor should always be called with keyword arguments. Arguments are:
 |      
 |      *group* should be None; reserved for future extension when a ThreadGroup
 |      class is implemented.
 |      
 |      *target* is the callable object to be invoked by the run()
 |      method. Defaults to None, meaning nothing is called.
 |      
 |      *name* is the thread name. By default, a unique name is constructed of
 |      t

In [21]:
# A Thread object can be created without being started; its repr reports the state "initial"
Thread()

<Thread(Thread-4, initial)>

In [22]:
# A thread that was created but never started is not listed among the alive threads
threading.enumerate()

[<_MainThread(MainThread, started 140475845437248)>,
 <Thread(Thread-2, started daemon 140475761223424)>,
 <Heartbeat(Thread-3, started daemon 140475752830720)>,
 <HistorySavingThread(IPythonHistorySavingThread, started 140475727390464)>,
 <ParentPollerUnix(Thread-1, started daemon 140475718735616)>]

In [25]:
import time

def function_run_in_thread():
    """Report the executing thread's name, block for four seconds, then sign off."""
    print(f"{threading.current_thread().name = }")
    pause_seconds = 4
    time.sleep(pause_seconds)
    print("Thats All Folks")


# Run the function on a separate thread; nothing here waits for it to finish.
thread = Thread(target=function_run_in_thread)
thread.start()

threading.current_thread().name = 'Thread-6'


In [26]:
# While the worker thread is still sleeping, it appears in the list of alive threads
threading.enumerate()

[<_MainThread(MainThread, started 140475845437248)>,
 <Thread(Thread-2, started daemon 140475761223424)>,
 <Heartbeat(Thread-3, started daemon 140475752830720)>,
 <HistorySavingThread(IPythonHistorySavingThread, started 140475727390464)>,
 <ParentPollerUnix(Thread-1, started daemon 140475718735616)>,
 <Thread(Thread-6, started 140475709556480)>]

Thats All Folks


In [4]:
from functools import wraps


def thread_info(fn):
    """Decorate ``fn`` so that every call is logged with the executing thread's name.

    A "Start function" line, including the positional and keyword arguments of
    the call, is printed before ``fn`` runs. An "End function" line is printed
    afterwards, whether ``fn`` returns or raises. The return value (or the
    exception) of ``fn`` is passed through unchanged.
    """

    def wrapper(*args, **kwargs):
        # Looked up on every call: the wrapper may be executed by any thread.
        current = threading.current_thread()
        print(f"{current.name}. Start function: {fn.__name__}. {args = }, {kwargs = }")
        try:
            result = fn(*args, **kwargs)
        finally:
            print(f"{current.name}. End function: {fn.__name__}")
        return result

    # Copy the name, docstring and other metadata of ``fn`` onto the wrapper.
    return wraps(fn)(wrapper)

In [28]:
@thread_info
def add_(a, b):
    """Return the sum of ``a`` and ``b``."""
    total = a + b
    return total


# Called directly, so the decorator's log lines name the thread running this cell.
add_(1, 2)

MainThread. Start function: add_. args = (1, 2), kwargs = {}
MainThread. End function: add_


3

In [29]:
@thread_info
def function_run_in_thread(arg1, arg2, kwarg1=None, kwarg2=None):
    """Echo the four received arguments, sleep for four seconds, then sign off."""
    print(f"{arg1 = }, {arg2 = }, {kwarg1 = }, {kwarg2 = }")
    time.sleep(4)
    print("Thats All Folks")


# `args` and `kwargs` are handed to the target when the thread runs it.
thread = Thread(
    target=function_run_in_thread,
    args=(1, 2),
    kwargs={"kwarg1": 3, "kwarg2": 4},
)
thread.start()

Thread-7. Start function: function_run_in_thread. args = (1, 2), kwargs = {'kwarg1': 3, 'kwarg2': 4}
arg1 = 1, arg2 = 2, kwarg1 = 3, kwarg2 = 4
Thats All Folks
Thread-7. End function: function_run_in_thread


In [30]:
# All four values are passed positionally; the last two fill kwarg1 and kwarg2
thread = Thread(target=function_run_in_thread, args=(1, 2, 3, 4))
thread.start()

Thread-8. Start function: function_run_in_thread. args = (1, 2, 3, 4), kwargs = {}
arg1 = 1, arg2 = 2, kwarg1 = 3, kwarg2 = 4
Thats All Folks
Thread-8. End function: function_run_in_thread


In [31]:
# Every parameter, including arg1 and arg2, can be supplied by keyword
thread = Thread(target=function_run_in_thread,kwargs={"kwarg1": 3, "kwarg2": 4, "arg1": 1, "arg2": 2})
thread.start()

Thread-9. Start function: function_run_in_thread. args = (), kwargs = {'kwarg1': 3, 'kwarg2': 4, 'arg1': 1, 'arg2': 2}
arg1 = 1, arg2 = 2, kwarg1 = 3, kwarg2 = 4
Thats All Folks
Thread-9. End function: function_run_in_thread


In [34]:
thread = Thread(target=function_run_in_thread, args=(1, 2, ))
thread.start()

# An exception in the code of this cell does not stop the worker thread started
# above. The exception is caught and displayed so that it does not abort
# "Run All"; the printed text is the same as the final line of the traceback.
try:
    raise SystemError("Let's kill MainThread")
except SystemError as exc:
    print(f"{type(exc).__name__}: {exc}")

Thread-11. Start function: function_run_in_thread. args = (1, 2), kwargs = {}
arg1 = 1, arg2 = 2, kwarg1 = None, kwarg2 = None


SystemError: Let's kill MainThread

Thats All Folks
Thread-11. End function: function_run_in_thread


In [35]:
@thread_info
def add_to_storage():
    """Placeholder writer: the decorator's log lines are its only output."""
    return None


@thread_info
def read_from_storage():
    """Placeholder reader: the decorator's log lines are its only output."""
    return None


# Explicit thread names make the log lines show which thread ran which function.
write_thread = Thread(name="Writer", target=add_to_storage)
read_thread = Thread(name="Reader", target=read_from_storage)

write_thread.start()
read_thread.start()

Writer. Start function: add_to_storage. args = (), kwargs = {}
Writer. End function: add_to_storage
Reader. Start function: read_from_storage. args = (), kwargs = {}
Reader. End function: read_from_storage


In [37]:
# Plain list shared by both threads through the module namespace
storage = []


@thread_info
def add_to_storage():
    """Append the integers 0..99 to the shared ``storage`` list."""
    for i in range(100):
        storage.append(i)

@thread_info
def read_from_storage():
    """Remove 100 items from the front of ``storage`` and print each one."""
    for i in range(100):
        # Nothing synchronises the two threads: pop(0) raises IndexError
        # if the list is empty at this moment.
        print(storage.pop(0), end=";")
    

write_thread = Thread(target=add_to_storage, name="Writer-Thread")
read_thread = Thread(target=read_from_storage, name="Reader-Thread")

write_thread.start()
read_thread.start()

Writer-Thread. Start function: add_to_storage. args = (), kwargs = {}
Writer-Thread. End function: add_to_storage
Reader-Thread. Start function: read_from_storage. args = (), kwargs = {}
0;1;2;3;4;5;6;7;8;9;10;11;12;13;14;15;16;17;18;19;20;21;22;23;24;25;26;27;28;29;30;31;32;33;34;35;36;37;38;39;40;41;42;43;44;45;46;47;48;49;50;51;52;53;54;55;56;57;58;59;60;61;62;63;64;65;66;67;68;69;70;71;72;73;74;75;76;77;78;79;80;81;82;83;84;85;86;87;88;89;90;91;92;93;94;95;96;97;98;99;Reader-Thread. End function: read_from_storage


In [39]:
# Plain list shared by both threads through the module namespace
storage = []


@thread_info
def add_to_storage():
    """Append the integers 0..99 to the shared ``storage`` list."""
    for i in range(100):
        storage.append(i)

@thread_info
def read_from_storage():
    """Remove 100 items from the front of ``storage`` and print each one."""
    for i in range(100):
        print(storage.pop(0), end=";")
    

write_thread = Thread(target=add_to_storage, name="Writer-Thread")
read_thread = Thread(target=read_from_storage, name="Reader-Thread")

write_thread.start()
read_thread.start()

# Nothing waits for the workers here, so this line can run while the reader
# is still popping items and the reported length may be anything up to 100.
print(f"{len(storage) = }")

Writer-Thread. Start function: add_to_storage. args = (), kwargs = {}
Writer-Thread. End function: add_to_storage
Reader-Thread. Start function: read_from_storage. args = (), kwargs = {}
0;1;2;3len(storage) = 96
;4;5;6;7;8;9;10;11;12;13;14;15;16;17;18;19;20;21;22;23;24;25;26;27;28;29;30;31;32;33;34;35;36;37;38;39;40;41;42;43;44;45;46;47;48;49;50;51;52;53;54;55;56;57;58;59;60;61;62;63;64;65;66;67;68;69;70;71;72;73;74;75;76;77;78;79;80;81;82;83;84;85;86;87;88;89;90;91;92;93;94;95;96;97;98;99;Reader-Thread. End function: read_from_storage


In [40]:
# Plain list shared by both threads through the module namespace
storage = []


@thread_info
def add_to_storage():
    """Append the integers 0..99 to the shared ``storage`` list."""
    for i in range(100):
        storage.append(i)

@thread_info
def read_from_storage():
    """Remove 100 items from the front of ``storage`` and print each one."""
    for i in range(100):
        print(storage.pop(0), end=";")


write_thread = Thread(target=add_to_storage, name="Writer-Thread")
read_thread = Thread(target=read_from_storage, name="Reader-Thread")

write_thread.start()
read_thread.start()

# join() blocks the calling thread until the joined thread has finished,
# so the length printed below is read after both workers are done.
write_thread.join()
read_thread.join()

print(f"\n\n{threading.current_thread().name}: {len(storage) = }\n\n", )

Writer-Thread. Start function: add_to_storage. args = (), kwargs = {}
Writer-Thread. End function: add_to_storage
Reader-Thread. Start function: read_from_storage. args = (), kwargs = {}
0;1;2;3;4;5;6;7;8;9;10;11;12;13;14;15;16;17;18;19;20;21;22;23;24;25;26;27;28;29;30;31;32;33;34;35;36;37;38;39;40;41;42;43;44;45;46;47;48;49;50;51;52;53;54;55;56;57;58;59;60;61;62;63;64;65;66;67;68;69;70;71;72;73;74;75;76;77;78;79;80;81;82;83;84;85;86;87;88;89;90;91;92;93;94;95;96;97;98;99;Reader-Thread. End function: read_from_storage


MainThread: len(storage) = 0




In [41]:
storage = []  # items produced by the writer
squares = []  # results produced by the reader

@thread_info
def add_to_storage():
    """Append the integers 0..99 to the shared ``storage`` list."""
    for i in range(100):
        storage.append(i)

@thread_info
def read_from_storage():
    """Move items from ``storage`` to ``squares``, squaring each, until ``storage`` is empty."""
    while True:
        try:
            squares.append(storage.pop(0) ** 2)
        except IndexError:
            # pop(0) on an empty list: treated as "no more work" and the loop ends
            break

write_thread = Thread(target=add_to_storage, name="Writer-Thread")
read_thread = Thread(target=read_from_storage, name="Reader-Thread")

write_thread.start()
read_thread.start()

# Wait for both workers before looking at the result
write_thread.join()
read_thread.join()

print(f"\n\n{threading.current_thread().name}: {squares = }\n\n", )

Writer-Thread. Start function: add_to_storage. args = (), kwargs = {}
Writer-Thread. End function: add_to_storage
Reader-Thread. Start function: read_from_storage. args = (), kwargs = {}
Reader-Thread. End function: read_from_storage


MainThread: squares = [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761, 4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241, 6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921, 8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801]




In [48]:
storage = []  # items produced by the writer
squares = []  # results produced by the reader

@thread_info
def add_to_storage(x):
    """Append the integers 0..x-1 to ``storage``, pausing briefly after each one."""
    for i in range(x):
        storage.append(i)
        time.sleep(0.0001)  # Simulate an I/O operation

@thread_info
def read_from_storage():
    """Move items from ``storage`` to ``squares``, squaring each, until ``storage`` is empty."""
    while True:
        try:
            squares.append(storage.pop(0) ** 2)
        except IndexError:
            # The list is empty right now, which does not mean the writer has
            # finished: the reader can stop while items are still being produced.
            break

write_thread = Thread(target=add_to_storage, name="Writer-Thread", args=(100,))
read_thread = Thread(target=read_from_storage, name="Reader-Thread")

write_thread.start()
read_thread.start()

write_thread.join()
read_thread.join()

print(f"\n\n{threading.current_thread().name}: {squares = }\n\n", )


# Be careful with global variables. Using a plain global sequence as a queue is not recommended.

Writer-Thread. Start function: add_to_storage. args = (100,), kwargs = {}
Reader-Thread. Start function: read_from_storage. args = (), kwargs = {}
Reader-Thread. End function: read_from_storage
Writer-Thread. End function: add_to_storage


MainThread: squares = [0, 1, 4]




## queue — A synchronized queue class

https://docs.python.org/3/library/queue.html#module-queue

In [49]:
from queue import Queue

In [50]:
# Show the built-in documentation of the Queue class
help(Queue)

Help on class Queue in module queue:

class Queue(builtins.object)
 |  Queue(maxsize=0)
 |  
 |  Create a queue object with a given maximum size.
 |  
 |  If maxsize is <= 0, the queue size is infinite.
 |  
 |  Methods defined here:
 |  
 |  __init__(self, maxsize=0)
 |      Initialize self.  See help(type(self)) for accurate signature.
 |  
 |  empty(self)
 |      Return True if the queue is empty, False otherwise (not reliable!).
 |      
 |      This method is likely to be removed at some point.  Use qsize() == 0
 |      as a direct substitute, but be aware that either approach risks a race
 |      condition where a queue can grow before the result of empty() or
 |      qsize() can be used.
 |      
 |      To create code that needs to wait for all queued tasks to be
 |      completed, the preferred technique is to use the join() method.
 |  
 |  full(self)
 |      Return True if the queue is full, False otherwise (not reliable!).
 |      
 |      This method is likely to be remove

In [51]:
# With the default maxsize of 0 the queue size is unlimited
q = Queue()

In [52]:
# Add an item to the queue
q.put("1")

In [53]:
# Add a second item; it is retrieved after the first one
q.put("2")

In [54]:
# get() without a timeout blocks while the queue is empty, so after the two
# queued items are printed this loop waits forever and the kernel must be interrupted.
while True:
    print(q.get())

1
2


KeyboardInterrupt: 

In [56]:
q = Queue()
for ix in range(10):
    q.put(ix)
    
# Once the ten items are consumed, get(timeout=1) waits for one second
# and then raises queue.Empty, which is not handled in this cell.
while True:
    v = q.get(timeout=1)
    print(f"{v = }")

v = 0
v = 1
v = 2
v = 3
v = 4
v = 5
v = 6
v = 7
v = 8
v = 9


Empty: 

In [58]:
from queue import Queue, Empty

q = Queue()
for ix in range(10):
    q.put(ix)

# Drain the queue: a get() that finds nothing for one second raises Empty,
# which is used as the signal to stop.
while True:
    try:
        v = q.get(timeout=1)
    except Empty:
        break
    else:
        print(f"{v = }")

v = 0
v = 1
v = 2
v = 3
v = 4
v = 5
v = 6
v = 7
v = 8
v = 9


In [9]:
from queue import Queue, Empty
import time

storage = Queue()  # thread-safe replacement for the shared list
squares = []

@thread_info
def add_to_storage(x):
    """Put the integers 0..x-1 on ``storage``, pausing briefly after each one."""
    for i in range(x):
        storage.put(i)
        time.sleep(0.0001)  # Simulate an I/O operation

@thread_info
def read_from_storage():
    """Move items from ``storage`` to ``squares`` until no item arrives for one second."""
    while True:
        try:
            # NOTE(review): the value is stored as-is here, not squared as in
            # the earlier list-based cells - confirm whether that is intended.
            squares.append(storage.get(timeout=1))
        except Empty:
            # Nothing arrived within the timeout: assume the writer is done
            break

write_thread = Thread(target=add_to_storage, name="Writer-Thread", args=(100,))
read_thread = Thread(target=read_from_storage, name="Reader-Thread")

write_thread.start()
read_thread.start()

write_thread.join()
read_thread.join()

print(f"\n\n{threading.current_thread().name}: {squares = }\n\n", )

Writer-Thread. Start function: add_to_storage. args = (100,), kwargs = {}
Reader-Thread. Start function: read_from_storage. args = (), kwargs = {}
Writer-Thread. End function: add_to_storage
Reader-Thread. End function: read_from_storage


MainThread: squares = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 43]




In [13]:
from queue import Queue, Empty


storage = Queue()
squares = []

@thread_info
def add_to_storage(x):
    """Put the integers 0..x-1 on ``storage``, pausing for about a second after each one."""
    for i in range(x):
        storage.put(i)
        time.sleep(1.0001)  # Simulate a very long I/O operation

@thread_info
def read_from_storage():
    """Move items from ``storage`` to ``squares`` until no item arrives for one second."""
    while True:
        try:
            # The 1 s timeout is shorter than the writer's 1.0001 s pause, so
            # the reader may give up while the writer is still producing.
            squares.append(storage.get(timeout=1))
        except Empty:
            break

write_thread = Thread(target=add_to_storage, name="Writer-Thread", args=(5,))
read_thread = Thread(target=read_from_storage, name="Reader-Thread")

write_thread.start()
read_thread.start()

write_thread.join()
read_thread.join()

print(f"\n\n{threading.current_thread().name}: {squares = }\n\n", )

Writer-Thread. Start function: add_to_storage. args = (5,), kwargs = {}
Reader-Thread. Start function: read_from_storage. args = (), kwargs = {}
Writer-Thread. End function: add_to_storage
Reader-Thread. End function: read_from_storage


MainThread: squares = [0, 1, 70, 2, 3, 71, 4]


Writer-Thread. End function: add_to_storage


In [62]:
from threading import Event

# Show the built-in documentation of the Event class
help(Event)

Help on class Event in module threading:

class Event(builtins.object)
 |  Class implementing event objects.
 |  
 |  Events manage a flag that can be set to true with the set() method and reset
 |  to false with the clear() method. The wait() method blocks until the flag is
 |  true.  The flag is initially false.
 |  
 |  Methods defined here:
 |  
 |  __init__(self)
 |      Initialize self.  See help(type(self)) for accurate signature.
 |  
 |  clear(self)
 |      Reset the internal flag to false.
 |      
 |      Subsequently, threads calling wait() will block until set() is called to
 |      set the internal flag to true again.
 |  
 |  isSet = is_set(self)
 |  
 |  is_set(self)
 |      Return true if and only if the internal flag is true.
 |  
 |  set(self)
 |      Set the internal flag to true.
 |      
 |      All threads waiting for it to become true are awakened. Threads
 |      that call wait() once the flag is true will not block at all.
 |  
 |  wait(self, timeout=None)
 | 

In [63]:
new_event = Event()

# The flag of a new Event is initially false
print(new_event.is_set())

# Nothing sets the flag, so wait(1) gives up after one second and returns False
new_event.wait(1)

False


False

In [70]:
from queue import Queue, Empty
from threading import Event


storage = Queue(maxsize=5)  # In real life memory is not infinite, so set a maximum size
complete_storage = Event()  # set by the writer once every item has been queued
squares = []

@thread_info
def add_to_storage(x):
    """Put the integers 0..x-1 on ``storage``, then set ``complete_storage``."""
    for i in range(x):
        time.sleep(0.001)
        # put() is called without a timeout, so it waits until the queue has
        # room; with a timeout it would raise queue.Full instead.
        storage.put(i)  
    
    complete_storage.set()  # Set the `complete_storage` flag to true
    
@thread_info
def read_from_storage():
    """Move items from ``storage`` to ``squares`` until the queue is empty and the writer is done."""
    while True:  
        try:
            value = storage.get(timeout=1)
        except Empty:
            # Empty alone is not enough to stop: keep polling until the
            # writer has signalled that no more items will arrive.
            if complete_storage.is_set():
                break
            continue
        
        time.sleep(0.001)  # Simulate a long operation
        squares.append(value)

write_thread = Thread(target=add_to_storage, name="Writer-Thread", args=(100,))
write_thread.start()

read_threads = []  # started reader threads, kept so they can be joined
for tr_no in range(3):  # Create 3 reader threads
    read_thread = Thread(target=read_from_storage, name="Reader-Thread-%02d" % tr_no)
    read_thread.start()
    read_threads.append(read_thread)

write_thread.join()

for worker in read_threads:
    worker.join()

print(f"\n\n{threading.current_thread().name}: {squares = }\n\n", )

Writer-Thread. Start function: add_to_storage. args = (100,), kwargs = {}
Reader-Thread-00. Start function: read_from_storage. args = (), kwargs = {}
Reader-Thread-01. Start function: read_from_storage. args = (), kwargs = {}
Reader-Thread-02. Start function: read_from_storage. args = (), kwargs = {}
Writer-Thread. End function: add_to_storage
Reader-Thread-01. End function: read_from_storage
Reader-Thread-02. End function: read_from_storage
Reader-Thread-00. End function: read_from_storage


MainThread: squares = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 74]




In [14]:
from queue import Queue, Empty
from threading import Event


storage = Queue(maxsize=5)  # In real life memory is not infinite, so set a maximum size
complete_storage = Event()  # set by the writer once every item has been queued
squares = []

@thread_info
def add_to_storage(x):
    """Put the integers 0..x-1 on ``storage``, then set ``complete_storage``."""
    for i in range(x):
        time.sleep(0.001)
        # put() is called without a timeout, so it waits until the queue has
        # room; with a timeout it would raise queue.Full instead.
        storage.put(i)  
    
    complete_storage.set()  # Set the `complete_storage` flag to true
    
@thread_info
def read_from_storage():
    """Move items from ``storage`` to ``squares`` until the queue is empty and the writer is done."""
    while True:  # Keep going until the queue is empty and `complete_storage` is set
        try:
            value = storage.get(timeout=1)
        except Empty:
            if complete_storage.is_set():
                break
            continue
        
        time.sleep(0.25)  # Simulate a long operation
        squares.append(value)

write_thread = Thread(target=add_to_storage, name="Writer-Thread", args=(5,))
write_thread.start()

read_threads = []  # started reader threads, kept so they can be joined
for tr_no in range(3):  # Create 3 reader threads
    read_thread = Thread(target=read_from_storage, name="Reader-Thread-%02d" % tr_no)
    read_thread.start()
    read_threads.append(read_thread)

write_thread.join()

print("Block the queue for put")
# Queue.join() waits until task_done() has been called for every item that was
# put. The readers in this cell never call task_done(), so this call never
# returns and the kernel has to be interrupted.
storage.join()

for worker in read_threads:
    worker.join()

print(f"\n\n{threading.current_thread().name}: {squares = }\n\n", )

Writer-Thread. Start function: add_to_storage. args = (5,), kwargs = {}
Reader-Thread-00. Start function: read_from_storage. args = (), kwargs = {}
Reader-Thread-01. Start function: read_from_storage. args = (), kwargs = {}
Reader-Thread-02. Start function: read_from_storage. args = (), kwargs = {}
Writer-Thread. End function: add_to_storage
Block the queue for put
Reader-Thread-02. End function: read_from_storage
Reader-Thread-00. End function: read_from_storage
Reader-Thread-01. End function: read_from_storage


KeyboardInterrupt: 

In [73]:
from queue import Queue, Empty
from threading import Event


storage = Queue(maxsize=5)  # In real life memory is not infinite, so set a maximum size
complete_storage = Event()  # set by the writer once every item has been queued
squares = []

@thread_info
def add_to_storage(x):
    """Put the integers 0..x-1 on ``storage``, then set ``complete_storage``."""
    for i in range(x):
        time.sleep(0.001)
        # put() is called without a timeout, so it waits until the queue has
        # room; with a timeout it would raise queue.Full instead.
        storage.put(i)  
    
    complete_storage.set()  # Set the `complete_storage` flag to true
    
@thread_info
def read_from_storage():
    """Process items from ``storage`` into ``squares``, marking each one done."""
    while True:  # Keep going until the queue is empty and `complete_storage` is set
        try:
            value = storage.get(timeout=1)
        except Empty:
            if complete_storage.is_set():
                break
            continue
        
        try:
            time.sleep(0.25)  # Simulate a long operation
            squares.append(value)
        finally:
            # Report the item as processed even if processing failed,
            # so that storage.join() can eventually return.
            storage.task_done()

write_thread = Thread(target=add_to_storage, name="Writer-Thread", args=(5,))
write_thread.start()

read_threads = []  # started reader threads, kept so they can be joined
for tr_no in range(3):  # Create 3 reader threads
    read_thread = Thread(target=read_from_storage, name="Reader-Thread-%02d" % tr_no)
    read_thread.start()
    read_threads.append(read_thread)

write_thread.join()

print("Block the queue for put")
# Waits until task_done() has been called once for every item that was put
storage.join()
print("Release queue")

for worker in read_threads:
    worker.join()

print(f"\n\n{threading.current_thread().name}: {squares = }\n\n", )

Writer-Thread. Start function: add_to_storage. args = (5,), kwargs = {}
Reader-Thread-00. Start function: read_from_storage. args = (), kwargs = {}
Reader-Thread-01. Start function: read_from_storage. args = (), kwargs = {}
Reader-Thread-02. Start function: read_from_storage. args = (), kwargs = {}
Writer-Thread. End function: add_to_storage
Block the queue for put
Release queue
Reader-Thread-02. End function: read_from_storage
Reader-Thread-00. End function: read_from_storage
Reader-Thread-01. End function: read_from_storage


MainThread: squares = [0, 1, 2, 3, 4]




In [74]:
from queue import Queue, Empty
from threading import Event


storage = Queue(maxsize=5)  # In real life memory is not infinite, so set a maximum size
squares = []


class StopObject:
    """Sentinel put on the queue to tell one reader that no more work is coming.

    Works well with FIFO queues, where the sentinel is only reached after
    every item that was queued before it.
    """

    def __init__(self, name="undefined"):
        self.name = name


@thread_info
def add_to_storage(x):
    """Put the integers 0..x-1 on ``storage``, pausing briefly before each one."""
    for i in range(x):
        time.sleep(0.001)
        # put() is called without a timeout, so it waits until the queue has
        # room; with a timeout it would raise queue.Full instead.
        storage.put(i)
    # End of work is signalled by the StopObject sentinels queued below, so no
    # Event is set here (this cell does not create one).


@thread_info
def read_from_storage():
    """Process items from ``storage`` into ``squares`` until a StopObject arrives."""
    while True:
        # No timeout: block until an item or a sentinel is available.
        value = storage.get()

        try:
            if isinstance(value, StopObject):
                print(f"{threading.current_thread().name} got StopEvent {value.name}")
                break

            time.sleep(0.25)  # Simulate a long operation
            # NOTE(review): the value is stored as-is, not squared - confirm
            # whether that is intended given the list's name.
            squares.append(value)
        finally:
            # Runs for the sentinel too (the `break` passes through `finally`),
            # so every item taken from the queue is reported as done.
            storage.task_done()


write_thread = Thread(target=add_to_storage, name="Writer-Thread", args=(5,))
write_thread.start()

read_threads = []  # started reader threads, kept so they can be joined
for tr_no in range(3):  # Create 3 reader threads
    read_thread = Thread(target=read_from_storage, name="Reader-Thread-%02d" % tr_no)
    read_thread.start()
    read_threads.append(read_thread)

write_thread.join()
print("Block the queue for put")
# Wait until every queued number has been processed
storage.join()

# One sentinel per reader: each reader stops after consuming exactly one.
for ix, _ in enumerate(read_threads):
    storage.put(StopObject(name="Stop Event No: %02d" % ix))

print("Block the queue for put")
# Wait until every sentinel has been consumed
storage.join()

for worker in read_threads:
    worker.join()

print(f"\n\n{threading.current_thread().name}: {squares = }\n\n", )

Writer-Thread. Start function: add_to_storage. args = (5,), kwargs = {}
Reader-Thread-00. Start function: read_from_storage. args = (), kwargs = {}
Reader-Thread-01. Start function: read_from_storage. args = (), kwargs = {}
Reader-Thread-02. Start function: read_from_storage. args = (), kwargs = {}
Writer-Thread. End function: add_to_storage
Block the queue for put
Block the queue for putReader-Thread-02 got StopEvent Stop Event No: 00Reader-Thread-00 got StopEvent Stop Event No: 01
Reader-Thread-00. End function: read_from_storage


Reader-Thread-02. End function: read_from_storage
Reader-Thread-01 got StopEvent Stop Event No: 02
Reader-Thread-01. End function: read_from_storage


MainThread: squares = [0, 1, 2, 3, 4]




## Locks

In [82]:
import threading


class DummyCounter():
    """Plain counter with no synchronisation around its update."""

    def __init__(self):
        self.count = 0

    def increment(self):
        """Add one to ``count`` as a separate read followed by a write."""
        self.count = self.count + 1

        
@thread_info
def worker(items, counter):
    """Call ``counter.increment()`` ``items`` times."""
    for _step in range(items):
        counter.increment()
        

dc = DummyCounter()  # one counter shared by all workers
count_per_thread = 1000000
total_workers = 2

threads = []
for tr_no in range(total_workers):  # Create and start the worker threads
    tr = threading.Thread(
        target=worker, 
        name="Counter-Worker-%02d" % tr_no, 
        args=(count_per_thread, dc)
    )
    tr.start()
    threads.append(tr)

    
# Wait for every worker before reading the final count
for tr in threads:
    tr.join()

# Without synchronisation the actual count can be lower than expected
print(f"\n\n\nExpected counter {total_workers * count_per_thread}, Actual {dc.count}")

Counter-Worker-00. Start function: worker. args = (1000000, <__main__.DummyCounter object at 0x7fc2ee8d4880>), kwargs = {}
Counter-Worker-01. Start function: worker. args = (1000000, <__main__.DummyCounter object at 0x7fc2ee8d4880>), kwargs = {}
Counter-Worker-00. End function: worker
Counter-Worker-01. End function: worker



Expected counter 2000000, Actual 1725837


In fact, this code is not an atomic operation:

```python
    def increment(self):
        self.count += 1
```

It performs three separate operations:


```python

    def increment(self):
        value = getattr(self, "count")
        value += 1
        setattr(self, "count", value)
        
```

The interpreter can release the GIL and switch to another thread after any one of these operations completes.

In [84]:
from threading import Lock, RLock
import threading


class DummyCounter():
    """Thread-safe counter: every update happens while holding a lock."""

    def __init__(self):
        self.count = 0
        # Guards `count`; only one thread at a time can hold it.
        self.lock = Lock()

    def increment(self,):
        """Add one to ``count`` without interference from other ``increment`` calls."""
        # The lock does not stop the interpreter from switching threads; it
        # makes any other thread that calls increment() wait until this update
        # has finished. `with` acquires the lock and always releases it on
        # exit, like acquire() followed by release() in a `finally` clause.
        with self.lock:
            self.count += 1
        
@thread_info
def worker(items, counter):
    """Call ``counter.increment()`` ``items`` times."""
    for _step in range(items):
        counter.increment()


dc = DummyCounter()  # one counter shared by all workers
count_per_thread = 1000000
total_workers = 2
threads = []


for tr_no in range(total_workers):  # Create and start the worker threads
    tr = threading.Thread(
        target=worker, 
        name="Counter-Worker-%02d" % tr_no, 
        args=(count_per_thread, dc)
    )
    tr.start()
    threads.append(tr)

    
# Wait for every worker before reading the final count
for tr in threads:
    tr.join()

print(f"\n\n\nExpected counter {total_workers * count_per_thread}, Actual {dc.count}")

Counter-Worker-00. Start function: worker. args = (1000000, <__main__.DummyCounter object at 0x7fc2ee8bcdc0>), kwargs = {}
Counter-Worker-01. Start function: worker. args = (1000000, <__main__.DummyCounter object at 0x7fc2ee8bcdc0>), kwargs = {}
Counter-Worker-00. End function: worker
Counter-Worker-01. End function: worker



Expected counter 2000000, Actual 2000000


__Lock__: `.acquire()` can be called only once before `.release()`; a second call blocks until the lock is released.

__RLock__: `.acquire()` can be called multiple times by the same thread before `.release()`; each call must be matched by its own `.release()`.


In [85]:
lock = Lock()

print("No Lock")
lock.acquire()
print("Lock first time")
# A plain Lock is not re-entrant: this second acquire() waits for a release()
# that never comes, so the cell hangs until the kernel is interrupted.
lock.acquire()
print("And now we are deadlocked")

No Lock
Lock first time


KeyboardInterrupt: 

In [86]:
lock = RLock()

print("No Lock")
lock.acquire()
print("Lock first time")
# The thread that already owns an RLock may acquire it again without blocking.
lock.acquire()
print("Everything is OK")

# Every acquire() needs a matching release(); only after the last one
# can another thread take the lock.
lock.release()
lock.release()

No Lock
Lock first time
Everything is OK
