**CPU** => hardware that executes code   
**OS**  => handles the scheduling of when the CPU can be used  
**Process**  => program as its being executed  
**Thread**   => part of a process  
  
Simple programs only need one thread  
More complicated ones may require many threads (Multi-thread)

## Single Thread

In [3]:
import time

def myfunc():
    print("hello")
    time.sleep(10)
    return True

In [4]:
myfunc()

hello


True

## Two Threads

In [5]:
import time

def myfunc(name):
    print(f"my func started with {name}")
    time.sleep(10)
    print("myfunc ended")

In [6]:
print('main started')
myfunc('brian')
print('main ended')

main started
my func started with brian
myfunc ended
main ended


In [9]:
import threading

print('main started')
t = threading.Thread(target=myfunc, args=['brian'])
t.start()
print('main ended')

main started
my func started with brian
main ended
myfunc ended


## Daemon Thread
- process thats running, but in the background
- regular thread vs. daemon thread:
    - **main thread** will **not wait** for **daemon threads to complete** before exiting

In [11]:
# this doesn't print properly, as the main thread is still running the notebook
print('main started')
t = threading.Thread(target=myfunc, args=['brian'], daemon=True)
t.start()
print('main ended')

main started
my func started with brian
main ended
myfunc ended


## Joining Threads
-  bring all your threads together before the main thread exits

In [12]:
print('main started')
t = threading.Thread(target=myfunc, args=['brian'])
t.start()
t.join()
print('main ended')

main started
my func started with brian
myfunc ended
main ended


## Multiple Threads

In [17]:
def myfunc1(name):
    print(f"my func1 started with {name}")
    time.sleep(10)
    print("myfunc1 ended")

def myfunc2(name):
    print(f"my func2 started with {name}")
    time.sleep(10)
    print("myfunc2 ended")

def myfunc3(name):
    print(f"my func3 started with {name}")
    time.sleep(10)
    print("myfunc3 ended")

print('main started')
t1 = threading.Thread(target=myfunc1, args=['brian'])
t1.start()
t2 = threading.Thread(target=myfunc2, args=['ciran'])
t2.start()
t3 = threading.Thread(target=myfunc3, args=['david'])
t3.start()
t1.join()
t2.join()
t3.join()
print('main ended')

main started
my func1 started with brian
my func2 started with ciran
my func3 started with david
myfunc3 endedmyfunc1 ended

myfunc2 ended
main ended


## Thread Pool

In [19]:
import time
import concurrent.futures

def myfunc(name):
    print(f"my func started with {name}")
    time.sleep(10)
    print(f"myfunc ended with {name}")

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as e:
    print('main started')
    e.map(myfunc, ['brian', 'ciran', 'david'])
    print('main ended')

# doesnt print out correctly as main doees not finish

main started
my func started with brian
my func started with ciran
my func started with david
main ended
myfunc ended with brian
myfunc ended with ciran
myfunc ended with david


## Race Conditions
- happens when more than one thread is trying to access a shared piece of data at the same time

### Bank Account Program

In [26]:
import concurrent.futures
import time

class Account:
    def __init__(self):
        self.balance = 100 # shared data
    def update(self, transaction, amount):
        print(f"{transaction} thread updating...")
        local_copy = self.balance
        local_copy += amount
        time.sleep(1)
        self.balance = local_copy 
        print(f"{transaction} thread finishing...")

In [40]:
account = Account()
print(f"starting with balance of {account.balance}")
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
    for transaction, amount in [('deposit', 150), ('withdraw', -150)]:
        ex.submit(account.update, transaction, amount)
print(f'ending balance with {account.balance}')

# should have finished with 100
# finishes with -50 as the balance value was access before it was updated

starting with balance of 100
deposit thread updating...
withdraw thread updating...
deposit thread finishing...
withdraw thread finishing...
ending balance with 100


## Lock Objects
- used to prevent more than one thread from accessing a part of your program at the same time.

In [31]:
import threading

# create lock
lock = threading.Lock()
print(lock)

# lock the lock
lock.acquire()
print(lock)

# open/release the lock
lock.release()
print(lock)

<unlocked _thread.lock object at 0x111042990>
<locked _thread.lock object at 0x111042990>
<unlocked _thread.lock object at 0x111042990>


In [38]:
# add lock to the update to prevent two threads from modifying the balance at the same time
import concurrent.futures
import time

class Account:
    def __init__(self):
        self.balance = 100 # shared data
        self.lock = threading.Lock()
    def update(self, transaction, amount):
        print(f"{transaction} thread updating...")
        with self.lock:
            local_copy = self.balance
            local_copy += amount
            time.sleep(1)
            self.balance = local_copy 
        print(f"{transaction} thread finishing...")

In [39]:
account = Account()
print(f"starting with balance of {account.balance}")
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
    for transaction, amount in [('deposit', 150), ('withdraw', -150)]:
        ex.submit(account.update, transaction, amount)
print(f'ending balance with {account.balance}')

starting with balance of 100
deposit thread updating...
withdraw thread updating...
deposit thread finishing...
withdraw thread finishing...
ending balance with 100


### Deadlocks
- when a thread trys to lock a lock which it has already locked

In [42]:
import threading

lock = threading.Lock()
lock.acquire()
# causes deadlock by calling acquire before calling release
lock.acquire()

KeyboardInterrupt: 

In [45]:
# can use RLock with prevents deadlock
lock = threading.RLock()
lock.acquire()
lock.acquire()
print(lock)
lock.release()
print(lock)
print(threading.current_thread())

<locked _thread.RLock object owner=4706420160 count=2 at 0x1113e3750>
<locked _thread.RLock object owner=4706420160 count=1 at 0x1113e3750>
<_MainThread(MainThread, started 4706420160)>


## Producer-Consumer Pipeline

In [20]:
import random
import threading

FINISH = 'THE END'
class Pipeline:
    def __init__(self, capacity):
        self.capacity = capacity
        self.message = None
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        print()
        self.consumer_lock.acquire()
    def set_message(self, message):
        print(f'producing message of {message}')
        producer_pipeline.append(message)
        self.producer_lock.acquire()
        self.message = message
        self.consumer_lock.release()
    def get_message(self):
        print(f'consuming message of {self.message}')
        self.consumer_lock.acquire()
        message = self.message
        self.producer_lock.release()
        consumer_pipeline.append(message)
        return message

def producer(pipeline):
    for _ in range(pipeline.capacity):
        message = random.randint(1, 100)
        pipeline.set_message(message)
    pipeline.set_message(FINISH)

def consumer(pipeline):
    message = None
    while message is not FINISH:
        message = pipeline.get_message()
        if message is not FINISH:
            time.sleep(random.random())

In [24]:
import concurrent.futures

producer_pipeline = []
consumer_pipeline = []

pipeline = Pipeline(10)
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
    ex.submit(producer, pipeline)
    ex.submit(consumer, pipeline)
print(f'producer: {producer_pipeline}')
print(f'consumer: {consumer_pipeline}')

producing message of 33
producing message of 15
consuming message of 33
producing message of 65


KeyboardInterrupt: 

## Queue Model
previous pipeline class was limited
- limited by capacity
- was one-to-one ratio

In [29]:
import random
import threading
import queue
import time

class Pipeline(queue.Queue): #pipeline inherits from Queue class
    def __init__(self):
        super().__init__(maxsize=10) # inherit the init method
    def set_message(self, message):
        print(f'producing message of {message}')
        producer_pipeline.append(message)
        self.put(message)
    def get_message(self):
        message = self.get()
        print(f'consuming message of {message}')
        consumer_pipeline.append(message)
        return message

def producer(pipeline, event):
    while not event.is_set(): 
        message = random.randint(1, 100)
        pipeline.set_message(message)

def consumer(pipeline, event):
    while not pipeline.empty or not event.is_set():
        print(f'queue size is {pipeline.qsize()}')
        message = pipeline.get_message()
        time.sleep(random.random())

In [30]:
import concurrent.futures

producer_pipeline = []
consumer_pipeline = []

pipeline = Pipeline()
event = threading.Event() # is either true or false. Initialised as False
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
    ex.submit(producer, pipeline, event)
    ex.submit(consumer, pipeline, event)
    time.sleep(0.5)
    event.set()
print(f'producer: {producer_pipeline}')
print(f'consumer: {consumer_pipeline}')

producing message of 58
producing message of 49
producing message of 82
queue size is 2producing message of 53
producing message of 88
producing message of 81
producing message of 20
producing message of 98
producing message of 33
producing message of 37
producing message of 9

consuming message of 58
producing message of 90
queue size is 10
consuming message of 49
producing message of 13


KeyboardInterrupt: 

## Semaphore

In [31]:
import threading

sem = threading.Semaphore()
print(dir(sem))

['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_cond', '_value', 'acquire', 'release']


In [32]:
sem = threading.Semaphore(value=50)
print(sem._value)

50


In [33]:
sem.acquire()
sem.acquire()
sem.acquire()
print(sem._value)

47


In [34]:
sem.release()
print(sem._value)

48
