In [1]:
import queue
import random
import time
import threading

**BacklogItem Class**: This class represents an individual backlog item, with an ID, a priority, and a "blocked" status.

In [2]:
# Define a Backlog Item class
class BacklogItem:
    def __init__(self, id, priority, is_blocked=False):
        self.id = id
        self.priority = priority
        self.is_blocked = is_blocked
    
    def __repr__(self):
        return f"ID: {self.id}, Priority: {self.priority}, Blocked: {self.is_blocked}"
    
    # Comparison method to support PriorityQueue sorting
    def __lt__(self, other):
        # Return True if this item should come before the other in the queue (i.e., higher priority)
        return self.priority > other.priority  # Higher priority comes first in a max-heap


**PriorityQueue**: A priority queue is used to hold the backlog items. We use the priority as a tuple with (priority, item), where the priority is the first element to ensure the queue processes items with the highest priority first.


In [3]:
# Create a priority queue for the backlog
backlog = queue.PriorityQueue()


In [4]:
# Add some sample backlog items with different priorities
def create_sample_backlog():
    for i in range(10):
        # 1 in 5 chance of being a blocked item
        is_blocked_item = random.choice([True, False, False, False, False])
        
        # generate item with random priority
        item = BacklogItem(id=i, priority=random.randint(1, 10), is_blocked=is_blocked_item)
        
        # put item into backlog
        backlog.put((item.priority, item))


**Worker Threads**: Multiple worker threads are created to simulate workers picking up tasks. Each worker attempts to pull the highest-priority item from the backlog. If an item is blocked, they skip it and recheck the backlog.

**Random Disappearance**: Workers have a 10% chance of "disappearing" (representing sick leave or vacation) at random intervals, which simulates real-world interruptions in workflow.

In [5]:
# Worker function
def worker(worker_name, project_end):
    while not project_end.is_set():
        # Simulate random worker disappearance (sick leave)
        if random.random() < (10/365):  # 10 sick days per year
            print(f"Worker {worker_name} is sick... ")
            time.sleep(random.randint(1, 5))  # Worker disappears for a random time
            continue

        # Simulate random worker disappearance (vacation)
        if random.random() < (25/365):  # 25 vacation days per year
            print(f"Worker {worker_name} is on vacation... ")
            time.sleep(5)  # Worker disappears for a working week
            continue
        
        # Try to pick a backlog item if available
        if not backlog.empty():
            priority, item = backlog.get()
            # Check if the item is blocked
            if item.is_blocked:
                print(f"Worker {worker_name} found item {item.id:2} is blocked, skipping it. ")
                # Reinsert the blocked item back into the queue
                backlog.put((priority, item))
                print(backlog.queue)
                time.sleep(1)  # Worker takes a break before trying again
            else:
                print(f"Worker {worker_name} is working on item {item.id:2} with priority {item.priority:2}. ")
                print(backlog.queue)
                time.sleep(random.randint(1, 3))  # Simulate time taken to work on the item
        else:
            print(f"Backlog empty, all tickets handled. Congratulations!!! 🎉")
            project_end.set()
            print(backlog.queue)
            time.sleep(2)


**Simulation Duration**: The simulation runs for 20 seconds, but you can adjust this value to run for longer if needed. During this time, workers will process tasks as long as there are available, unblocked tasks in the backlog.


In [6]:
# Function to simulate the scrum process
def simulate_scrum():
    create_sample_backlog()

    # Create a stop event to signal when to stop the workers
    project_end = threading.Event()

    # Start worker threads
    workers = [
        "Alice  ",
        "Bob    ",
        "Charlie",
        "David  ",
        "Emma   "
    ]
    threads = []
    for name in workers:
        thread = threading.Thread(target=worker, args=(name, project_end))
        threads.append(thread)
        thread.start()
    
    # Run for a fixed period of time
    time.sleep(20)  # Simulate for 20 seconds

    # Signal the workers to stop
    project_end.set()

    # Wait for all threads to finish
    for thread in threads:
        thread.join()

    print("=== Project Over, Deadline Reached ===")


In [7]:
# Run the simulation
simulate_scrum()

Worker Alice   is working on item  0 with priority  1. 
[(1, ID: 7, Priority: 1, Blocked: False), (3, ID: 4, Priority: 3, Blocked: False), (2, ID: 5, Priority: 2, Blocked: False), (4, ID: 8, Priority: 4, Blocked: False), (4, ID: 1, Priority: 4, Blocked: False), (6, ID: 2, Priority: 6, Blocked: False), (8, ID: 6, Priority: 8, Blocked: False), (9, ID: 3, Priority: 9, Blocked: False), (6, ID: 9, Priority: 6, Blocked: False)]
Worker Bob     is working on item  7 with priority  1. 
[(2, ID: 5, Priority: 2, Blocked: False), (3, ID: 4, Priority: 3, Blocked: False), (6, ID: 2, Priority: 6, Blocked: False), (4, ID: 8, Priority: 4, Blocked: False), (4, ID: 1, Priority: 4, Blocked: False), (6, ID: 9, Priority: 6, Blocked: False), (8, ID: 6, Priority: 8, Blocked: False), (9, ID: 3, Priority: 9, Blocked: False)]
Worker Charlie is working on item  5 with priority  2. 
[(3, ID: 4, Priority: 3, Blocked: False), (4, ID: 1, Priority: 4, Blocked: False), (6, ID: 2, Priority: 6, Blocked: False), (4, ID: 8