In [1]:
import threading
threading.active_count()

5

In [16]:
# Return a list of all Thread objects currently alive.
threading.enumerate()

[<_MainThread(MainThread, started 139658824619840)>,
 <Thread(Thread-2, started daemon 139658491819776)>,
 <Heartbeat(Thread-3, started daemon 139658483427072)>,
 <HistorySavingThread(IPythonHistorySavingThread, started 139658458248960)>,
 <ParentPollerUnix(Thread-1, started daemon 139658109384448)>]

In [3]:
# Return the current use thread
threading.current_thread()

<_MainThread(MainThread, started 139658824619840)>

In [4]:
threading.get_ident()

139658824619840

In [5]:
mydate = threading.local()

In [6]:
mydate.x = 1

In [19]:
import logging
import time
def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    # Thread finished after the Main section of your code did.
    logging.info("Thread %s: ending", name)
def main():
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
    logging.info("Main : before creating thread")
    # the arguments pass to the threafing target function shoulde be tuple.
    x = threading.Thread(target=thread_function, args=("thread 1 name", ), daemon=True)
    logging.info("Main : before running thread")
    x.start()
    logging.info("Main : wait for the thread to finish")
    logging.info("Main : all done")

In [20]:
main()

17:45:59: Main : before creating thread
17:45:59: Main : before running thread
17:45:59: Thread thread 1 name: starting
17:45:59: Main : wait for the thread to finish
17:45:59: Main : all done
17:46:01: Thread thread 1 name: ending


It is strongly recommended that you use ThreadPoolExecutor as a context manager when you can so that you never forget to .join() the threads.


> Note: Using a ThreadPoolExecutor can cause some confusing errors.
 For example, if you call a function that takes no parameters, but you pass it parameters in .map(), the thread will throw an exception.
 Unfortunately, ThreadPoolExecutor will hide that exception, and (in the case above) the program terminates with no output. This can be quite confusing to debug at first.


In [21]:
import concurrent.futures

# [rest of code]

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(thread_function, range(3))

18:30:40: Thread 0: starting
18:30:40: Thread 1: starting
18:30:40: Thread 2: starting
18:30:42: Thread 0: ending
18:30:42: Thread 1: ending
18:30:42: Thread 2: ending


Once you've seen what a race condition is and looked at one happening you'll move on to some of the primitives provided by the standard library to prevent race conditions from happening.

Race connditions can occur when two or more threads access a shared piece of dara or resource.

In [43]:
import threading
class FakeDarabase:
    def __init__(self):
        self.value = 0
        self._lock  = threading.Lock()
    def update(self, name):
        logging.info("Thread %s: starting update", name)
        with self._lock:
        #self._lock .acquire()
            print("Thread %s: has a lock", name)
            local_copy = self.value
            local_copy += 1
            # Between this thread sleelping time, the other thread will execute.
            # and that local_copy will still be 1.
            time.sleep(name)
            self.value = local_copy
            print("Thread %s: release this lock.", name)
        #self._lock .release()
        logging.info("Thread %s: finishing update", name)

In [44]:
from concurrent.futures import ThreadPoolExecutor

database = FakeDarabase()
print(database.value)
with ThreadPoolExecutor(max_workers=2) as executor:
    for i in range(1, 3):
        executor.submit(database.update, i)
print(database.value)

09:59:57: Thread 1: starting update
09:59:57: Thread 2: starting update


0
Thread %s: has a lock 1


09:59:58: Thread 1: finishing update


Thread %s: release this lock. 1
Thread %s: has a lock 2


10:00:00: Thread 2: finishing update


Thread %s: release this lock. 2
2


In [33]:
import dis
# dis is a standard library to show the smaller steps
# that the processor does to implement your function.
def inc(x):
    y = x * 2 + 1
    return y
dis.dis(inc)

  3           0 LOAD_FAST                0 (x)
              2 LOAD_CONST               1 (2)
              4 BINARY_MULTIPLY
              6 LOAD_CONST               2 (1)
              8 BINARY_ADD
             10 STORE_FAST               1 (y)

  4          12 LOAD_FAST                1 (y)
             14 RETURN_VALUE


# Producer-Consumer Threading
Used to look at threading or process synchronization issues.

In [63]:
import random
SENTINEL = object()

def producer(pipline):
    for index in range(10):
        # Pretenf we're get 10 message from the network.
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipline.set_message(message, "Producer")
    # uses a sentinel value to signal the consumer to stop after it has
    # sent ten values
    pipline.set_message(SENTINEL, "Producer")

In [64]:
def consumer(pipline):
    message = 0
    while message is not SENTINEL:
        message = pipline.get_message("Consumer")
        if message is not SENTINEL:
            logging.info("Consumer storing message: %s", message)

In [65]:
class Pipeline:
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()
    
    def get_message(self, name):
        logging.info("%s: about to acquire getlock", name)
        self.consumer_lock.acquire()
        logging.info("%s: have getlock", name)
        message = self.message
        logging.info("%s: about to release setlock", name)
        self.producer_lock.release()
        logging.info("%s: setlock released", name)
        return message
    
    def set_message(self, message, name):
        logging.info("%s: about to acquire setlock", name)
        self.producer_lock.acquire()
        logging.info("%s: have setlock", name)
        self.message = message
        logging.info("%s: about to release getlock", name)
        self.consumer_lock.release()
        logging.info("%s: getlock released", name)

In [67]:
pipline = Pipeline()
with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(consumer, pipline)
    executor.submit(producer, pipline)

10:48:26: Consumer: about to acquire getlock
10:48:26: Producer got message: 60
10:48:26: Producer: about to acquire setlock
10:48:26: Producer: have setlock
10:48:26: Producer: about to release getlock
10:48:26: Producer: getlock released
10:48:26: Consumer: have getlock
10:48:26: Producer got message: 97
10:48:26: Consumer: about to release setlock
10:48:26: Producer: about to acquire setlock
10:48:26: Consumer: setlock released
10:48:26: Producer: have setlock
10:48:26: Consumer storing message: 60
10:48:26: Producer: about to release getlock
10:48:26: Consumer: about to acquire getlock
10:48:26: Producer: getlock released
10:48:26: Consumer: have getlock
10:48:26: Producer got message: 93
10:48:26: Consumer: about to release setlock
10:48:26: Producer: about to acquire setlock
10:48:26: Consumer: setlock released
10:48:26: Producer: have setlock
10:48:26: Consumer storing message: 97
10:48:26: Producer: about to release getlock
10:48:26: Consumer: about to acquire getlock
10:48:26:

In [62]:
1 + 1

2

The `threading.Event` object allows one thread to signal an `event` while many other threads can be waiting for that `event` to happen.

In [68]:
def producer(pipline, event):
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")

    logging.info("Producer received EXIT event. Exiting")

In [69]:
def consumer(pipeline, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not pipeline.empty():
        message = pipeline.get_message("Consumer")
        logging.info(
            "Consumer storing message: %s  (queue size=%s)",
            message,
            pipeline.qsize(),
        )

    logging.info("Consumer received EXIT event. Exiting")

In [70]:
import queue

class Pipeline(queue.Queue):
    def __init__(self):
        super().__init__(maxsize=10)

    def get_message(self, name):
        logging.debug("%s:about to get from queue", name)
        value = self.get()
        logging.debug("%s:got %d from queue", name, value)
        return value

    def set_message(self, value, name):
        logging.debug("%s:about to add %d to queue", name, value)
        self.put(value)
        logging.debug("%s:added %d to queue", name, value)

In [72]:
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.DEBUG,
                    datefmt="%H:%M:%S")
# logging.getLogger().setLevel(logging.DEBUG)

pipeline = Pipeline()
event = threading.Event()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(producer, pipeline, event)
    executor.submit(consumer, pipeline, event)

    time.sleep(0.1)
    logging.info("Main: about to set event")
    event.set()

11:09:27: Producer got message: 67
11:09:27: Producer got message: 62
11:09:27: Consumer storing message: 67  (queue size=0)
11:09:27: Producer got message: 54
11:09:27: Producer got message: 40
11:09:27: Producer got message: 40
11:09:27: Producer got message: 84
11:09:27: Producer got message: 100
11:09:27: Producer got message: 37
11:09:27: Producer got message: 58
11:09:27: Producer got message: 16
11:09:27: Producer got message: 36
11:09:27: Consumer storing message: 62  (queue size=0)
11:09:27: Producer got message: 58
11:09:27: Consumer storing message: 54  (queue size=8)
11:09:27: Producer got message: 8
11:09:27: Consumer storing message: 40  (queue size=8)
11:09:27: Producer got message: 13
11:09:27: Consumer storing message: 40  (queue size=8)
11:09:27: Producer got message: 65
11:09:27: Consumer storing message: 84  (queue size=8)
11:09:27: Producer got message: 75
11:09:27: Consumer storing message: 100  (queue size=8)
11:09:27: Producer got message: 58
11:09:27: Consumer 

In [102]:
class Foo:
    def __init__(self):
        self._condition = threading.Condition()
        self.first_called = False
        self.second_called = False
    
    def first(self, printFrist):
        with self._condition:
            printFrist()
            self.first_called = True
            self._condition.notify_all()
    
    def second(self, printSecond):
        with self._condition:
            while not self.first_called:
                self._condition.wait()
            printSecond()
            self.second_called = True
            self._condition.notify_all()
            
    
    def third(self, printThird):
        with self._condition:
            while not self.second_called:
                self._condition.wait()
            printThird()

In [103]:
foo = Foo()
printFirst = lambda: print('first')
printSecond = lambda: print('second')
printThird = lambda: print('third')
t1 = threading.Thread(target=foo.first, args=(printFirst,))
t2 = threading.Thread(target=foo.second, args=(printSecond,))
t3 = threading.Thread(target=foo.third, args=(printThird,))
t3.start()
t2.start()
t1.start()
t3.join()
t2.join()
t1.join()

first
second
third


In [116]:
from threading import Event
class FooBar:
    def __init__(self, n):
        self.n = n
        self.event = (Event(), Event())
        self.event[1].set()
        
    def foo(self, printFoo: 'Callable[[], None]') -> None:
        
        for i in range(self.n):
            self.event[1].wait()
            # printFoo() outputs "foo". Do not change or remove this line.
            printFoo()
            self.event[0].set()
            self.event[1].clear()


    def bar(self, printBar: 'Callable[[], None]') -> None:
        
        for i in range(self.n):
            self.event[0].wait()
            # printBar() outputs "bar". Do not change or remove this line.
            printBar()
            self.event[1].set()
            self.event[0].clear()

In [117]:
foobar = FooBar(10)
printFoo = lambda: print('foo')
printBar = lambda: print('bar')
t1 = threading.Thread(target=foobar.foo, args=(printFoo,))
t2 = threading.Thread(target=foobar.bar, args=(printBar,))
t2.start()
t1.start()
t2.join()
t1.join()

foo
bar
foo
bar
foo
bar
foo
bar
foo
bar
foo
bar
foo
bar
foo
bar
foo
bar
foo
bar


# Print Zero Even Odd

Suppose you are given the following code:
```
class ZeroEvenOdd {
  public ZeroEvenOdd(int n) { ... }      // constructor
  public void zero(printNumber) { ... }  // only output 0's
  public void even(printNumber) { ... }  // only output even numbers
  public void odd(printNumber) { ... }   // only output odd numbers
}
```
The same instance of ZeroEvenOdd will be passed to three different threads:

Thread A will call zero() which should only output 0's.
Thread B will call even() which should only ouput even numbers.
Thread C will call odd() which should only output odd numbers.
Each of the thread is given a printNumber method to output an integer. Modify the given program to output the series 010203040506... where the length of the series must be 2n.

**Example 1:**
```
Input: n = 2
Output: "0102"
Explanation: There are three threads being fired asynchronously. One of them calls zero(), the other calls even(), and the last one calls odd(). "0102" is the correct output.
```
**Example 2:**
```
Input: n = 5
Output: "0102030405"
```

In [118]:
class ZeroEvenOdd:
    def __init__(self, n):
        self._condition = threading.Condition()
        self.n = n
        self.order = 0
        
    def zero(self, printNumber: 'Callable[[int], None]') -> None:
        for i in range(self.n):
            with self._condition:
                self._condition.wait_for(lambda: self.order == 0)
                printNumber(0)
                self.order = 1 if i % 2 == 0 else 2
                self._condition.notify_all()
        
    def even(self, printNumber: 'Callable[[int], None]') -> None:
        for i in range(2, self.n+1, 2):
            with self._condition:
                self._condition.wait_for(lambda: self.order == 2)
                printNumber(i)
                self.order = 0
                self._condition.notify_all()
        
    def odd(self, printNumber: 'Callable[[int], None]') -> None:
        for i in range(1, self.n+1, 2):
            with self._condition:
                self._condition.wait_for(lambda: self.order == 1)
                printNumber(i)
                self.order = 0
                self._condition.notify_all()

In [126]:
zeo = ZeroEvenOdd(10)
import sys
printNumber = lambda x: print(x)
with ThreadPoolExecutor(max_workers=3) as ex:
    ex.submit(zeo.zero, printNumber)
    ex.submit(zeo.even, printNumber)
    ex.submit(zeo.odd, printNumber)

0
1
0
2
0
3
0
4
0
5
0
6
0
7
0
8
0
9
0
10


In [124]:
sys.stdout.write('ww')
sys.stdout.write('wwdw')

wwwwdw