<a href="https://colab.research.google.com/github/TamaraEstrada/cmpsc472-project1/blob/main/CMPSC_472_Multi_Process_Threads_Manager_with_IPC_and_Parallel_Text_File_Processing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Resources used:**

https://www.florianreinhard.de/accessdenied-in-psutil/


https://stackoverflow.com/questions/74420568/error-psutil-nosuchprocess-process-no-longer-exists-pid-23100


https://www.youtube.com/watch?v=PJ4t2U15ACo



https://www.codecademy.com/resources/docs/python/threading/thread


https://psutil.readthedocs.io/en/latest/

https://martinxpn.medium.com/thread-pools-and-process-pools-in-python-75-100-days-of-python-4101f10f64fc


https://superfastpython.com/multiprocessing-pipe-in-python/


https://www.geeksforgeeks.org/python-communicating-between-threads-set-1/


https://pythonforthelab.com/blog/handling-and-sharing-data-between-threads/

In [1]:
# Multi-process and multi-thread task manager.
# The user can view system resource utilization, display detailed information about processes
# and threads, and perform actions like killing or suspending/resuming processes and threads.

import psutil # Fetch info about system processes and threads
import threading # Create and manage threads
import time # Timing-related operation
import os # Interact w/ OS
import multiprocessing
import queue

# Manages Processes
class ProcessManager:
    """Snapshot, display and control operating-system processes via psutil."""

    def __init__(self):
        # Maps PID -> {'name', 'status', 'threads', 'num_threads'}.
        # Populated by refresh_processes().
        self.processes = {}

    def refresh_processes(self):
        """Rebuild ``self.processes`` from the processes currently running.

        Processes may exit or be inaccessible while they are being scanned.
        ``num_threads`` is requested through ``process_iter`` so psutil stores
        ``None`` for it when access is denied instead of raising, and the
        per-thread lookup falls back to an empty list on ``AccessDenied`` or
        ``NoSuchProcess`` rather than aborting the whole refresh.
        """
        snapshot = {}
        attrs = ['pid', 'name', 'status', 'num_threads']
        for process in psutil.process_iter(attrs):
            try:
                thread_ids = [thread.id for thread in process.threads()]
            except (psutil.AccessDenied, psutil.NoSuchProcess):
                # Not permitted to inspect it, or it vanished mid-scan.
                thread_ids = []
            snapshot[process.pid] = {
                'name': process.info['name'],
                'status': process.info['status'],
                'threads': thread_ids,
                'num_threads': process.info['num_threads'],
            }
        # Replace the old snapshot only once the new one is complete.
        self.processes = snapshot

    def display_processes(self):
        """Print one line per process stored in ``self.processes``."""
        print("Processes:")
        for pid, descrip in self.processes.items():
            print(f"PID: {pid}, Name: {descrip['name']}, Status: {descrip['status']}, Threads: {descrip['threads']}, Num Threads: {descrip['num_threads']}")

    def _apply(self, pid, action, outcome):
        """Call the psutil.Process method named *action* on *pid* and report the result.

        *outcome* is the past-tense word printed on success. A missing
        process or insufficient permission is reported, not raised.
        """
        try:
            getattr(psutil.Process(pid), action)()
        except psutil.NoSuchProcess:
            print(f"No process found with PID: {pid}.")
        except psutil.AccessDenied:
            print(f"Access denied for PID: {pid}.")
        else:
            print(f"PID: {pid} {outcome}.")

    def kill_process(self, pid):
        """Ask the process with the given PID to terminate."""
        self._apply(pid, 'terminate', 'terminated')

    def suspend_process(self, pid):
        """Suspend the process with the given PID."""
        self._apply(pid, 'suspend', 'suspended')

    def resume_process(self, pid):
        """Resume the process with the given PID."""
        self._apply(pid, 'resume', 'resumed')

    def simulate_ipc(self):
        """Run ``process_task`` in a child process and print what it shared.

        The child increments a one-element shared integer array and pushes
        values onto a multiprocessing queue; both are printed after the child
        has finished.
        """
        shared_mem = multiprocessing.Array('i', [0])
        messages = multiprocessing.Queue()

        process = multiprocessing.Process(target=self.process_task, args=(shared_mem, messages))
        process.start()
        # NOTE(review): the child is joined before the queue is drained. The
        # multiprocessing docs warn this can block if a child has queued a lot
        # of data; process_task only queues ten integers.
        process.join()

        print("Shared data in processes: ", shared_mem[:])
        print("Process message queue: ")
        while not messages.empty():
            print(messages.get())

    def process_task(self, shared_mem, messages):
        """Increment ``shared_mem[0]`` ten times and queue the values 0..9."""
        for i in range(10):
            shared_mem[0] += 1
            messages.put(i)

    def run(self, interval=5):
        """Refresh and print the process table forever.

        Args:
            interval: Seconds to sleep between refreshes (default 5).
        """
        while True:
            self.refresh_processes()
            self.display_processes()
            time.sleep(interval)


In [2]:
# Responsible for managing threads
class ThreadManager:
    """Create, list and wait on threads of the current Python process."""

    def __init__(self):
        self.threads = []  # Threads started through create_thread()
        self.running = False  # True while run() is looping

    def display_threads(self):
        """Print id, name, daemon flag and liveness of every active thread."""
        print("Threads:")
        for thread in threading.enumerate():
            print(f"Thread ID: {thread.ident}, Name: {thread.name}, Daemon: {thread.daemon}, Alive: {thread.is_alive()}")

    def kill_thread(self, thread_id):
        """Wait for the thread with the given id to finish.

        Python threads cannot be stopped from outside, so this joins the
        thread: it blocks until the thread ends on its own. Joining the
        calling thread would raise ``RuntimeError``, so that case is reported
        instead.
        """
        for thread in threading.enumerate():
            if thread.ident != thread_id:
                continue
            if thread is threading.current_thread():
                print(f"Cannot wait on thread {thread_id} from within itself.")
                return
            thread.join()
            print(f"Thread with ID {thread_id} terminated successfully.")
            return
        print(f"No thread found with ID {thread_id}.")

    def create_thread(self, target, args=(), daemon=False):
        """Start a new thread running ``target(*args)`` and remember it."""
        thread = threading.Thread(target=target, args=args, daemon=daemon)
        self.threads.append(thread)
        thread.start()
        print(f"Thread {thread.name} with ID {thread.ident} created.")

    def simulate_ipc(self):
        """Run ``thread_task`` in a worker thread and print what it shared.

        Mirrors the process-based demo: a one-element shared integer array
        and a queue are filled by the worker and printed once it finishes.
        """
        shared_data = multiprocessing.Array('i', [0])
        message_queue = queue.Queue()

        thread = threading.Thread(target=self.thread_task, args=(shared_data, message_queue))
        thread.start()
        thread.join()

        print("Thread shared data:", shared_data[:])
        print("Thread message queue contents:")
        while not message_queue.empty():
            print(message_queue.get())

    def thread_task(self, shared_data, message_queue):
        """Increment ``shared_data[0]`` ten times under its lock and queue 0..9."""
        for i in range(10):
            with shared_data.get_lock():
                shared_data[0] += 1
            message_queue.put(i)

    def run(self, interval=5):
        """Print the thread list repeatedly until ``stop()`` is called.

        Args:
            interval: Seconds to sleep between listings (default 5).
        """
        self.running = True
        while self.running:
            # This class has no display_processes(); list the threads.
            self.display_threads()
            time.sleep(interval)

    def stop(self):
        """Ask the loop in ``run()`` to end after its current iteration."""
        self.running = False


In [3]:

# Process for sending short messages
def sender_short(pipe, receiver_pid, start_time, delay=1):
    """Send three short messages through a pipe connection.

    Each message is sent as a ``(sender_pid, receiver_pid, text)`` tuple,
    where ``sender_pid`` is the PID of the calling process.

    Args:
        pipe: Connection object whose ``send`` method delivers the tuple.
        receiver_pid: Identifier of the intended receiver; only forwarded
            and printed.
        start_time: ``time.time()`` reference used for the elapsed-time print.
        delay: Seconds to pause after each message (default 1).
    """
    sender_pid = os.getpid()
    for msg in ("Short message.", "Yah.", "Last message."):
        pipe.send((sender_pid, receiver_pid, msg))
        print(f"Sent from PID {sender_pid} to PID {receiver_pid}: {msg}")
        elapsed = time.time() - start_time
        print(f"Time taken by sender_short: {elapsed:.2f} seconds")
        time.sleep(delay)

# Process for sending long messages, similar to short message function
def sender_long(pipe, receiver_pid, start_time, delay=2):
    """Send three long messages through a pipe connection.

    Each message is sent as a ``(sender_pid, receiver_pid, text)`` tuple,
    where ``sender_pid`` is the PID of the calling process.

    Args:
        pipe: Connection object whose ``send`` method delivers the tuple.
        receiver_pid: Identifier of the intended receiver; only forwarded
            and printed.
        start_time: ``time.time()`` reference used for the elapsed-time print.
        delay: Seconds to pause after each message (default 2).
    """
    sender_pid = os.getpid()
    messages = ["Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed gravida aliquam lectus, a dignissim neque pharetra vel. Nulla rutrum id lorem at pharetra. Nulla eget nunc ipsum. Proin a finibus leo, nec pharetra lacus. Vivamus tempor ac erat vel ultrices. Donec turpis quam, pretium ac felis in, bibendum porta lacus. Integer elementum vel odio sit amet mattis.",
                "Orci varius natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Etiam tincidunt dolor tristique, auctor velit vel, volutpat velit. Morbi eleifend nisi a tortor tristique cursus. Ut consectetur vestibulum risus, et cursus nulla pretium non.",
                "Pellentesque facilisis viverra justo, sed vehicula massa sollicitudin eu. Maecenas fringilla non ante feugiat congue. Maecenas cursus faucibus urna, et eleifend purus scelerisque at. Vivamus interdum felis id augue congue interdum. Donec quis justo iaculis, tincidunt ipsum id, maximus ex. Donec fermentum sit amet dolor id tempus."]
    for msg in messages:
        pipe.send((sender_pid, receiver_pid, msg))
        print(f"Sent from PID {sender_pid} to PID {receiver_pid}: {msg}")
        elapsed = time.time() - start_time
        print(f"Time taken by sender_long: {elapsed:.2f} seconds")
        time.sleep(delay)

# Receives the messages sent through the pipe
def receiver(pipe, start_time):
    """Print every message arriving on a pipe connection.

    Each item is expected to be a ``(sender_pid, receiver_pid, text)`` tuple.
    The loop ends when ``recv`` raises ``EOFError``, which a connection does
    once nothing is left to read and the other end has been closed.

    Args:
        pipe: Connection object to read from.
        start_time: ``time.time()`` reference used for the elapsed-time print.
    """
    while True:
        try:
            sender_pid, receiver_pid, msg = pipe.recv()
        except EOFError:
            # The sending side is gone; stop instead of crashing.
            return
        print(f"Received from PID {sender_pid} to PID {receiver_pid}: {msg}")
        elapsed = time.time() - start_time
        print(f"Time taken to receive: {elapsed:.2f} seconds")

In [4]:
# For sending messages with threads
def sender_short(queue, receiver_pid, start_time, delay=1):
    """Send three short messages over a queue or a pipe connection.

    This definition has the same name as the pipe-based ``sender_short``
    defined earlier, so once both have been executed this one is what
    ``main`` calls for the process menu entries as well as the thread ones.
    It therefore accepts either kind of channel. An object with ``put`` is
    treated as a queue and the sender is identified by its thread id.
    Anything else is treated as a connection with ``send`` and the sender
    is identified by its process id.

    Args:
        queue: ``queue.Queue``-like object or a pipe connection.
        receiver_pid: Identifier of the intended receiver; only forwarded
            and printed.
        start_time: ``time.time()`` reference used for the elapsed-time print.
        delay: Seconds to pause after each message (default 1).
    """
    if hasattr(queue, "put"):
        deliver, sender_pid, origin = queue.put, threading.get_ident(), "Thread"
    else:
        deliver, sender_pid, origin = queue.send, os.getpid(), "PID"

    for msg in ("Short message.", "Yah.", "Last message."):
        # Sender id, receiver id and text travel together as one tuple.
        deliver((sender_pid, receiver_pid, msg))
        print(f"Sent from {origin} {sender_pid} to PID {receiver_pid}: {msg}")
        # time.mtime() does not exist; time.time() is the wall-clock call.
        elapsed = time.time() - start_time
        print(f"Time taken by sender_short: {elapsed:.2f} seconds")
        time.sleep(delay)

# Same as sender_short just with long messages
def sender_long(queue, receiver_pid, start_time, delay=2):
    """Send three long messages over a queue or a pipe connection.

    This definition has the same name as the pipe-based ``sender_long``
    defined earlier, so once both have been executed this one is what
    ``main`` calls for the process menu entries as well as the thread ones.
    It therefore accepts either kind of channel. An object with ``put`` is
    treated as a queue and the sender is identified by its thread id.
    Anything else is treated as a connection with ``send`` and the sender
    is identified by its process id.

    Args:
        queue: ``queue.Queue``-like object or a pipe connection.
        receiver_pid: Identifier of the intended receiver; only forwarded
            and printed.
        start_time: ``time.time()`` reference used for the elapsed-time print.
        delay: Seconds to pause after each message (default 2).
    """
    if hasattr(queue, "put"):
        deliver, sender_pid, origin = queue.put, threading.get_ident(), "Thread"
    else:
        deliver, sender_pid, origin = queue.send, os.getpid(), "PID"

    messages = ["Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed gravida aliquam lectus, a dignissim neque pharetra vel. Nulla rutrum id lorem at pharetra. Nulla eget nunc ipsum. Proin a finibus leo, nec pharetra lacus. Vivamus tempor ac erat vel ultrices. Donec turpis quam, pretium ac felis in, bibendum porta lacus. Integer elementum vel odio sit amet mattis.",
                "Orci varius natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Etiam tincidunt dolor tristique, auctor velit vel, volutpat velit. Morbi eleifend nisi a tortor tristique cursus. Ut consectetur vestibulum risus, et cursus nulla pretium non.",
                "Pellentesque facilisis viverra justo, sed vehicula massa sollicitudin eu. Maecenas fringilla non ante feugiat congue. Maecenas cursus faucibus urna, et eleifend purus scelerisque at. Vivamus interdum felis id augue congue interdum. Donec quis justo iaculis, tincidunt ipsum id, maximus ex. Donec fermentum sit amet dolor id tempus."]
    for msg in messages:
        deliver((sender_pid, receiver_pid, msg))
        print(f"Sent from {origin} {sender_pid} to PID {receiver_pid}: {msg}")
        elapsed = time.time() - start_time
        print(f"Time taken by sender_long: {elapsed:.2f} seconds")
        time.sleep(delay)

def receiver(queue, start_time):
    """Print every message arriving on a queue or a pipe connection.

    This definition has the same name as the pipe-based ``receiver`` defined
    earlier, so once both have been executed this one is what ``main`` starts
    for the process menu entries as well as the thread ones. An object with
    ``get`` is read as a queue; anything else is read as a connection via
    ``recv``.

    Each item is expected to be a ``(sender_id, receiver_id, text)`` tuple.
    The loop ends when a ``None`` item is received or when a connection
    raises ``EOFError``; without either it keeps waiting for more messages.

    Args:
        queue: ``queue.Queue``-like object or a pipe connection.
        start_time: ``time.time()`` reference used for the elapsed-time print.
    """
    if hasattr(queue, "get"):
        fetch, origin = queue.get, "Thread"
    else:
        fetch, origin = queue.recv, "PID"

    while True:
        try:
            item = fetch()
        except EOFError:
            # Connection closed by the sending side.
            break
        if item is None:
            # Sentinel: the caller is asking the receiver to finish.
            break
        sender_pid, receiver_pid, msg = item
        print(f"Received from {origin} {sender_pid} to PID {receiver_pid}: {msg}")
        elapsed = time.time() - start_time
        print(f"Time taken to receive: {elapsed:.2f} seconds")

In [None]:
# Takes a block of text as input and counts occurrences of alphabetic characters
def process_chunk(chunk):
    """Count the alphabetic characters of *chunk*, ignoring case.

    The text is upper-cased first, so the returned dict maps each upper-case
    letter to the number of times it occurs; non-alphabetic characters are
    skipped.
    """
    tally = {}
    for letter in filter(str.isalpha, chunk.upper()):
        tally[letter] = tally.get(letter, 0) + 1
    return tally

# Processes the text file in parallel
def parallel_file_processing(file_path, chunk_size=1024):
    """Count alphabetic characters of a text file using a process pool.

    The file is read in pieces of ``chunk_size`` characters, each piece is
    handed to ``process_chunk`` in a worker process, and the per-chunk counts
    are summed into one dictionary.

    The file is opened before the pool is created, so a missing or unreadable
    file raises without leaving worker processes behind. The pool is used as
    a context manager so it is also shut down if a worker raises.

    Args:
        file_path: Path of the text file to read.
        chunk_size: Number of characters per chunk (default 1024).

    Returns:
        dict mapping each upper-case letter to its total count.

    Raises:
        OSError: If the file cannot be opened (e.g. ``FileNotFoundError``).
    """
    # One worker per available CPU core.
    num_processes = multiprocessing.cpu_count()
    with open(file_path, 'r') as file, \
            multiprocessing.Pool(processes=num_processes) as pool:
        # read() returns '' at end of file, which ends the iteration.
        pending = [
            pool.apply_async(process_chunk, (chunk,))
            for chunk in iter(lambda: file.read(chunk_size), '')
        ]

        # No more work will be submitted; wait for the workers to finish.
        pool.close()
        pool.join()

        final_char_count = {}
        for result in pending:
            for char, count in result.get().items():
                final_char_count[char] = final_char_count.get(char, 0) + count

    return final_char_count


In [None]:
def _read_pid(prompt):
    """Prompt for a PID and return it as an int, or None if it is not a number."""
    raw = input(prompt)
    try:
        return int(raw)
    except ValueError:
        print(f"Invalid PID: {raw!r}.")
        return None


def _run_process_messaging(sender):
    """Run *sender* and ``receiver`` as two processes joined by a fresh pipe."""
    start_time = time.time()
    # A new pipe per run: the receiver is terminated at the end, and a pipe
    # whose reader was terminated should not be reused.
    parent_conn, child_conn = multiprocessing.Pipe()
    receiver_process = multiprocessing.Process(target=receiver, args=(child_conn, start_time))
    receiver_process.start()
    sender_process = multiprocessing.Process(
        target=sender, args=(parent_conn, receiver_process.pid, start_time))
    sender_process.start()
    sender_process.join()
    receiver_process.terminate()
    receiver_process.join()
    parent_conn.close()
    child_conn.close()
    end_time = time.time()
    print(f"Total runtime: {end_time - start_time:.2f} seconds")


def _run_thread_messaging(sender):
    """Run *sender* and ``receiver`` as two threads joined by a fresh queue."""
    start_time = time.time()
    # A new queue per run so a receiver left over from an earlier run cannot
    # take this run's messages.
    message_queue = queue.Queue()
    # Daemon thread: the receiver keeps waiting on the queue after the last
    # message, and must not keep the program alive.
    receiver_thread = threading.Thread(
        target=receiver, args=(message_queue, start_time), daemon=True)
    receiver_thread.start()
    # Use the receiver thread's own id, not the id of the calling thread.
    sender_thread = threading.Thread(
        target=sender, args=(message_queue, receiver_thread.ident, start_time))
    sender_thread.start()
    sender_thread.join()
    # Wait briefly for the last message to be printed instead of joining
    # without a timeout, which would block the menu forever.
    receiver_thread.join(timeout=1)
    end_time = time.time()
    print(f"Total runtime: {end_time - start_time:.2f} seconds")


def main():
    """Run the interactive menu loop until the user chooses to exit."""
    process_manager = ProcessManager()
    thread_manager = ThreadManager()

    while True:
        choice = main_menu()

        if choice == '1':
            process_manager.refresh_processes()
            process_manager.display_processes()

        elif choice == '2':
            thread_manager.display_threads()

        elif choice == '3':
            pid = _read_pid("Enter the PID of the process to kill: ")
            if pid is not None:
                process_manager.kill_process(pid)

        elif choice == '4':
            pid = _read_pid("Enter the PID of the process to suspend: ")
            if pid is not None:
                process_manager.suspend_process(pid)

        elif choice == '5':
            pid = _read_pid("Enter the PID of the process to resume: ")
            if pid is not None:
                process_manager.resume_process(pid)

        elif choice == '6':
            process_manager.simulate_ipc()

        elif choice == '7':
            thread_manager.simulate_ipc()

        elif choice == '8':
            _run_process_messaging(sender_short)

        elif choice == '9':
            _run_process_messaging(sender_long)

        elif choice == '10':
            _run_thread_messaging(sender_short)

        elif choice == '11':
            _run_thread_messaging(sender_long)

        elif choice == '12':
            file_path = "/content/IPC.txt"
            try:
                char_count = parallel_file_processing(file_path)
            except OSError as err:
                print(f"Could not read {file_path}: {err}")
            else:
                print("Character Counts:")
                for char, count in char_count.items():
                    print(f"{char}: {count}")

        elif choice == '13':
            # Leave the loop normally; os._exit(0) would skip cleanup
            # handlers and flushing of buffered output.
            return
        else:
            print("Not a valid choice, try again.")

        time.sleep(1)

if __name__ == "__main__":
    main()


Main Menu
1. Display Processes
2. Display Threads
3. Kill a Process
4. Suspend a Process
5. Resume a Process
6. Simulate IPC over Processes
7. Simulate IPC over Threads
8. Send Short Messages (Processes)
9. Send Long Messages (Processes)
10. Send Short Messages (Threads)
11. Send Long Messages (Threads)
12. Exit
Enter your choice (1-12): 12


In [5]:
def main_menu():
    """Print the numbered menu and return the user's raw choice as a string."""
    menu_lines = (
        "\nMain Menu",
        "1. Display Processes",
        "2. Display Threads",
        "3. Kill a Process",
        "4. Suspend a Process",
        "5. Resume a Process",
        "6. IPC for Processes",
        "7. IPC for Threads",
        "8. Send Short Messages for Processes",
        "9. Send Long Messages for Processes",
        "10. Send Short Messages for Threads",
        "11. Send Long Messages for Threads",
        "12. Parallel Text File Processing",
        "13. Exit",
    )
    for line in menu_lines:
        print(line)
    return input("Enter your choice (1-13): ")
