1. Bank Account Simulation Task: Simulate a bank account with deposit and withdrawal functions accessed by multiple threads. Objective: Use Lock to ensure the balance does not go negative or get corrupted

In [1]:
import threading

balance = 100
lock = threading.Lock()

def deposit(amount):
    global balance
    with lock:
        balance += amount
        print("Deposited:", amount, "Balance:", balance)

def withdraw(amount):
    global balance
    with lock:
        if balance >= amount:
            balance -= amount
            print("Withdrawn:", amount, "Balance:", balance)
        else:
            print("Insufficient balance")

t1 = threading.Thread(target=deposit, args=(50,))
t2 = threading.Thread(target=withdraw, args=(120,))

t1.start()
t2.start()


Deposited: 50 Balance: 150
Withdrawn: 120 Balance: 30


2. Producer-Consumer with Lock and Condition Task: Implement a producer-consumer scenario using a shared list. Producers add items; consumers remove items. Objective: Use threading.Lock or threading.Condition to synchronize threads and avoid race conditions

In [2]:
import threading
import time

items = []
cond = threading.Condition()

def producer():
    for i in range(5):
        with cond:
            items.append(i)
            print("Produced:", i)
            cond.notify()
        time.sleep(1)

def consumer():
    for i in range(5):
        with cond:
            cond.wait()
            item = items.pop(0)
            print("Consumed:", item)

threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()


Produced: 0


3. Deadlock Detection Task: Create 3 threads each acquiring 2 locks in a potentially conflicting order. Implement a simple detection mechanism to print a warning if deadlock occurs. Objective: Understand complex deadlocks and how to identify them

In [3]:
import threading
import time

lock1 = threading.Lock()
lock2 = threading.Lock()

def task1():
    lock1.acquire()
    time.sleep(1)
    if not lock2.acquire(timeout=2):
        print("Deadlock detected in Task 1")
    else:
        lock2.release()
    lock1.release()

def task2():
    lock2.acquire()
    time.sleep(1)
    if not lock1.acquire(timeout=2):
        print("Deadlock detected in Task 2")
    else:
        lock1.release()
    lock2.release()

t1 = threading.Thread(target=task1)
t2 = threading.Thread(target=task2)

t1.start()
t2.start()
