# Thread Synchronization Primitives

One of the pitfalls of writing multithreaded programs is the race condition. It occurs when two or more threads access a shared resource at the same time, at least one of them modifies it, and the result depends on the order in which the threads happen to run.

For example, suppose we have a database in which one cell contains a number, say, 0. Using ThreadPoolExecutor, we launch two threads at once, each running a function that reads the number, increases it by 1, and writes it back to the database.

In [1]:
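import time

class DataBase:
    def __init__(self):
        self.value = 0  # the shared "cell"

    def update(self, name):
        print(name, " - start thread")
        local_copy = self.value  # read the shared value
        local_copy += 1
        time.sleep(0.1)  # simulate slow work; other threads run meanwhile
        self.value = local_copy  # write back, overwriting any newer value
        print(name, " - finish thread")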

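We expect the first thread to write 1 and the second thread to then increment it to 2. In practice, both threads read the value almost simultaneously: the GIL does not prevent this, because time.sleep() releases it and lets the other thread run before the first one has written its result. Both threads therefore start from 0, both write 1, and the final value stays 1.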
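The race can be reproduced with a small driver. The following is a minimal sketch, not the original notebook's cell: the `run_race` helper and its `num_threads` parameter are assumptions added for illustration, and the print calls are omitted to keep the output short.

```python
import time
from concurrent.futures import ThreadPoolExecutor


class DataBase:
    def __init__(self):
        self.value = 0

    def update(self, name):
        local_copy = self.value  # each thread reads the current value
        local_copy += 1
        time.sleep(0.1)  # sleeping lets the other thread run and read too
        self.value = local_copy  # the later write overwrites the earlier one


def run_race(num_threads=2):
    """Hypothetical helper: run update() in several threads, return the result."""
    db = DataBase()
    # Leaving the with-block waits for all submitted tasks to finish.
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        for index in range(num_threads):
            executor.submit(db.update, index)
    return db.value


if __name__ == "__main__":
    print("final value:", run_race())
```

With two threads the final value is normally 1 rather than 2, because both threads read 0 before either one writes.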

To resolve race conditions, Python's threading module provides synchronization primitives that may be familiar from other programming languages.

A Lock is a primitive that can be held by only one thread at a time.

If a thread tries to acquire a lock that another thread already holds, it is forced to wait until the holder releases the lock.

In [None]:
import threading

lock = threading.Lock()
lock.acquire()  # wait until the lock is free, then take it
try:
    pass  # some code (the critical section)
finally:
    lock.release()  # release the lock even if the code above raises

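More conveniently, you can use the lock as a context manager: it is acquired on entering the with block and released on leaving it, even if an exception is raised, so there is no need for explicit acquire/release calls.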

In [None]:
# Body of DataBase.update(); assumes __init__ also created
# self._lock = threading.Lock()
with self._lock:
    local_copy = self.value
    local_copy += 1
    time.sleep(0.1)
    self.value = local_copy
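Put together, the locked version of the database example might look like the sketch below. The class name `LockedDataBase` and the `run_locked` helper are assumptions made for illustration; only the `with self._lock:` body comes from the text above.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class LockedDataBase:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()  # guards every access to self.value

    def update(self, name):
        # Only one thread at a time can run the read-modify-write sequence.
        with self._lock:
            local_copy = self.value
            local_copy += 1
            time.sleep(0.1)
            self.value = local_copy


def run_locked(num_threads=2):
    """Hypothetical helper: run update() in several threads, return the result."""
    db = LockedDataBase()
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        for index in range(num_threads):
            executor.submit(db.update, index)
    return db.value


if __name__ == "__main__":
    print("final value:", run_locked())
```

The second thread now waits for the first to finish its update, so each increment is applied to the latest value. The cost is that the updates run one after another instead of overlapping.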

A semaphore is similar to a Lock, except that it has a built-in counter: up to a fixed number of threads can hold it at the same time, and any further thread is blocked until one of the holders releases it.

In [2]:
import threading
max_connections = 10
semaphore = threading.BoundedSemaphore(max_connections)
semaphore.acquire() # decrement counter (-1)
# ... access to shared resources
semaphore.release() # increment counter (+1)

Each acquire() decreases the counter and each release() increases it. When the counter reaches 0, the next thread that calls acquire() has to wait until another thread calls release().
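To see the counter at work, the sketch below starts more threads than the semaphore allows and records how many were inside the protected region at once. The `run_limited` helper, its parameters, and the bookkeeping dictionary are assumptions added for illustration.

```python
import threading
import time


def run_limited(num_threads=6, limit=2):
    """Hypothetical helper: return the peak number of threads holding the semaphore."""
    semaphore = threading.BoundedSemaphore(limit)
    counter_lock = threading.Lock()  # protects the bookkeeping below
    state = {"active": 0, "peak": 0}

    def task():
        with semaphore:  # acquire() on entry, release() on exit
            with counter_lock:
                state["active"] += 1
                state["peak"] = max(state["peak"], state["active"])
            time.sleep(0.05)  # simulated access to the shared resource
            with counter_lock:
                state["active"] -= 1

    threads = [threading.Thread(target=task) for _ in range(num_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return state["peak"]


if __name__ == "__main__":
    print("peak concurrency:", run_limited())
```

However many threads are started, the reported peak never exceeds `limit`. A BoundedSemaphore additionally raises ValueError if release() is called more times than acquire(), which helps catch bookkeeping mistakes.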

When developing a multithreaded program, it is also convenient to use the queue module, which implements thread-safe queues. This means that, using a queue, information can be exchanged safely between threads.

In [3]:
import threading
import time
from queue import Queue

num_worker_threads = 2

def do_work(item):
    time.sleep(1)
    print("my task is", item, "i am ", threading.current_thread())

def worker():
    while True:
        item = q.get()  # block until a job is available
        print("get task - ", item)
        do_work(item)  # do work
        q.task_done()  # signal completion

q = Queue()
for i in range(num_worker_threads):  # create and run threads
    # daemon threads stop with the main program; setDaemon() is deprecated
    t = threading.Thread(target=worker, daemon=True)
    t.start()
for item in range(0, 5):  # queue jobs
    q.put(item)
q.join()  # wait until all jobs are done

get task - get task -  1
 0
my task ismy task is 0 i am  <Thread(Thread-3, started daemon 22924)>
get task -  2
 1 i am  <Thread(Thread-4, started daemon 30432)>
get task -  3
my task ismy task is 3 i am  <Thread(Thread-4, started daemon 30432)>
get task -  4
 2 i am  <Thread(Thread-3, started daemon 22924)>
my task is 4 i am  <Thread(Thread-4, started daemon 30432)>


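In this example, we create a queue, start several worker threads, and put jobs into the queue. The threads receive the jobs through the thread-safe q.get() method and execute them. The printed lines run into each other because print() calls from different threads are not synchronized.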

Starting from this example, you can easily write your own script that downloads pages from a list of URLs, or an application that computes some data and then hands off the job of writing the results to a database.
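One way such a script could be organised is sketched below, with a second queue for the results and a sentinel value that tells each worker to stop. Everything here is an assumption made for illustration: `process` is a hypothetical stand-in for the real work (for instance, downloading a URL), and `run_pipeline` is a hypothetical helper.

```python
import threading
from queue import Queue


def process(item):
    # Hypothetical stand-in for real work such as fetching a page.
    return item * item


def worker(tasks, results):
    while True:
        item = tasks.get()
        if item is None:  # sentinel: no more jobs for this worker
            tasks.task_done()
            break
        results.put((item, process(item)))
        tasks.task_done()


def run_pipeline(items, num_workers=2):
    """Hypothetical helper: process items in worker threads, return a dict."""
    tasks, results = Queue(), Queue()
    threads = [
        threading.Thread(target=worker, args=(tasks, results))
        for _ in range(num_workers)
    ]
    for thread in threads:
        thread.start()
    for item in items:
        tasks.put(item)
    for _ in threads:
        tasks.put(None)  # one sentinel per worker
    tasks.join()  # wait until every job has been marked done
    for thread in threads:
        thread.join()
    collected = {}
    while not results.empty():  # safe here: all workers have finished
        key, value = results.get()
        collected[key] = value
    return collected


if __name__ == "__main__":
    print(run_pipeline(range(5)))
```

The sentinel lets the workers exit cleanly, so they do not need to be daemon threads. The order in which results arrive depends on thread scheduling, which is why they are gathered into a dictionary keyed by the input item.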