In [None]:
import threading
import time

# Declare three semaphore variables: mutex, full, and empty.
# mutex is used to ensure mutual exclusion when accessing shared variables.
# full represents the number of full slots in the buffer.
# empty represents the number of empty slots in the buffer.
mutex = threading.Semaphore(1)
full = threading.Semaphore(0)
empty = threading.Semaphore(3)

# Variable to keep track of the item count.
x = 0

# The producer function, responsible for producing items.
def producer():
    global mutex, full, empty, x

    # Acquire an empty slot in the buffer.
    empty.acquire()

    # Acquire the mutex to ensure mutual exclusion when accessing shared variables.
    mutex.acquire()

    # Increment the item count.
    x += 1

    # Print a message indicating the item is produced.
    print(f"Producer produces item {x}\n")

    # Release the mutex to allow other threads to access shared variables.
    mutex.release()

    # Release a full slot in the buffer.
    full.release()

# The consumer function, responsible for consuming items.
def consumer():
    global mutex, full, empty, x

    # Acquire a full slot in the buffer.
    full.acquire()

    # Acquire the mutex to ensure mutual exclusion when accessing shared variables.
    mutex.acquire()

    # Print a message indicating the item is consumed.
    print(f"Consumer consumes item {x}\n")

    # Decrement the item count.
    x -= 1

    # Release the mutex to allow other threads to access shared variables.
    mutex.release()

    # Release an empty slot in the buffer.
    empty.release()

# The main function to start the producer-consumer simulation.
def main():
    while True:
        # Display the menu for user selection.
        print("1. PRODUCER\n2. CONSUMER\n3. EXIT")
        n = int(input("ENTER YOUR CHOICE: \n"))

        if n == 1:
            # If the buffer is not full, start a new producer thread.
            if empty._value != 0:
                producer_thread = threading.Thread(target=producer)
                producer_thread.start()
            else:
                print("BUFFER IS FULL")

        elif n == 2:
            # If the buffer is not empty, start a new consumer thread.
            if full._value != 0:
                consumer_thread = threading.Thread(target=consumer)
                consumer_thread.start()
            else:
                print("BUFFER IS EMPTY")

        elif n == 3:
            # Exit the program.
            break

        else:
            print("Invalid choice. Please try again.")

if __name__ == "__main__":
    main()


#### Altenative

In [None]:
import threading
import time
import random

# Buffer size
BUFFER_SIZE = 5

# Semaphore to control access to the buffer
mutex = threading.Semaphore(1)

# Semaphore to count the empty slots in the buffer
empty = threading.Semaphore(BUFFER_SIZE)

# Semaphore to count the number of items in the buffer
full = threading.Semaphore(0)

# Buffer to store items
buffer = []

# The producer function, responsible for producing items
def producer():
    for _ in range(10):
        # Generate a random item to be produced
        item = random.randint(1, 100)

        # Acquire an empty slot in the buffer
        empty.acquire()

        # Acquire the mutex to ensure mutual exclusion when accessing the buffer
        mutex.acquire()

        # Add the item to the buffer
        buffer.append(item)

        # Print a message indicating the item is produced and the current buffer contents
        print(f"Producer: Produced item {item}. Buffer: {buffer}")

        # Release the mutex to allow other threads to access the buffer
        mutex.release()

        # Release a full slot in the buffer to signal that an item is available
        full.release()

        # Introduce a random delay to simulate variable production time
        time.sleep(random.uniform(0.1, 0.5))

# The consumer function, responsible for consuming items
def consumer():
    for _ in range(10):
        # Acquire a full slot in the buffer to check if an item is available for consumption
        full.acquire()

        # Acquire the mutex to ensure mutual exclusion when accessing the buffer
        mutex.acquire()

        # Remove the first item from the buffer
        item = buffer.pop(0)

        # Print a message indicating the item is consumed and the current buffer contents
        print(f"Consumer: Consumed item {item}. Buffer: {buffer}")

        # Release the mutex to allow other threads to access the buffer
        mutex.release()

        # Release an empty slot in the buffer to signal that a slot is available for production
        empty.release()

        # Introduce a random delay to simulate variable consumption time
        time.sleep(random.uniform(0.1, 0.5))

if __name__ == "__main__":
    # Create producer and consumer threads
    producer_thread = threading.Thread(target=producer)
    consumer_thread = threading.Thread(target=consumer)

    # Start the threads
    producer_thread.start()
    consumer_thread.start()

    # Wait for the threads to complete
    producer_thread.join()
    consumer_thread.join()

    # Print a message indicating that the simulation is completed
    print("Producer-Consumer simulation completed.")
