In [2]:
import threading
import time

## Basic Locking Mechanism
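The most basic way to protect shared state in a concurrent program is a lock (mutex): a thread calls `acquire()` before touching the shared data, which blocks until the lock is free, and calls `release()` when it is done.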

In [3]:
def test_lock():
    """Demonstrate protecting a shared counter with ``threading.Lock``.

    Two worker threads each increment a shared integer and print it.
    The increment uses the explicit ``acquire()``/``release()`` form; the
    second print uses the equivalent context-manager form.

    Returns:
        None. The counter values are printed to stdout.
    """
    lock = threading.Lock()
    name = 10

    def worker():
        nonlocal name
        # Explicit form: release in ``finally`` so an exception raised while
        # the lock is held cannot leave it locked forever.
        lock.acquire()
        try:
            name += 1
            print(name)
        finally:
            lock.release()

        # Context-manager form: acquires on entry, releases on exit.
        with lock:
            print(name)

    workers = [threading.Thread(target=worker) for _ in range(2)]
    for t in workers:
        t.start()

    # Join only the threads started here. threading.enumerate() also returns
    # threads this function did not create (for example long-lived helper
    # threads of the hosting application); joining one of those that never
    # exits would block forever.
    for t in workers:
        t.join()

## Inter-Thread Signaling 
An alternative to using a mutex is to wait for an event to happen. We have: 
- Event: great for signaling a one-time event (`threading.Event`).
- Condition variable: for reusable events, use `threading.Condition`.
- Semaphore: an alternative to a condition variable that allows a limited number of threads to be inside a critical section at the same time.


In [18]:
def test_event(iterations=5, wait_timeout=2, set_delay=1):
    '''
    Demonstrate signaling a worker thread with ``threading.Event``.

    - it has "clear", but ``set()`` followed by ``clear()`` is still not
      atomic: after waking up, the waiter may observe the flag as either
      True or False.

    Args:
        iterations: how many times the worker prints the flag, waits, and
            prints the flag again.
        wait_timeout: timeout in seconds passed to each ``ev.wait()`` call.
        set_delay: seconds the main thread sleeps before setting the event.

    Returns:
        None. The flag values observed by the worker are printed to stdout.
    '''
    def foo(ev):
        for _ in range(iterations):
            print(f"Before flag: {ev.is_set()}")
            # Returns as soon as the flag is set, or after the timeout.
            ev.wait(wait_timeout)
            print(f"After flag: {ev.is_set()}")

    ev = threading.Event()
    th1 = threading.Thread(name="Th1", target=foo, args=(ev,))
    th1.start()
    time.sleep(set_delay)
    ev.set()
    # The worker is woken by set(); whether it has already re-read the flag
    # by the time clear() runs is a race, which is what this demo shows.
    ev.clear()
    # Wait for the worker so that all of its output is produced before this
    # function returns instead of leaking into whatever runs next.
    th1.join()

Before flag: False
After flag: True
Before flag: True
After flag: True
Before flag: True
After flag: True
Before flag: True
After flag: True
Before flag: True
After flag: True


flag: False
flag: False
After flag: False
Before flag: False


In [1]:
from threading import Condition, Thread, get_ident
def test_condition_variable_vanilla():
    '''
    Demonstrate waiting on and notifying a ``threading.Condition``.

    - You can use context manager, or acquire, release.
    - Conditional Variable has a lock in it.
        - Waiter: first acquire the lock, then wait() will release the lock and block the current thread. Once awaken, the lock will be locked again, and must be released by the waiter
        - Caller: acquire the lock, call `notify_all` or `notify(n)`, then release
    - Always pair the condition with a predicate (here the ``ready`` flag)
      that is changed while holding the lock. A notification sent before a
      waiter reaches wait() is lost, so a waiter that blindly calls wait()
      could block forever.

    Returns:
        None. Progress messages are printed to stdout.
    '''
    # Vanilla way to create condition variable
    cv1 = Condition()
    # Predicate guarded by cv1's lock: written by the notifier, read by waiters.
    ready = False

    def func1():
        # Explicit acquire/release form; ``finally`` guarantees the release.
        cv1.acquire()
        try:
            # Re-check the predicate in a loop: this covers both a
            # notification that happened before we got here and wakeups
            # that occur while the predicate is still false.
            while not ready:
                cv1.wait()
            print("cv1 wait ended, thread id: ", get_ident())
        finally:
            cv1.release()

    def func2():
        # can use context manager as well; wait_for() is the built-in
        # "loop until the predicate holds" helper.
        with cv1:
            cv1.wait_for(lambda: ready)
            print("cv1 wait ended, thread id: ", get_ident())

    t = Thread(target=func1)
    t2 = Thread(target=func2)
    t.start()
    t2.start()
    with cv1:
        ready = True
        cv1.notify(n=1)
        print("notified one thread")
        cv1.notify_all()
        print("notified all threads")
    t.join()
    t2.join()

# Run the condition-variable demo; its prints appear as this cell's output.
test_condition_variable_vanilla()

notified one thread
notified all threads
cv1 wait ended, thread id:  140297505855232
cv1 wait ended, thread id:  140297497462528


In [7]:
def test_semaphore():
    """
    Demonstrate ``threading.Semaphore`` and its unbounded counter.

    - Semaphore: an integer counter shared between threads
        - Just like a parking lot indicator with 3 available slots. The semaphore will start at 3. When a car gets in, it calls acquire(), which decrements the counter. When the counter is 0, acquire() blocks until somebody calls release().
        - Mutex is semaphore = 1 (binary semaphore)
    - A plain Semaphore does not cap its counter: extra release() calls push
      it above the initial value. ``threading.BoundedSemaphore`` raises
      ValueError in that situation instead.

    Returns:
        None. Counter values are printed to stdout.
    """
    import threading
    from threading import Semaphore
    # when Semaphore is 1, it's binary
    s = Semaphore(7)
    w = 0
    # Semaphore(7) lets up to 7 workers run concurrently, so it does not make
    # the read-modify-write on ``w`` safe; a separate mutex guards that.
    w_lock = threading.Lock()

    def wurk():
        nonlocal w
        # ``with`` guarantees the matching release() even if the body raises.
        with s:
            with w_lock:
                print("before incrementing: ", w)
                w += 1
                print("after incrementing: ", w)

    workers = [threading.Thread(target=wurk, args=(), daemon=True) for _ in range(10)]
    for t in workers:
        t.start()
    # Wait for every worker so the counter reads below are deterministic.
    for t in workers:
        t.join()

    # semaphore will keep increasing with the release calls!
    # NOTE: ``_value`` is a private attribute, read here only for the demo.
    print(s._value)
    for _ in range(4):
        s.release()
    print(s._value)
# Run the semaphore demo; its prints appear as this cell's output.
test_semaphore()


before incrementing:  0
after incrementing:  1
before incrementing:  1
after incrementing:  2
before incrementing:  2
after incrementing:  3
before incrementing:  3
after incrementing:  4
before incrementing:  4
after incrementing:  5
before incrementing:  5
after incrementing:  6
before incrementing:  6
after incrementing:  7
before incrementing:  7
after incrementing:  8
before incrementing:  8
after incrementing:  9
before incrementing:  9
after incrementing:  10
7
11


## Thread Pool

In [None]:
def test_threadpool():
    """Demonstrate submitting work to a ``ThreadPoolExecutor``.

    Five tasks are submitted; each prints its index. ``future.result()``
    blocks until the task is done and re-raises any exception the task
    raised, so failures inside worker threads are not silently lost.

    Returns:
        None. The task indices are printed to stdout (order may vary).
    """
    # Imported here because this demo is the only user of the module.
    import concurrent.futures

    def foo(i):
        print(i)

    # Leaving the ``with`` block shuts the pool down and waits for its workers.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(foo, i=i) for i in range(5)]
        for future in futures:
            future.result()

## Communicating Between Threads
1. Use `queue.Queue`
    - Should use a sentinel value to signal termination
    - Can put events on the queue to signal what objects are being processed
    - **An object put on the queue is just a reference, not the object itself!**

In [None]:
def test_multiple_threads_queue():
    """
    Demonstrate a queue-fed worker thread with a clean, sentinel-based shutdown.

    1. thread.daemon = True (setDaemon() is deprecated) makes a daemon thread, which is abandoned abruptly when the program exits.
        - a regular non-daemon thread keeps the program alive until it finishes
    2. task_done() tells the queue that one item taken with get() has been processed; Queue.join() blocks until every item has been marked done
    3. Thread.join() waits for a thread to finish
    4. About shutdown:
        1. Common Practices
            - sending a sentinel value through the queue is common practice (used here)
            - Or have a special function that sets a flag to 0.
        2. Pain: if you don't signal the thread, the thread will never know when to finish. Also, __del__ is not the way to go
            - an object is destroyed when its reference count reaches 0. But
                - the worker thread's target is a bound method, so the running thread itself keeps the enclosing object alive
                - when a daemon thread is still running, the enclosing object skips destruction until the program exits
                - when a non-daemon thread is still running, the main thread will hang at exit because it waits for the thread to finish
            - notes:
                - x.__del__() may not be called during program exit.
                - del x doesn't directly call x.__del__(): the former decrements the reference count for x by one, and the latter is only called when x's reference count reaches zero.
            - never block inside __del__ (for example on Queue.join()): if unprocessed items remain, the finalizer hangs forever
        3. you can't forcibly kill a thread like killing a process (implemented on SIGTERM)

    Returns:
        None. The finalizer prints one line when the Example object is destroyed.
    """
    from queue import Queue
    from threading import Thread

    class Example:
        # Unique object pushed through the queue to tell the worker to stop.
        _STOP = object()

        def __init__(self):
            self.should_run = True
            self.q = Queue()
            self.th = Thread(target=self.__work)
            self.th.start()

        def __work(self):
            # Block on get() instead of polling a flag: items queued before
            # the sentinel are always processed, and shutdown is immediate.
            while True:
                item = self.q.get()
                try:
                    if item is self._STOP:
                        return
                finally:
                    # Mark every item, sentinel included, as processed.
                    self.q.task_done()

        def put(self, value):
            self.q.put(value)

        def shutdown(self):
            # Idempotent: a second call must not queue another sentinel.
            if not self.should_run:
                return
            self.should_run = False
            self.q.put(self._STOP)
            # After join() the thread no longer references this object.
            self.th.join()

        def __del__(self):
            # Report only; cleanup belongs in shutdown(), not in a finalizer.
            print("del should_run: ", self.should_run)

    f = Example()
    f.put(1)
    # any object can travel through the queue; only Example._STOP ends the worker
    f.put(object())
    f.shutdown()

In [None]:
def test_daemon_thread():
    """
    Start a daemon thread owned by an object, to show daemon lifetime rules.

    1. A daemon thread does not keep the process alive; it is abandoned when the process exits
        - shutdown:
            1. If the daemon thread has finished, nothing references the owning object any more and its __del__ can run
            2. if not, the thread keeps the object alive until the process exits, and its __del__ is not guaranteed to be called
    2. Problem: printing stuff in a daemon thread is dangerous, it may not be able to get the lock for stdout at shutdown

    Note: the thread started here loops forever and is intentionally never
    stopped; it only goes away when the process exits.

    Returns:
        None.
    """
    from threading import Thread
    import queue

    class TestDaemon:
        def __init__(self):
            self.queue = queue.Queue()
            # Pass daemon=True to the constructor; Thread.setDaemon() is deprecated.
            dth = Thread(target=self.daemon_func, daemon=True)
            dth.start()

        def daemon_func(self):
            # Endless background loop; the short sleep avoids a busy spin.
            while True:
                time.sleep(0.01)

        def __del__(self):
            # Nothing is ever put on the queue, so this join() returns at once.
            self.queue.join()

    t = TestDaemon()


In [None]:
def test_threading_timer(delay=3, cancel_after=1, wait_after=4):
    '''
    Demonstrate ``threading.Timer``: first a cancelled timer, then one that fires.

    1. Threading timer: execute a function on a different thread, after a certain timeout
        - You can cancel that as well.

    Args:
        delay: seconds each timer waits before calling its function.
        cancel_after: seconds to wait before cancelling the first timer;
            must be shorter than ``delay`` for the cancellation to take effect.
        wait_after: seconds the main thread sleeps after starting the second
            timer; must be longer than ``delay`` for the message to be
            printed before the final line.

    Returns:
        None. Progress messages are printed to stdout.
    '''
    import threading
    import time

    def print_msg(message):
        print(message)

    timer = threading.Timer(delay, print_msg, args=("Heellloo", ))
    # start counting down ``delay`` seconds
    timer.start()
    time.sleep(cancel_after)
    # cancel() only has an effect while the timer is still waiting
    timer.cancel()
    print("Canceled timer")

    # thread can only be started once, so a fresh Timer is needed.
    timer = threading.Timer(delay, print_msg, args=("Heellloo", ))
    timer.start()
    print("Waiting for timer to fire")
    time.sleep(wait_after)
    print("Main thread will do its thing as well")