# Threading

### Argument to thread

In [7]:
# threading_simple.py
import threading


def worker(num):
    """thread worker function"""
    print('Worker: %s' % num)


threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

Worker: 0
Worker: 1
Worker: 2
Worker: 3
Worker: 4


### Determining the Current Thread

Using arguments to identify or name the thread is cumbersome and unnecessary. Each Thread instance has a name with a default value that can be changed as the thread is created. Naming threads is useful in server processes with multiple service threads handling different operations.

In [8]:
# threading_names.py
import threading
import time


def worker():
    print(threading.current_thread().getName(), 'Starting')
    time.sleep(0.2)
    print(threading.current_thread().getName(), 'Exiting')


def my_service():
    print(threading.current_thread().getName(), 'Starting')
    time.sleep(0.3)
    print(threading.current_thread().getName(), 'Exiting')


t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker)  # use default name

w.start()
w2.start()
t.start()

worker Starting
Thread-17 Starting
my_service Starting
worker Exiting
Thread-17 Exiting
my_service Exiting


### Logging threads

In [9]:
# threading_names_log.py
import logging
import threading
import time


def worker():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')

    
def my_service():
    logging.debug('Starting')
    time.sleep(0.3)
    logging.debug('Exiting')

    
logging.basicConfig(
    level=logging.DEBUG,
    format='[%(levelname)s] (%(threadName)-10s) %(message)s',
)

t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker)  # use default name

w.start()
w2.start()
t.start()

[DEBUG] (worker    ) Starting
[DEBUG] (Thread-18 ) Starting
[DEBUG] (my_service) Starting
[DEBUG] (worker    ) Exiting
[DEBUG] (Thread-18 ) Exiting
[DEBUG] (my_service) Exiting


### Daemon vs. Non-Daemon Threads

Up to this point, the example programs have implicitly waited to exit until all threads have completed their work. Sometimes programs spawn a thread as a daemon that runs without blocking the main program from exiting. Using daemon threads is useful for services where there may not be an easy way to interrupt the thread, or where letting the thread die in the middle of its work does not lose or corrupt data (for example, a thread that generates “heart beats” for a service monitoring tool). To mark a thread as a daemon, pass daemon=True when constructing it or call its set_daemon() method with True. The default is for threads to not be daemons.

In [10]:
# threading_daemon.py
import threading
import time
import logging


def daemon():
    logging.debug('Starting')
    time.sleep(2)
    logging.debug('Exiting')


def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

d = threading.Thread(name='daemon', target=daemon, daemon=True)

t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()

[DEBUG] (daemon    ) Starting
[DEBUG] (non-daemon) Starting
[DEBUG] (non-daemon) Exiting
[DEBUG] (daemon    ) Exiting


To wait until a daemon thread has completed its work, use the join() method.



In [11]:
# threading_daemon_join.py
import threading
import time
import logging


def daemon():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')


def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

d = threading.Thread(name='daemon', target=daemon, daemon=True)

t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()

d.join()
t.join()

[DEBUG] (daemon    ) Starting
[DEBUG] (non-daemon) Starting
[DEBUG] (non-daemon) Exiting
[DEBUG] (daemon    ) Exiting


By default, join() blocks indefinitely. It is also possible to pass a float value representing the number of seconds to wait for the thread to become inactive. If the thread does not complete within the timeout period, join() returns anyway.

In [13]:
# threading_daemon_join_timeout.py
import threading
import time
import logging


def daemon():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')


def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

d = threading.Thread(name='daemon', target=daemon, daemon=True)

t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()

d.join(0.1)
print('d.isAlive()', d.isAlive())
t.join()

[DEBUG] (daemon    ) Starting
[DEBUG] (non-daemon) Starting
[DEBUG] (non-daemon) Exiting


d.isAlive() True


[DEBUG] (daemon    ) Exiting


## Enumerating All Threads¶

It is not necessary to retain an explicit handle to all of the daemon threads in order to ensure they have completed before exiting the main process. enumerate() returns a list of active Thread instances. The list includes the current thread, and since joining the current thread introduces a deadlock situation, it must be skipped.

In [25]:
# threading_enumerate.py
import random
import threading
import time
import logging


def worker():
    """thread worker function"""
    pause = random.randint(1, 5) / 10
    logging.debug('sleeping %0.2f', pause)
    time.sleep(pause)
    logging.debug('ending')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

for i in range(3):
    t = threading.Thread(target=worker, daemon=True)
    t.start()

main_thread = threading.main_thread()
for t in threading.enumerate():
    if t is main_thread:
        continue
    logging.debug('joining %s', t.getName())
    t.join()

[DEBUG] (Thread-37 ) sleeping 0.50
[DEBUG] (Thread-38 ) sleeping 0.40
[DEBUG] (Thread-39 ) sleeping 0.20
[DEBUG] (MainThread) joining Thread-2
[DEBUG] (Thread-39 ) ending
[DEBUG] (Thread-38 ) ending
[DEBUG] (Thread-37 ) ending


KeyboardInterrupt: 

## Subclassing Thread

At start-up, a Thread does some basic initialization and then calls its run() method, which calls the target function passed to the constructor. To create a subclass of Thread, override run() to do whatever is necessary.

In [18]:
# threading_subclass.py
import threading
import logging


class MyThread(threading.Thread):

    def run(self):
        logging.debug('running')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

for i in range(5):
    t = MyThread()
    t.start()

[DEBUG] (Thread-26 ) running
[DEBUG] (Thread-27 ) running
[DEBUG] (Thread-28 ) running
[DEBUG] (Thread-29 ) running
[DEBUG] (Thread-30 ) running


## Timer Threads

One example of a reason to subclass Thread is provided by Timer, also included in threading. A Timer starts its work after a delay, and can be canceled at any point within that delay time period.

In [19]:
# threading_timer.py
import threading
import time
import logging


def delayed():
    logging.debug('worker running')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

t1 = threading.Timer(0.3, delayed)
t1.setName('t1')
t2 = threading.Timer(0.3, delayed)
t2.setName('t2')

logging.debug('starting timers')
t1.start()
t2.start()

logging.debug('waiting before canceling %s', t2.getName())
time.sleep(0.2)
logging.debug('canceling %s', t2.getName())
t2.cancel()
logging.debug('done')

[DEBUG] (MainThread) starting timers
[DEBUG] (MainThread) waiting before canceling t2
[DEBUG] (MainThread) canceling t2
[DEBUG] (MainThread) done
[DEBUG] (t1        ) worker running


## Signaling Between Threads

Although the point of using multiple threads is to run separate operations concurrently, there are times when it is important to be able to synchronize the operations in two or more threads. Event objects are a simple way to communicate between threads safely. An Event manages an internal flag that callers can control with the set() and clear() methods. Other threads can use wait() to pause until the flag is set, effectively blocking progress until allowed to continue.

In [26]:
# threading_event.py
import logging
import threading
import time


def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    logging.debug('wait_for_event starting')
    event_is_set = e.wait()
    logging.debug('event set: %s', event_is_set)


def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    while not e.is_set():
        logging.debug('wait_for_event_timeout starting')
        event_is_set = e.wait(t)
        logging.debug('event set: %s', event_is_set)
        if event_is_set:
            logging.debug('processing event')
        else:
            logging.debug('doing other work')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

e = threading.Event()
t1 = threading.Thread(
    name='block',
    target=wait_for_event,
    args=(e,),
)
t1.start()

t2 = threading.Thread(
    name='nonblock',
    target=wait_for_event_timeout,
    args=(e, 2),
)
t2.start()

logging.debug('Waiting before calling Event.set()')
time.sleep(5)
e.set()
logging.debug('Event is set')


[DEBUG] (block     ) wait_for_event starting
[DEBUG] (nonblock  ) wait_for_event_timeout starting
[DEBUG] (MainThread) Waiting before calling Event.set()
[DEBUG] (nonblock  ) event set: False
[DEBUG] (nonblock  ) doing other work
[DEBUG] (nonblock  ) wait_for_event_timeout starting
[DEBUG] (nonblock  ) event set: False
[DEBUG] (nonblock  ) doing other work
[DEBUG] (nonblock  ) wait_for_event_timeout starting
[DEBUG] (MainThread) Event is set
[DEBUG] (nonblock  ) event set: True
[DEBUG] (block     ) event set: True
[DEBUG] (nonblock  ) processing event


## Controlling Access to Resources

In addition to synchronizing the operations of threads, it is also important to be able to control access to shared resources to prevent corruption or missed data. Python’s built-in data structures (lists, dictionaries, etc.) are thread-safe as a side-effect of having atomic byte-codes for manipulating them (the global interpreter lock used to protect Python’s internal data structures is not released in the middle of an update). Other data structures implemented in Python, or simpler types like integers and floats, do not have that protection. To guard against simultaneous access to an object, use a Lock object.

In [None]:
# threading_lock.py
import logging
import random
import threading
import time


class Counter:

    def __init__(self, start=0):
        self.lock = threading.Lock()
        self.value = start

    def increment(self, worker_number):
        logging.debug('Waiting for lock')
        self.lock.acquire()
        try:
            logging.debug('Acquired lock')
            self.value = self.value + 1
            print(self.value, f'by {worker_number}')
        finally:
            self.lock.release()


def worker(c, worker_number):
    for i in range(2):
        pause = random.random()
        logging.debug('Sleeping %0.02f', pause)
        time.sleep(pause)
        c.increment(worker_number)
    logging.debug('Done')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

counter = Counter()
for i in range(2):
    t = threading.Thread(target=worker, args=(counter, i))
    t.start()

logging.debug('Waiting for worker threads')
main_thread = threading.main_thread()
for t in threading.enumerate():
    if t is not main_thread:
        t.join()
logging.debug('Counter: %d', counter.value)

[DEBUG] (Thread-42 ) Sleeping 0.73
[DEBUG] (Thread-43 ) Sleeping 0.70
[DEBUG] (MainThread) Waiting for worker threads
[DEBUG] (Thread-43 ) Waiting for lock
[DEBUG] (Thread-43 ) Acquired lock
[DEBUG] (Thread-43 ) Sleeping 0.46
[DEBUG] (Thread-42 ) Waiting for lock
[DEBUG] (Thread-42 ) Acquired lock
[DEBUG] (Thread-42 ) Sleeping 0.97


1 by 1
2 by 0


[DEBUG] (Thread-43 ) Waiting for lock
[DEBUG] (Thread-43 ) Acquired lock
[DEBUG] (Thread-43 ) Done


3 by 1


[DEBUG] (Thread-42 ) Waiting for lock
[DEBUG] (Thread-42 ) Acquired lock
[DEBUG] (Thread-42 ) Done


4 by 0


## Locks as Context Managers

Locks implement the context manager API and are compatible with the with statement. Using with removes the need to explicitly acquire and release the lock.

In [23]:
# threading_lock_with.py
import threading
import logging


def worker_with(lock):
    with lock:
        logging.debug('Lock acquired via with')


def worker_no_with(lock):
    lock.acquire()
    try:
        logging.debug('Lock acquired directly')
    finally:
        lock.release()


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

lock = threading.Lock()
w = threading.Thread(target=worker_with, args=(lock,))
nw = threading.Thread(target=worker_no_with, args=(lock,))

w.start()
nw.start()

[DEBUG] (Thread-35 ) Lock acquired via with
[DEBUG] (Thread-36 ) Lock acquired directly


## Limiting Concurrent Access to Resources

Sometimes it is useful to allow more than one worker access to a resource at a time, while still limiting the overall number. For example, a connection pool might support a fixed number of simultaneous connections, or a network application might support a fixed number of concurrent downloads. A Semaphore is one way to manage those connections.

In [24]:
# threading_semaphore.py
import logging
import random
import threading
import time


class ActivePool:

    def __init__(self):
        super(ActivePool, self).__init__()
        self.active = []
        self.lock = threading.Lock()

    def makeActive(self, name):
        with self.lock:
            self.active.append(name)
            logging.debug('Running: %s', self.active)

    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)
            logging.debug('Running: %s', self.active)


def worker(s, pool):
    logging.debug('Waiting to join the pool')
    with s:
        name = threading.current_thread().getName()
        pool.makeActive(name)
        time.sleep(0.1)
        pool.makeInactive(name)


logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-2s) %(message)s',
)

pool = ActivePool()
s = threading.Semaphore(2)
for i in range(4):
    t = threading.Thread(
        target=worker,
        name=str(i),
        args=(s, pool),
    )
    t.start()

[DEBUG] (0         ) Waiting to join the pool
[DEBUG] (1         ) Waiting to join the pool
[DEBUG] (2         ) Waiting to join the pool
[DEBUG] (3         ) Waiting to join the pool
[DEBUG] (0         ) Running: ['0']
[DEBUG] (1         ) Running: ['0', '1']
[DEBUG] (0         ) Running: ['1']
[DEBUG] (1         ) Running: []
[DEBUG] (3         ) Running: ['3']
[DEBUG] (2         ) Running: ['3', '2']
[DEBUG] (3         ) Running: ['2']
[DEBUG] (2         ) Running: []
