# Class 9 (11.10.2021)

# OS concepts in Python
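* A process is a program in execution.
* It consists of the program code, a program counter, registers, a process stack, and other resources.
* A thread is a lightweight unit of execution within a process that the scheduler manages independently.
* Threads can improve performance through concurrency, and through parallelism where the runtime allows it. (In standard CPython builds, the Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time, so threads help most with I/O-bound work.)
* Threads share the data segment, code, and open files with the other threads of the same process.
* Each thread has its own registers, stack, and program counter.
* A thread is a subset of a process: a process contains one or more threads.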
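A small sketch (not from the class) of the "threads share data" point: three threads append to one list owned by the main program, and all of them report the same process id. The names `shared_results`, `results_lock` and `record_pid` are hypothetical and chosen only for this illustration.

```python
import os
import threading

# One list object, visible to every thread of this process
shared_results = []
results_lock = threading.Lock()

def record_pid(label):
    # All threads live inside the same process, so os.getpid() returns the same id
    with results_lock:
        shared_results.append((label, os.getpid()))

workers = [threading.Thread(target=record_pid, args=(f"worker-{i}",)) for i in range(3)]
for w in workers:
    w.start()
for w in workers:
    w.join()

print(shared_results)
print("Main process id:", os.getpid())
```

Each worker could also keep private data in local variables, which live on its own stack and are not shared.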

In [6]:
def cube(num):
    print(f"Cube of the given number is {num ** 3}")

def square(num):
    print(f"Square of the given number is {num ** 2}")

## Thread Class in Python
* ***target***: the function to be executed by the thread
* ***args***: a tuple of positional arguments passed to the target function

- Call the `start()` method of a `Thread` object to start the new thread.
- Call the `join()` method to wait for a thread to terminate.

In [16]:
# Run this multiple times to see how the threads compete for stdout
from threading import Thread

t1 = Thread(target = cube, args = (10,))  # args takes a tuple; note the trailing comma in (10,)
t2 = Thread(target = square, args = (5,))

# Start the threads' execution
t1.start()
t2.start()

print("3 Threads are currently in execution")

# Wait until t1 and t2 complete execution
t1.join() # Main thread waits for this thread to finish execution (main thread is blocked)
t2.join() # Main thread is blocked until t2 finishes

print("Both Completed")

Cube of the given number is 1000
Square of the given number is 253 Threads are currently in execution

Both Completed
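`args` passes values into the target, but `Thread` ignores whatever the target returns. A minimal sketch, assuming we want the results back in the main thread: each worker writes into its own slot of a shared list, and `join()` guarantees the slots are filled before we read them. The function `cube_into` is a hypothetical variant of `cube` written for this example.

```python
from threading import Thread

def cube_into(num, results, index):
    # Thread ignores the target's return value, so store the result in a shared list
    results[index] = num ** 3

nums = [2, 3, 4]
results = [None] * len(nums)
threads = [Thread(target=cube_into, args=(n, results, i)) for i, n in enumerate(nums)]

for t in threads:
    t.start()   # start() arranges for run() to call target(*args) in a new thread
for t in threads:
    t.join()    # block until this worker has finished

print(results)  # [8, 27, 64]
```

Giving each thread its own index means no two threads write to the same slot, so no lock is needed here.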


In [29]:
def print_loop_even(num):
    for i in range(num):
        print(f"{2*i} ==")


def print_loop_odd(num):
    for i in range(num):
        print(f"{2*i + 1} --")

In [32]:
# Run this multiple times to see how the threads compete for stdout
from threading import Thread

t1 = Thread(target = print_loop_even, args = (10,))  # Args has to be an iterable
t2 = Thread(target = print_loop_odd, args = (10,))

# Starting the thread's execution
t1.start()
t2.start()

print("3 Threads are currently in execution")

# Wait until t1 and t2 complete execution
t1.join() # Main thread waits for this thread to finish execution (main thread is blocked)
t2.join() # Main thread is blocked until t2 finishes

print("Both Completed")

0 ==
2 ==
4 ==
6 ==
8 ==
10 ==
12 ==
14 ==
16 ==
18 ==
1 --3 Threads are currently in execution

3 --
5 --
7 --
9 --
11 --
13 --
15 --
17 --
19 --
Both Completed


## Concept of global (scope of a variable)
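Inside a function, a local variable takes priority over (shadows) a global variable with the same name. Python treats a name as local to the whole function if it is assigned anywhere in the function body, which is why reading `x` before assigning it raises `UnboundLocalError` in the second example.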

In [34]:
x = 45
def example():
    x = 1
    print(f"Inside function {x}")
    x = 2

example()
print(f"Outside function {x}")

Inside function 1
Outside function 45


In [35]:
x = 45
def example():
    x += 1
    print(f"Inside function {x}")
    x = 2

example()
print(f"Outside function {x}")

UnboundLocalError: local variable 'x' referenced before assignment

In [37]:
x = 45
def example():
    global x
    x += 1
    print(f"Inside function {x}")
    
example()
print(f"Outside function {x}")

Inside function 46
Outside function 46


In [40]:
# Showing thread collisions

globVar = 0

def incGlobal():
    global globVar
    globVar += 1

def threadTask():
    for _ in range(1000000):
        incGlobal()

def main():
    global globVar
    globVar = 0
    
    t1 = Thread(target = threadTask)
    t2 = Thread(target = threadTask)
    
    t1.start()
    t2.start()
    
    t1.join()
    t2.join()

for i in range(10):
    main()
    print(f"{i} -> {globVar}")

0 -> 1632227
1 -> 1540315
2 -> 1587102
3 -> 1620880
4 -> 1604563
5 -> 1729259
6 -> 1497725
7 -> 1617416
8 -> 1701018
9 -> 1482813
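The totals above fall short of 2000000 because `globVar += 1` is a read, an add and a write, and a thread switch between those steps loses an update (how many updates are lost varies between runs and interpreter versions). A minimal sketch of the same experiment with a `threading.Lock` around the increment; `counter`, `counter_lock`, `locked_task` and `run_locked` are hypothetical names, and the iteration count is reduced to keep it quick.

```python
from threading import Thread, Lock

counter = 0
counter_lock = Lock()

def locked_task(iterations):
    global counter
    for _ in range(iterations):
        # Only one thread at a time may execute the read-modify-write below
        with counter_lock:
            counter += 1

def run_locked(iterations=100_000):
    global counter
    counter = 0
    workers = [Thread(target=locked_task, args=(iterations,)) for _ in range(2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return counter

for i in range(3):
    print(f"{i} -> {run_locked()}")  # 200000 every time
```

The lock makes the program slower, because the two threads now take turns on every increment, but the result is always correct.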


In [48]:
from threading import Thread, active_count, main_thread
from time import sleep

def threadName():
    sleep(2)
    print("This is thread")

th = Thread(target = threadName, name = "th")
th.start()

print("Current thread count = ", active_count())
print(f"Thread {th.name} is {th.is_alive()}") # getName() is deprecated since Python 3.10; use the name attribute
print()

th.join()

print()
print(f"Thread {th.name} is {th.is_alive()}")
print("Current thread count = ", active_count())
print()

print(f"Main thread's name = {main_thread().name}")

Current thread count =  9
Thread th is True

This is thread

Thread th is False
Current thread count =  8

Main thread's name = MainThread


### The thread count is likely to be higher on interactive kernels such as Jupyter
The kernel runs its own background threads. When the program above is run as a normal Python file, the output looks like this:

```
Current thread count =  2
Thread th is True

This is thread

Thread th is False
Current thread count =  1

Main thread's name = MainThread
```

In [53]:
t = Thread(target = threadName)
print(t.name) # The default name assigned to the thread at creation (getName() is deprecated)

t.start()
t.join()

Thread-107
This is thread
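Inside a target function, `threading.current_thread()` returns the `Thread` object that is running the call, which is a convenient way to see which thread printed what. A minimal sketch; `seen_names` and `report_name` are hypothetical, and each worker is joined right after it starts so the order is deterministic.

```python
from threading import Thread, current_thread

seen_names = []

def report_name():
    # current_thread() returns the Thread object executing this call
    seen_names.append(current_thread().name)

workers = [Thread(target=report_name, name=f"worker-{i}") for i in range(2)]
for w in workers:
    w.start()
    w.join()

report_name()      # called directly, so it runs in the main thread
print(seen_names)  # ['worker-0', 'worker-1', 'MainThread'] when run as a script
```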


# Multiprocessing
* Ability of a system to utilise more than one processor to execute instructions at the same time.
* It is a mode of operation in which two or more processors in a computer simultaneously process two or more different portions of the same program (set of instructions). 
* For example, different processors may be used to manage memory storage, data communications, or arithmetic functions.
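* In Python, the `multiprocessing` module launches separate processes, which the OS can schedule on different logical processors (cores or hardware threads). Each process has its own interpreter and memory, so CPU-bound Python code in different processes can run in parallel.
* Multiprocessing: more than one processor runs processes at the same time. (Several processes sharing a single processor by taking turns is multitasking, not multiprocessing.)
* Parallel processing: one program's work is divided among multiple processors so that it finishes in less time.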

### Stack Overflow
#### Multiprocessing
Multiprocessing is the use of two or more central processing units (CPUs) within a single computer system. 
The term also refers to the ability of a system to support more than one processor and/or the ability to allocate tasks between them.

#### Parallel Processing
In computers, parallel processing is the processing of program instructions by dividing them among multiple processors with the objective of running a program in less time. 
In the earliest computers, only one program ran at a time.


In [55]:
from multiprocessing import Process

def cube(num):
    print(f"Cube of the given number is {num ** 3}")

def square(num):
    print(f"Square of the given number is {num ** 2}")

# Required when the start method is "spawn" (the default on Windows and macOS):
# each child re-imports this module and must not start new processes itself.
if __name__ == "__main__":
    p1 = Process(target = cube, args = (10,))
    p2 = Process(target = square, args = (2,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print("Both processes finished")

Cube of the given number is 1000Square of the given number is 4

Both processes finished


Output obtained when run as a normal Python file:

```
Cube of the given number is 1000
Square of the given number is 4
Both processes finished
```

# Class 10 (13.10.2021)

## Lock Mechanisms
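A lock (or mutex) is a synchronization mechanism that enforces limits on access to a shared resource when multiple threads (or processes) are executing: only the holder of the lock may enter the critical section, and the others block until it is released.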
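A lock is normally used either with an explicit `acquire()`/`release()` pair or with a `with` statement, which is the form used in the producer/consumer code later. A minimal sketch with `threading.Lock` showing that the two forms are equivalent; the `try`/`finally` makes sure the lock is released even if the critical section raises. The same `with` form also works with `multiprocessing.Lock`.

```python
from threading import Lock

lock = Lock()

# Explicit form: release() must run even if the critical section raises
lock.acquire()
try:
    print("in critical section (explicit)", lock.locked())
finally:
    lock.release()

# Equivalent form: with calls acquire() on entry and release() on exit
with lock:
    print("in critical section (with)", lock.locked())

print("after both sections", lock.locked())  # False
```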

In [1]:
from multiprocessing import Process, Lock, Value
from time import sleep

def deposit(total):
    for i in range(100):
        sleep(0.01)
        total += 5
    return total

def withdraw(total):
    for i in range(100):
        sleep(0.01)
        total -= 5
    return total

total = 500
print(f"Initial value = {total}")
total = deposit(total)
print(f"Value after deposit = {total}")
total = withdraw(total)
print(f"Value after withdrawal = {total}")

Initial value = 500
Value after deposit = 1000
Value after withdrawal = 500


In [7]:
# Multiprocessing with no lock mechanisms (No Atomicity)

def deposit(total):
    for i in range(100):
        sleep(0.01)
        total.value += 5
    return total

def withdraw(total):
    for i in range(100):
        sleep(0.01)
        total.value -= 5
    return total

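# About the Value class:
# multiprocessing.Value returns a ctypes object allocated in shared memory.
# By default (lock=True) it is wrapped in a synchronized wrapper that uses a recursive lock (RLock).
# That lock protects each individual read or write of .value, but "total.value += 5" is a read
# followed by a write, so it is not atomic; use "with total.get_lock():" or a separate Lock.
# Value is used to share information between processes (Inter Process Communication, IPC).
# It is different from multiprocessing.Manager, which shares objects through a separate server process.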
total = Value('i', 500) # C Type Int Object (i stands for int)
print(f"Total Object = {total}")
print(f"Initial Value = {total.value}")

add = Process(target = deposit, args = (total,))
sub = Process(target = withdraw, args = (total,))

add.start()
sub.start()

add.join()
sub.join()

print(f"Total = {total.value}")

Total Object = <Synchronized wrapper for c_int(500)>
Initial Value = 500
Total = 520


In [17]:
# With lock
lock = Lock() # Lock Object
print(f"Lock Object = {lock}")

def deposit(total, lock):
    for i in range(100):
        sleep(0.01)
        lock.acquire()
        total.value += 5
        lock.release()
    return total

def withdraw(total, lock):
    for i in range(100):
        sleep(0.01)
        lock.acquire()
        total.value -= 5
        lock.release()
    return total

total = Value('i', 500) # C Type Int Object (i stands for int)
print(f"Total Object = {total}")
print(f"Initial Value = {total.value}")

add = Process(target = deposit, args = (total, lock))
sub = Process(target = withdraw, args = (total, lock))

add.start()
sub.start()

add.join()
sub.join()

print(f"Total = {total.value}")

Lock Object = <Lock(owner=None)>
Total Object = <Synchronized wrapper for c_int(500)>
Initial Value = 500
Total = 500


# Race Conditions
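A race condition occurs when the outcome of a program depends on the relative timing (interleaving) of threads or processes that access a shared resource, with at least one of them modifying it. In the first example below, both threads write to the shared stdout without coordination, so their characters interleave unpredictably; in the second, a lock makes each pair of printed characters atomic.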

In [20]:
from threading import Thread
import random
import sys
from time import sleep

def f1(s):
    for i in range(0, len(s)):
        print(s[i], end = '')
        sys.stdout.flush() # Flush the data buffer
        sleep(random.random()*3)
        print(s[i], end = '')
        sys.stdout.flush()
        sleep(random.random()*3)

t1 = Thread(target = f1, args = ('ABCDEFGH',))
t2 = Thread(target = f1, args = ('abcdefgh',))

t1.start()
t2.start()

t1.join()
t2.join()

AaaAbbBccdBdeCeCDffDgEEghhFFGGHH

In [19]:
from threading import Thread, Lock
import random
import sys
from time import sleep

lock = Lock()

def f1(s):
    for i in range(0, len(s)):
        lock.acquire()
        print(s[i], end = '')
        sys.stdout.flush() # Flush the data buffer
        sleep(random.random()*3)
        print(s[i], end = '')
        sys.stdout.flush()
        lock.release()
        sleep(random.random()*3)

t1 = Thread(target = f1, args = ('ABCDEFGH',))
t2 = Thread(target = f1, args = ('abcdefgh',))

t1.start()
t2.start()

t1.join()
t2.join()

AAaabbBBccCCddeeDDffEEggFFhhGGHH

# Class 11 (20.10.2021)

# Semaphores
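* One of the oldest synchronization primitives in the history of Computer Science.
* Invented by the Dutch computer scientist Edsger W. Dijkstra.
* Dijkstra defined two operations: P() (wait until the counter is positive, then decrement it) and V() (increment the counter). In Python's `threading.Semaphore`, these correspond to `acquire()` and `release()`.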
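A semaphore initialised to N lets at most N threads into a guarded block at once. A minimal sketch, assuming 10 worker threads and a limit of 3; the counters `active` and `peak` (protected by their own lock) are hypothetical bookkeeping added only to observe how many threads were inside the block at the same time.

```python
from threading import Semaphore, Thread, Lock
from time import sleep

MAX_CONCURRENT = 3
slots = Semaphore(MAX_CONCURRENT)  # at most 3 threads inside the guarded block
stats_lock = Lock()
active = 0
peak = 0

def worker():
    global active, peak
    with slots:  # acquire() (P) on entry, release() (V) on exit
        with stats_lock:
            active += 1
            peak = max(peak, active)
        sleep(0.05)  # simulate work while holding a slot
        with stats_lock:
            active -= 1

threads = [Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("peak concurrency:", peak)  # never more than MAX_CONCURRENT
```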

In [2]:
from threading import Semaphore

# This is not a bounded Semaphore.
sem = Semaphore(5) # Allows up to 5 threads to hold it at once, i.e. 5 threads can be in the critical section at a time

print(sem._value) # _value is an internal implementation detail, inspected here only for illustration

print(sem.acquire())

print(sem._value)

print(sem.release())
print(sem.release()) # Can release without acquiring as well (not ideal in some cases, hence use BoundedSemaphore)

print(sem._value)

5
True
4
None
None
6


In [3]:
from threading import BoundedSemaphore

bsem = BoundedSemaphore(10)

print(bsem.acquire())
print(bsem._value)

print(bsem.release())
print(bsem.release()) # Raises ValueError: a BoundedSemaphore cannot be released above its initial value

print(bsem._value)


True
9
None


ValueError: Semaphore released too many times

In [5]:
sem = Semaphore(1)
print(sem.acquire())
# By default, the acquire method for Semaphores is blocking. Hence, it blocks the thread trying to acquire it when its value is 0
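# Blocks indefinitely: the value is 0 and no other thread will release it.
# This is not a busy wait; the thread sleeps on the semaphore's internal condition variable.
print(sem.acquire())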
print("Acquired!")

True


KeyboardInterrupt: 

In [9]:
sem = Semaphore(1)
print(sem.acquire(blocking = False))
print(sem.acquire(blocking = False)) # With blocking=False, acquire() returns False immediately if the semaphore cannot be acquired
print("Acquired!")

True
False
Acquired!
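Between blocking forever and giving up immediately, `acquire()` also accepts a `timeout` in seconds: it waits at most that long and returns `False` if the semaphore is still unavailable. A minimal sketch; the 0.2 s timeout is an arbitrary value chosen for the example.

```python
from threading import Semaphore
from time import monotonic

sem = Semaphore(1)
print(sem.acquire())  # True: the value drops from 1 to 0

start = monotonic()
got_it = sem.acquire(timeout=0.2)  # waits up to 0.2 s, then gives up
elapsed = monotonic() - start
print(got_it, round(elapsed, 1))  # False, after roughly 0.2 s

sem.release()  # give back the first acquisition
```

Note that `timeout` cannot be combined with `blocking=False`; that combination raises `ValueError`.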


In [17]:
from threading import Semaphore, Thread
from time import  sleep

sem = Semaphore() # Default value of Semaphore is 1
print(sem, sem._value)

def f1():
    print("Starting Function f1")
    sem.acquire()
    for loop in range(1, 5):
        print(f"{loop}) Function f1 in loop")
        sleep(0.02)
    sem.release()
    print("Function f1 done")

def f2():
    print("Starting Function f2")
    while not sem.acquire(blocking = False):
        print("\n\033[91mFunction f2: Semaphore unavailable\033[0m\n")
        sleep(0.05)
    else:
        print("\n\033[92mFunction f2: Semaphore acquired\033[0m\n")
        for loop in range(1, 5):
            print(f"{loop}) Function f2 in loop")
            sleep(0.02)
    sem.release()

t1 = Thread(target = f1)
t2 = Thread(target = f2)

t1.start()
t2.start()

t1.join()
t2.join()

<threading.Semaphore object at 0x7f3c74753490> 1
Starting Function f1
1) Function f1 in loop
Starting Function f2

Function f2: Semaphore unavailable

2) Function f1 in loop
3) Function f1 in loop
4) Function f1 in loop

Function f2: Semaphore unavailable

Function f1 done

Function f2: Semaphore acquired

1) Function f2 in loop
2) Function f2 in loop
3) Function f2 in loop
4) Function f2 in loop


# Producer Consumer Problem (PCP)
A classic problem that helps us understand concurrency and synchronization:
* Producer produces stuff and pushes it to a buffer.
* Consumer will consume the stuff from the buffer.

In [7]:
from threading import Thread, Lock
from time import  sleep
from random import choice, random

queue = []
lock = Lock()

RED = "\033[91m"
GREEN = "\033[92m"
RESET = "\033[0m"
C1 = "\033[93m"
C2 = "\033[94m"

class Producer(Thread):
    def run(self):
        nums = range(5)
        global queue
        for i in range(10):
            num = choice(nums)
            with lock:
                queue.append(num)
                print(f"{C1}Producer: {GREEN}Produced {num}{RESET}")
            sleep(random())

class Consumer(Thread):
    def run(self):
        global queue
        for i in range(10):
            with lock:
                if not queue: 
                    print(f"{C2}Consumer: {RED}Queue Empty{RESET}")
                else:
                    num = queue.pop(0)
                    print(f"{C2}Consumer: {GREEN}Consumed {num}{RESET}")
            sleep(random())

Producer().start()
Consumer().start()

Producer: Produced 3
Consumer: Consumed 3
Consumer: Queue Empty
Producer: Produced 1
Producer: Produced 3
Consumer: Consumed 1
Consumer: Consumed 3
Producer: Produced 2
Consumer: Consumed 2
Producer: Produced 4
Consumer: Consumed 4
Producer: Produced 4
Consumer: Consumed 4
Producer: Produced 3
Consumer: Consumed 3
Producer: Produced 1
Consumer: Consumed 1
Consumer: Queue Empty
Producer: Produced 1
Producer: Produced 0


# Class 12 (25.10.2021)

# Condition Object
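* Allows one or more threads to wait until they are notified by another thread.
* This can be used to coordinate threads and prevent concurrency bugs such as consuming from an empty buffer.
* A condition is always associated with a lock: besides acquire() and release(), it provides wait(), wait_for(), notify() and notify_all(). wait() releases the lock while waiting and re-acquires it before returning.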
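A minimal sketch of the wait/notify handshake on its own, before the full producer/consumer version: one thread waits for a flag, another sets it and calls `notify()`. `wait_for(predicate)` re-checks the predicate each time the thread wakes up, which is the loop the threading documentation recommends around `wait()`. The names `cond`, `ready` and `log` are hypothetical.

```python
from threading import Condition, Thread

cond = Condition()  # creates its own RLock by default
ready = False
log = []

def waiter():
    with cond:  # acquire the underlying lock
        # Releases the lock while waiting, re-acquires it before returning
        cond.wait_for(lambda: ready)
        log.append("waiter saw ready")

def notifier():
    global ready
    with cond:
        ready = True
        log.append("notifier set ready")
        cond.notify()  # wake one thread waiting on this condition

t1 = Thread(target=waiter)
t2 = Thread(target=notifier)
t1.start()
t2.start()
t1.join()
t2.join()
print(log)  # ['notifier set ready', 'waiter saw ready']
```

Whichever thread runs first, the waiter only proceeds once `ready` is `True`, so the order in `log` is always the same.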

In [1]:
from threading import Thread, Condition
from time import  sleep
from random import choice, random

simulationLength = range(20)
queue = []
maxNum = 5
condition = Condition()

RED = "\033[91m"
GREEN = "\033[92m"
RESET = "\033[0m"
C1 = "\033[93m"
C2 = "\033[94m"

class Producer(Thread):
    def run(self):
        nums = range(5)
        global queue
        for i in simulationLength:
            num = choice(nums)
            condition.acquire()
            while len(queue) == maxNum: # Use while, not if: re-check the condition after every wake-up
                print(f"{C1}Producer: {RED}Queue Full{RESET}")
                condition.wait() # Wait for Consumer to notify after it has consumed something and made room for new products in the queue
            queue.append(num)
            print(f"{C1}Producer: {GREEN}Produced {num}{RESET}")
            condition.notify() # Notify the consumer that a new product has been produced
            condition.release()
            sleep(random())

class Consumer(Thread):
    def run(self):
        global queue
        for i in simulationLength:
            condition.acquire()
            while not queue: # Use while, not if: re-check the condition after every wake-up
                print(f"{C2}Consumer: {RED}Queue Empty{RESET}")
                condition.wait() # Wait for Producer to add something to the queue
            num = queue.pop(0)
            print(f"{C2}Consumer: {GREEN}Consumed {num}{RESET}")
            condition.notify() # Notify the producer to add something to the queue
            condition.release()
            sleep(random())

Producer().start()
Consumer().start()

Producer: Produced 0
Consumer: Consumed 0
Producer: Produced 1
Consumer: Consumed 1
Producer: Produced 2
Consumer: Consumed 2
Consumer: Queue Empty
Producer: Produced 4
Consumer: Consumed 4
Consumer: Queue Empty
Producer: Produced 0
Consumer: Consumed 0
Consumer: Queue Empty
Producer: Produced 1
Consumer: Consumed 1
Producer: Produced 4
Consumer: Consumed 4
Producer: Produced 1
Consumer: Consumed 1
Producer: Produced 4
Producer: Produced 4
Consumer: Consumed 4
Consumer: Consumed 4
Producer: Produced 1
Consumer: Consumed 1
Producer: Produced 3
Consumer: Consumed 3
Consumer: Queue Empty
Producer: 