**About this project:**


---


This project simulates an intelligent task management system inspired by AI-based productivity tools like Microsoft 365 Copilot.
It uses priority queues, multi-threading, and automatic retries to process tasks in real-time.
The system orders tasks by priority, re-queues tasks that fail, logs every event to a file, and displays a live console dashboard, mimicking how an AI assistant might manage a heavy workload.

**What inspired me for this project:**


---


I wanted to explore how queues are used in real-world applications. Creating a smart task manager felt like a fun and practical way to learn while building a cool mini-project!

**Key learnings in this project:**


---


1. Queue Implementation
2. Multi-threading and concurrency
3. Task Prioritization
4. Auto-retry logic
5. Logging and event tracking
6. Real-time dashboards
7. Error handling and robustness
8. Python standard libraries (threading, queue, random, os, time)
9. Clean, professional project structure
10. Basic AI simulation (task management behavior)

# **Step 1: Importing Libraries**

In this step we import all the libraries this project needs. Everything used here comes from the Python standard library, so nothing has to be installed.

In [62]:
import time #simulate task execution time and delays between retries
import threading #run the receiver and the processor concurrently in separate threads
from queue import PriorityQueue, Full #priority queue that orders tasks by their priority level; Full is raised when a put times out
import os #clear the console for the dashboard
import random #simulate random task failures
from datetime import datetime #timestamp each log entry

# **Step 2: Setting and defining priority**

In [63]:
#these settings control the simulation
max_queue_size= 5
processing_time= 4 #how many seconds per task
retry_wait_time= 3 #second before retry operation should happen
failure_rate= 0.2 #20% chance that the processing fails
log_file= 'logs.txt' #file where logs will be stored


# Define Priority:
PRIORITY = {
    "high": 1,
    "medium": 2,
    "low": 3
}

# **Step 3: Defining Incoming requests and priority**

In [64]:
#defining incoming requests like an end-user would provide to AI assistant
incoming_requests= [ ("Summarize my document", "medium"),
    ("Find meeting times", "low"),
    ("Generate project update email", "high"),
    ("Translate to French", "medium"),
    ("Suggest report improvements", "low"),
    ("Summarize today's meeting", "medium"),
    ("Write LinkedIn post", "high"),
    ("Create PowerPoint summary", "medium"),
    ("Draft client follow-up email", "high"),
    ("Outline marketing plan", "low")
]

#defining the priority queue
user_requests= PriorityQueue(maxsize=max_queue_size)
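Because each queue entry is a `(priority, request)` tuple, `PriorityQueue` compares the number first and falls back to comparing the request text when two priorities are equal. The sketch below illustrates that behavior and one possible way to keep arrival order on ties; the `drain` helper and the `sequence` counter are assumptions added for illustration and are not part of the notebook.

```python
from itertools import count
from queue import PriorityQueue

PRIORITY = {"high": 1, "medium": 2, "low": 3}

requests = [("Translate to French", "medium"),
            ("Summarize my document", "medium"),
            ("Write LinkedIn post", "high")]

def drain(q):
    """Remove and return every item currently in the queue, lowest tuple first."""
    items = []
    while not q.empty():
        items.append(q.get())
    return items

# Plain (priority, name) tuples: equal priorities are ordered by the name string.
plain = PriorityQueue()
for name, level in requests:
    plain.put((PRIORITY[level], name))
plain_order = [name for _, name in drain(plain)]

# Hypothetical variant: a running counter as the second element breaks ties
# by arrival order instead of alphabetically.
sequence = count()
fifo = PriorityQueue()
for name, level in requests:
    fifo.put((PRIORITY[level], next(sequence), name))
fifo_order = [name for _, _, name in drain(fifo)]

print(plain_order)
print(fifo_order)
```

In the first queue the two medium tasks come out alphabetically; in the second they come out in the order they were submitted.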


# **Step 4: Creating the utility functions**

In [65]:
#utility: clear console
def clear_console():
  #on Windows (os.name == 'nt') run cls; on Linux/macOS run clear
  os.system('cls' if os.name == 'nt' else 'clear')

#utility: write logs
def log_event(event):
  with open(log_file, 'a') as file:
    file.write(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - {event}\n")
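As a possible alternative to opening the file by hand on every call, the standard `logging` module can produce the same timestamped lines, and its handlers serialize writes from multiple threads. This is only a sketch: the logger name, the `make_logger` helper, and the temporary file path are assumptions for illustration.

```python
import logging
import os
import tempfile

def make_logger(path):
    """Build a logger that writes 'timestamp - message' lines to the given file."""
    logger = logging.getLogger("task_manager_sketch")  # hypothetical logger name
    logger.setLevel(logging.INFO)
    handler = logging.FileHandler(path, mode="w", encoding="utf-8")
    handler.setFormatter(
        logging.Formatter("%(asctime)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
    )
    logger.addHandler(handler)
    logger.propagate = False  # keep the demo output out of the root logger
    return logger, handler

# Write to a throwaway file so the sketch does not touch the real logs.txt.
sketch_path = os.path.join(tempfile.mkdtemp(), "logs_sketch.txt")
logger, handler = make_logger(sketch_path)
logger.info("Received : Summarize my document (Priority: medium)")
handler.close()

with open(sketch_path, encoding="utf-8") as f:
    logged_lines = f.read().splitlines()
print(logged_lines)
```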


# **Step 5: Function to receive requests**

In [66]:
def receive_requests():
  for req, priority in incoming_requests:
    while True:
      try:
        user_requests.put((PRIORITY[priority], req), timeout=2) #queue the request under its numeric priority, waiting up to 2 seconds for a free slot
        print(f"\033[92m[🟢 Received] {req} (Priority: {priority})\033[0m") #confirm receipt in green using ANSI escape codes
        log_event(f"Received : {req} (Priority: {priority})") #log the request and its priority
        break

      #if the queue is still full after the timeout, wait and try again
      except Full:
        print(f"\033[91m[🚨 Busy] Queue Full Retrying : {req} after {retry_wait_time} seconds...... \033[0m") #report that the queue is full and when the next attempt will happen
        time.sleep(retry_wait_time) #pause before the next attempt

    time.sleep(0.5) #pause for 0.5 seconds before moving to the next request
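The receiver relies on one behavior of a bounded queue: `put` with a `timeout` raises `Full` if no slot frees up in time. A minimal sketch of that behavior in isolation, using a hypothetical `try_put` helper and a deliberately tiny queue:

```python
from queue import PriorityQueue, Full

def try_put(q, item, timeout=0.1):
    """Return True if the item was queued, False if the queue stayed full."""
    try:
        q.put(item, timeout=timeout)
        return True
    except Full:
        return False

small_queue = PriorityQueue(maxsize=2)

# The third put times out because nothing is consuming from the queue yet.
results = [try_put(small_queue, (2, f"task {n}")) for n in range(3)]

# Removing one item frees a slot, so the next put succeeds.
small_queue.get()
results.append(try_put(small_queue, (1, "urgent task")))
print(results)
```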


# **Step 6: Function to process requests**

In [67]:
def process_requests():
  while True:
    if not user_requests.empty():
      priority, req= user_requests.get() #take the highest-priority request off the queue

      print(f"\033[94m[⚙️ Processing] {req} \033[0m") #announce in blue (ANSI escape code) that the request is being processed

      success=simulate_processing(req) #process the request; returns True on success, False on failure

      if success:
        print(f"\033[92m [✅ Completed] {req}\n\033[0m") #the request completed successfully
        log_event(f"Completed: {req}")

      else:
        print(f"\033[96m [⚠️ Failed] {req} - Retrying...\n\033[0m") #the request failed and will be re-queued
        log_event(f"Failed: {req} - Retrying...")
        retry_task(priority, req) #put the task back on the queue with its original priority

    else:
      time.sleep(1) #queue is empty: wait 1 second before checking again
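The loop above polls `empty()` and sleeps when there is nothing to do. One alternative, sketched here under the assumption of a single worker and a hypothetical `stop` event, is to block in `get(timeout=...)` and mark each item with `task_done()`, so the main thread can call `join()` on the queue and wait until the queue is fully processed.

```python
import threading
from queue import PriorityQueue, Empty

work = PriorityQueue()
done = []                  # names in the order the worker handled them
stop = threading.Event()   # hypothetical shutdown signal

def worker():
    while not stop.is_set():
        try:
            priority, name = work.get(timeout=0.1)  # block briefly instead of polling
        except Empty:
            continue
        done.append(name)   # stand-in for the real processing step
        work.task_done()    # tell the queue this item is finished

# Queue everything first so the priority order is easy to see.
for item in [(3, "low task"), (1, "high task"), (2, "medium task")]:
    work.put(item)

t = threading.Thread(target=worker, daemon=True)
t.start()
work.join()        # returns once every queued item has been marked done
stop.set()
t.join(timeout=1)
print(done)
```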

# **Step 7: Function to simulate processing a task**

In [68]:
def simulate_processing(req):
  for i in range(processing_time):
    display_queue_status(req, i+1)

    time.sleep(1)

  #simulating a random failure
  if random.random() < failure_rate:
    return False

  return True
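The failure check compares a uniform random number in [0, 1) with `failure_rate`, so each attempt fails with probability 0.2 and a task needs on average 1 / (1 - 0.2) = 1.25 attempts. The sketch below checks the failure probability empirically; the `attempt_succeeds` helper, the seed, and the trial count are assumptions chosen for illustration.

```python
import random

def attempt_succeeds(rng, failure_rate=0.2):
    """Return False with probability failure_rate, using the same comparison as the simulation."""
    return rng.random() >= failure_rate

rng = random.Random(42)   # seeded generator so the run is repeatable
trials = 10_000
failures = sum(not attempt_succeeds(rng) for _ in range(trials))
observed_rate = failures / trials
print(f"observed failure rate: {observed_rate:.3f}")
```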

# **Step 8: Function for retry mechanism**

In [69]:
def retry_task(priority, req):
  while True:
    try:
      user_requests.put((priority, req), timeout=2)

      break

    except Full:
      print(f"\033[91m [🚨 Busy] Retrying to requeue: {req} after {retry_wait_time} seconds... \033[0m")
      time.sleep(retry_wait_time)
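One thing to watch with this retry loop: it runs on the processor thread, which is the only consumer, so if the queue is already full when a task fails, nothing frees a slot while the loop waits. A possible variation, sketched here with a hypothetical attempt counter and `MAX_ATTEMPTS` limit that the notebook does not have, is to re-queue without blocking and to give up after a fixed number of attempts.

```python
from queue import PriorityQueue, Full

MAX_ATTEMPTS = 3  # hypothetical limit, not a setting from the notebook

def requeue_with_limit(q, priority, name, attempts, max_attempts=MAX_ATTEMPTS):
    """Try to re-queue a failed task without blocking.

    Returns "requeued", "queue full", or "gave up".
    """
    if attempts >= max_attempts:
        return "gave up"
    try:
        # Store the attempt count with the task so the next failure can check it.
        q.put_nowait((priority, name, attempts + 1))
        return "requeued"
    except Full:
        return "queue full"

tiny = PriorityQueue(maxsize=1)
outcomes = [
    requeue_with_limit(tiny, 1, "Draft client follow-up email", attempts=1),
    requeue_with_limit(tiny, 2, "Translate to French", attempts=1),
    requeue_with_limit(tiny, 1, "Write LinkedIn post", attempts=3),
]
print(outcomes)
```

The caller decides what to do with "queue full", for example processing the task again immediately instead of waiting for a slot.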

# **Step 9: Function to display the dashboard**

In [70]:
def display_queue_status(current_task, elapsed_time):
  queue_size= user_requests.qsize()

  progress= "#" * elapsed_time + "-" * (processing_time - elapsed_time)

  clear_console()

  print("========================AI Manager dashboard==============================")
  print(f"🔵 Current Task: {current_task}")
  print(f"⏳ Progress: [{progress}] ({elapsed_time}/{processing_time} seconds)")
  print(f"📥 Pending Tasks: {queue_size}\n")

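  temp_list= sorted(user_requests.queue) #the internal list is in heap order, so sort a copy to show tasks in the order they will be served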
  print("Tasks in queue:")
  for idx, (prio, task) in enumerate(temp_list, 1):
    print(f" {idx}, {task}  (Priority {prio})")

  print("===========================================\n")
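When listing pending tasks, it helps to know that the `.queue` attribute of a `PriorityQueue` is the underlying heap list: only its first element is guaranteed to be the next task, and the rest are not necessarily in priority order. A small sketch of the difference, using three of the sample requests:

```python
from queue import PriorityQueue

q = PriorityQueue()
for item in [(1, "Write LinkedIn post"),
             (3, "Outline marketing plan"),
             (2, "Translate to French")]:
    q.put(item)

snapshot = list(q.queue)          # raw heap layout, copied without removing anything
display_order = sorted(snapshot)  # the order in which get() would return the items

print(snapshot)
print(display_order)
```

Here the raw snapshot lists the low-priority task before the medium one, while the sorted copy matches the order in which the tasks will actually be processed.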





# **Step 10: Main function**

In [71]:
if __name__=="__main__":
  #clearing the old logs
  open(log_file, 'w').close()

  #creating threads
  receiver_thread= threading.Thread(target=receive_requests)
  processor_thread= threading.Thread(target=process_requests, daemon= True)

  #starting threads
  receiver_thread.start()
  processor_thread.start()

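  #wait for the receiver to finish queueing every request
  receiver_thread.join()

  time.sleep(30) #give the daemon processor thread time to drain the queue; may be too short if several tasks need retries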

[🟢 Received] Summarize my document (Priority: medium)
[⚙️ Processing] Summarize my document
🔵 Current Task: Summarize my document
⏳ Progress: [#---] (1/4 seconds)
📥 Pending Tasks: 0

Tasks in queue:

[🟢 Received] Find meeting times (Priority: low)
[⚙️ Processing] Find meeting times
🔵 Current Task: Find meeting times
⏳ Progress: [#---] (1/4 seconds)
📥 Pending Tasks: 0

Tasks in queue:

[🟢 Received] Generate project update email (Priority: high)
🔵 Current Task: Summarize my document
⏳ Progress: [##--] (2/4 seconds)
📥 Pending Tasks: 1

Tasks in queue:
 1, Generate project update email  (Priority 1)

[⚙️ Processing] Generate project update email
🔵 Current Task: Generate project update email
⏳ Progress: [#---] (1/4 seconds)
📥 Pending Tasks: 0

Tasks in queue:

[🟢 Received] Translate to French (Priority: medium)
🔵 Current Task: Find meeting times
⏳ Progress: [##--] (2/4 seconds)
📥 Pending Tasks: 1

Tasks in queue:
 1, Tra