<a href="https://colab.research.google.com/github/254francis/Operating_Systems_LabWork/blob/main/Operating_Systems_Assignment2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
import threading
import time
import random
from threading import Semaphore, Lock
from collections import deque

class ProducerConsumer:
    """Bounded-buffer producer/consumer simulation built on semaphores.

    Producer threads append labelled items ("Item-1", "Item-2", ...) to a
    shared deque and consumer threads pop them from the left. Two counting
    semaphores track free and occupied slots, and a lock serialises every
    access to the buffer and the counters.

    Args:
        buffer_size: Capacity of the shared buffer; must be at least 1.
        max_items: Total number of items produced across all producers.
        producer_delay: ``(low, high)`` bounds, in seconds, of the random
            pause a producer takes after each item.
        consumer_delay: ``(low, high)`` bounds, in seconds, of the random
            pause a consumer takes after each item.

    Raises:
        ValueError: If ``buffer_size`` is smaller than 1.
    """

    def __init__(self, buffer_size=5, max_items=20,
                 producer_delay=(0.5, 2.0), consumer_delay=(1.0, 2.5)):
        # A zero-capacity buffer would leave every producer blocked forever
        # on the ``empty`` semaphore, so reject it up front.
        if buffer_size < 1:
            raise ValueError("buffer_size must be at least 1")

        # Shared buffer (circular buffer)
        self.buffer = deque(maxlen=buffer_size)
        self.buffer_size = buffer_size

        # Semaphores for synchronization
        self.empty = Semaphore(buffer_size)  # Number of empty slots
        self.full = Semaphore(0)             # Number of full slots
        self.mutex = Lock()                  # Mutual exclusion for buffer access

        # Control variables
        self.running = True
        self.item_counter = 1
        self.produced_count = 0
        self.consumed_count = 0
        self.max_items = max_items
        self.producer_delay = producer_delay
        self.consumer_delay = consumer_delay

        print("=== Producer-Consumer Problem Simulation ===")
        print(f"Buffer Size: {buffer_size}")
        print(f"Maximum items to produce: {self.max_items}")
        print("=" * 50)

    def producer(self, producer_id):
        """Produce items until ``max_items`` have been produced in total.

        Args:
            producer_id: Number used to label this producer in the output.
        """
        while self.running and self.produced_count < self.max_items:
            # Wait for empty slot
            self.empty.acquire()

            # Critical section - add item to buffer
            with self.mutex:
                # Re-check under the lock: another producer may have produced
                # the last item while this one was waiting for a slot.
                if not (self.running and self.produced_count < self.max_items):
                    # Nothing was stored, so hand the empty slot back.
                    self.empty.release()
                    break

                # The label is built while holding the lock so that two
                # producers can never reuse the same counter value.
                item = f"Item-{self.item_counter}"
                self.buffer.append(item)
                self.item_counter += 1
                self.produced_count += 1

                buffer_state = list(self.buffer)
                print(f"Producer-{producer_id}: Produced {item}")
                print(f"  Buffer: {buffer_state} (Size: {len(self.buffer)}/{self.buffer_size})")
                print(f"  Total Produced: {self.produced_count}")
                print()

            # Signal that buffer has a full slot
            self.full.release()

            # Simulate production time
            time.sleep(random.uniform(*self.producer_delay))

    def consumer(self, consumer_id):
        """Consume items until ``max_items`` have been consumed in total.

        Args:
            consumer_id: Number used to label this consumer in the output.
        """
        while self.running:
            # Wait for full slot; the timeout lets an idle consumer notice
            # that production is over instead of blocking forever.
            if not self.full.acquire(timeout=3):
                if self.produced_count >= self.max_items and len(self.buffer) == 0:
                    break
                continue

            # Critical section - remove item from buffer
            with self.mutex:
                if not self.buffer:
                    # If buffer is empty, release full semaphore
                    self.full.release()
                    continue

                item = self.buffer.popleft()
                self.consumed_count += 1

                buffer_state = list(self.buffer)
                print(f"Consumer-{consumer_id}: Consumed {item}")
                print(f"  Buffer: {buffer_state} (Size: {len(self.buffer)}/{self.buffer_size})")
                print(f"  Total Consumed: {self.consumed_count}")
                print()

            # Signal that buffer has an empty slot
            self.empty.release()

            # Simulate consumption time
            time.sleep(random.uniform(*self.consumer_delay))

            # Check if all items are consumed
            if self.consumed_count >= self.max_items:
                break

    def run_simulation(self, num_producers=2, num_consumers=2):
        """Start the worker threads, wait for them and print a summary.

        Args:
            num_producers: Number of producer threads to start.
            num_consumers: Number of consumer threads to start.
        """
        print(f"Starting simulation with {num_producers} producers and {num_consumers} consumers\n")

        # Create and start producer threads
        producer_threads = []
        for i in range(num_producers):
            thread = threading.Thread(target=self.producer, args=(i+1,))
            thread.start()
            producer_threads.append(thread)

        # Create and start consumer threads
        consumer_threads = []
        for i in range(num_consumers):
            thread = threading.Thread(target=self.consumer, args=(i+1,))
            thread.start()
            consumer_threads.append(thread)

        # Wait for all producer threads to complete
        for thread in producer_threads:
            thread.join()

        print("All producers finished. Waiting for consumers to finish remaining items...\n")

        # Wait for all consumer threads to complete
        for thread in consumer_threads:
            thread.join()

        self.running = False
        self.print_summary()

    def print_summary(self):
        """Print the produced/consumed totals and what is left in the buffer."""
        print("=" * 50)
        print("           SIMULATION SUMMARY")
        print("=" * 50)
        print(f"Total items produced: {self.produced_count}")
        print(f"Total items consumed: {self.consumed_count}")
        print(f"Items remaining in buffer: {len(self.buffer)}")
        print(f"Buffer contents: {list(self.buffer)}")

        if self.produced_count == self.consumed_count:
            print("SUCCESS: All produced items were consumed!")
        else:
            print("NOTICE: Some items remain unconsumed")

        print("=" * 50)

def demonstrate_race_condition():
    """Run one producer and one consumer on a plain list with no locking.

    Both workers check the shared list and then modify it in separate steps
    without any semaphore or lock, printing what they see after each attempt.
    """
    print("\n=== DEMONSTRATION: Race Condition Without Semaphores ===")
    print("This shows why synchronization is necessary:\n")

    # Shared state touched by both threads without any protection.
    capacity = 3
    shared = []

    def unsafe_consumer():
        # Five attempts to take the oldest item, pausing 0.15 s after each.
        for _ in range(5):
            if not shared:
                print("Unsafe Consumer: Buffer empty, nothing to consume")
            else:
                taken = shared.pop(0)
                print(f"Unsafe Consumer: Consumed {taken} | Buffer: {shared}")
            time.sleep(0.15)

    def unsafe_producer():
        # Five attempts to add an item, pausing 0.1 s after each.
        for number in range(1, 6):
            if len(shared) >= capacity:
                print(f"Unsafe Producer: Buffer full, cannot add item {number}")
            else:
                label = f"UnsafeItem-{number}"
                shared.append(label)
                print(f"Unsafe Producer: Added {label} | Buffer: {shared}")
            time.sleep(0.1)

    # Producer is started first, then the consumer; both are joined in
    # that same order.
    workers = [
        threading.Thread(target=unsafe_producer),
        threading.Thread(target=unsafe_consumer),
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    print("Notice: Without semaphores, timing issues can cause problems!")
    print("The semaphore version above ensures proper synchronization.\n")

def interactive_menu():
    """Show the simulation menu in a loop until the user picks "5" (Exit).

    Options 1 and 2 run a ProducerConsumer simulation (default or
    user-configured), option 3 runs the unsynchronised demonstration and
    option 4 prints the algorithm explanation.
    """
    divider = "=" * 50
    menu_entries = (
        "1. Run Default Simulation (2 producers, 2 consumers, buffer size 5)",
        "2. Run Custom Simulation",
        "3. Demonstrate Race Condition",
        "4. Show Algorithm Explanation",
        "5. Exit",
    )
    # Prompts for the custom run, asked in this order, with the inclusive
    # range each answer must fall in.
    custom_prompts = (
        "Enter buffer size (1-10): ",
        "Enter number of producers (1-5): ",
        "Enter number of consumers (1-5): ",
        "Enter maximum items to produce (5-50): ",
    )
    custom_bounds = ((1, 10), (1, 5), (1, 5), (5, 50))

    while True:
        print("\n" + divider)
        print("    PRODUCER-CONSUMER SIMULATION MENU")
        print(divider)
        for entry in menu_entries:
            print(entry)

        selection = input("\nEnter your choice (1-5): ").strip()

        if selection == '5':
            print("Exiting simulation. Goodbye!")
            break

        if selection == '1':
            simulation = ProducerConsumer(buffer_size=5)
            simulation.run_simulation(num_producers=2, num_consumers=2)

        elif selection == '2':
            try:
                # int() raises ValueError on the first non-numeric answer,
                # which stops the remaining prompts from being asked.
                answers = [int(input(prompt)) for prompt in custom_prompts]

                within_limits = all(
                    low <= value <= high
                    for value, (low, high) in zip(answers, custom_bounds)
                )
                if not within_limits:
                    print("Invalid input! Please enter values within the specified ranges.")
                    continue

                size, producers, consumers, item_limit = answers
                simulation = ProducerConsumer(buffer_size=size)
                simulation.max_items = item_limit
                simulation.run_simulation(num_producers=producers, num_consumers=consumers)

            except ValueError:
                print("Error: Please enter valid numbers!")

        elif selection == '3':
            demonstrate_race_condition()

        elif selection == '4':
            show_algorithm_explanation()

        else:
            print("Invalid choice! Please enter a number between 1-5.")

def show_algorithm_explanation():
    """Print a boxed, plain-text description of the producer-consumer algorithm."""
    rule = "=" * 60
    heading = "          PRODUCER-CONSUMER ALGORITHM EXPLANATION"
    explanation = """
PROBLEM:
- Multiple producers generate items and put them in a shared buffer
- Multiple consumers take items from the shared buffer and process them
- Buffer has limited size
- Need to prevent race conditions and ensure synchronization

SEMAPHORES USED:
1. empty: Counts empty slots in buffer (initially = buffer_size)
2. full:  Counts full slots in buffer (initially = 0)
3. mutex: Binary semaphore for mutual exclusion (lock)

PRODUCER ALGORITHM:
1. Produce an item
2. Wait for empty slot (acquire empty semaphore)
3. Lock buffer (acquire mutex)
4. Add item to buffer
5. Unlock buffer (release mutex)
6. Signal full slot available (release full semaphore)

CONSUMER ALGORITHM:
1. Wait for full slot (acquire full semaphore)
2. Lock buffer (acquire mutex)
3. Remove item from buffer
4. Unlock buffer (release mutex)
5. Signal empty slot available (release empty semaphore)
6. Consume the item

KEY POINTS:
- Semaphores prevent buffer overflow and underflow
- Mutex ensures only one thread accesses buffer at a time
- Proper ordering prevents deadlock
- Synchronization ensures consumer doesn't consume non-existent items
"""
    # One print call with a newline separator emits the same text as
    # printing each part on its own.
    print("\n" + rule, heading, rule, explanation, rule, sep="\n")

def main():
    """Print the welcome banner and run the interactive menu.

    A Ctrl-C or any other exception raised by the menu is reported with a
    short message instead of a traceback.
    """
    for banner_line in (
        "Welcome to the Producer-Consumer Problem Simulation!",
        "This program demonstrates synchronization using semaphores.",
    ):
        print(banner_line)

    try:
        interactive_menu()
    except Exception as error:
        print(f"\nAn error occurred: {error}")
    except KeyboardInterrupt:
        # KeyboardInterrupt is not an Exception subclass, so it needs its
        # own handler.
        print("\n\nProgram interrupted by user. Goodbye!")

# Start the interactive program only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

Welcome to the Producer-Consumer Problem Simulation!
This program demonstrates synchronization using semaphores.

    PRODUCER-CONSUMER SIMULATION MENU
1. Run Default Simulation (2 producers, 2 consumers, buffer size 5)
2. Run Custom Simulation
3. Demonstrate Race Condition
4. Show Algorithm Explanation
5. Exit


Program interrupted by user. Goodbye!
