# Implementation: Collect Process Data


In [1]:
# 📌 Step 1: Install dependencies (run this first if needed)
!pip install psutil pandas scikit-learn joblib




In [2]:
# 📌 Step 2: Import Libraries
import psutil
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
import joblib
from queue import Queue
import time
import threading
import os


In [3]:
# 📌 Step 3: Collect System Process Data
def get_process_data():
    process_list = []
    
    for proc in psutil.process_iter(['pid', 'cpu_times', 'memory_info', 'nice']):
        try:
            info = proc.info
            process_list.append({
                "pid": info['pid'],
                "utime": info['cpu_times'].user if info['cpu_times'] else 0,  # User CPU time
                "stime": info['cpu_times'].system if info['cpu_times'] else 0,  # System CPU time
                "priority": info['nice'],  # Process priority
                "mem_usage": info['memory_info'].rss if info['memory_info'] else 0  # Memory usage
            })
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            continue
    
    return pd.DataFrame(process_list)

# Save Data
df = get_process_data()
df.to_csv("windows_process_data.csv", index=False)
df.head()


Unnamed: 0,pid,utime,stime,priority,mem_usage
0,0,0.0,0.0,0,0
1,1,0.0,0.0,0,0
2,505,0.0,0.0,0,0
3,506,0.0,0.0,0,0
4,507,0.0,0.0,0,0


In [4]:
# 📌 Step 4: Train an AI Model for Process Classification
df["process_type"] = df["utime"] > df["stime"]  # Label: True = CPU-bound, False = I/O-bound

X = df[["utime", "stime", "priority", "mem_usage"]]
y = df["process_type"]

# Train-Test Split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# Train Model
model = RandomForestClassifier(n_estimators=100)
model.fit(X_train, y_train)

# Save Model
joblib.dump(model, "process_classifier.pkl")

print("✅ Model trained and saved as process_classifier.pkl")


✅ Model trained and saved as process_classifier.pkl


In [21]:
import csv

# Initialize CSV file for RL metrics logging
with open("rl_metrics_log.csv", "w", newline="") as file:
    writer = csv.writer(file)
    writer.writerow(["Time", "Total Actions", "PROCESS_CPU", "PROCESS_IO", 
                     "Total Reward", "Avg Reward per Step", "CPU Queue Size", 
                     "IO Queue Size", "Avg Processing Latency", "Completed CPU", "Completed IO"])

def log_rl_metrics():
    """ Periodically logs RL agent metrics into a CSV file """
    while running:
        print("logging metrics")
        if rl_metrics["decision_steps"] > 0:
            avg_reward = rl_metrics["total_reward"] / rl_metrics["decision_steps"]
            avg_latency = sum(rl_metrics["avg_processing_latency"]) / len(rl_metrics["avg_processing_latency"]) if rl_metrics["avg_processing_latency"] else 0
        else:
            avg_reward = 0
            avg_latency = 0

        # Log metrics to CSV
        with open("rl_metrics_log.csv", "a", newline="") as file:
            writer = csv.writer(file)
            writer.writerow([
                time.strftime("%H:%M:%S"), rl_metrics["decision_steps"], 
                rl_metrics["action_counts"]["PROCESS_CPU"], 
                rl_metrics["action_counts"]["PROCESS_IO"],
                rl_metrics["total_reward"], avg_reward, 
                cpu_queue.qsize(), io_queue.qsize(), avg_latency,
                rl_metrics["completed_tasks"]["CPU"], 
                rl_metrics["completed_tasks"]["IO"]
            ])

        time.sleep(5)  # Log every 5 seconds

# Start RL metrics logging in a separate thread
log_metrics_thread = threading.Thread(target=log_rl_metrics, daemon=True)
log_metrics_thread.start()


## main

In [29]:
import threading
import time
import csv
import psutil
import pandas as pd
import joblib
from queue import Queue

# Load trained ML model
model = joblib.load("process_classifier.pkl")

# Define Queues
cpu_queue = Queue()
io_queue = Queue()
processed_pids = set()  # Track processed PIDs
running = True  # Global flag to control execution

# RL-related variables
state_space = ['CPU_QUEUE_SIZE', 'IO_QUEUE_SIZE']
action_space = ['PROCESS_CPU', 'PROCESS_IO']
count = 0  # Counter for RL decision-making
last_action = None
# RL Metrics Dictionary
rl_metrics = {
    "decision_steps": 0,
    "total_reward": 0,
    "action_counts": {"PROCESS_CPU": 0, "PROCESS_IO": 0},
    "completed_tasks": {"CPU": 0, "IO": 0}
}

# Placeholder function for RL agent's decision-making
def rl_decision(state):
    global count
    if count == 3:
        count = 0
        return 'PROCESS_IO'
    elif state['CPU_QUEUE_SIZE'] > state['IO_QUEUE_SIZE']:
        count += 1
        return 'PROCESS_CPU'
    else:
        return 'PROCESS_IO'

# Classify and enqueue processes
def classify_and_enqueue():
    global processed_pids, running
    while running:
        for proc in psutil.process_iter(['pid']):
            if not running:
                break
            try:
                pid = proc.info['pid']
                if pid in processed_pids:
                    continue

                # Collect process data
                process_data = pd.DataFrame([{
                    "utime": proc.cpu_times().user if proc.cpu_times() else 0,
                    "stime": proc.cpu_times().system if proc.cpu_times() else 0,
                    "priority": proc.nice(),
                    "mem_usage": proc.memory_info().rss if proc.memory_info() else 0
                }])

                # Predict process type (CPU-bound or I/O-bound)
                prediction = model.predict(process_data)[0]

                # Enqueue based on type
                if prediction:
                    cpu_queue.put(pid)
                else:
                    io_queue.put(pid)

                processed_pids.add(pid)  # Mark as processed
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
        time.sleep(5)

# Function to process queues
def process_queue(q, queue_type, execution_time=2):
    global processed_pids, running
    while running:
        if not q.empty():
            pid = q.get()
            if not psutil.pid_exists(pid):
                print(f"Skipping {queue_type} task (PID {pid} no longer exists)")
                continue

            print(f"Processing {queue_type} task: PID {pid}")
            time.sleep(execution_time)
            processed_pids.discard(pid)
            q.task_done()
            print(f"Completed {queue_type} task: PID {pid}")
        else:
            time.sleep(1)

# Reward function
def get_reward(state, last_action):
    reward = 0

    # Encourage CPU throughput if CPU queue is very large
    if last_action == "PROCESS_CPU":
        reward += 10  # Base reward for processing CPU task
        if state['CPU_QUEUE_SIZE'] > 100:  
            reward += 10  # Extra boost for reducing a large CPU queue
        if state['CPU_QUEUE_SIZE'] > 500:  
            reward += 20  # Heavy boost if CPU queue is critically large (urgent)
        if state['IO_QUEUE_SIZE'] < 20:  
            reward += 5  # Small bonus for keeping I/O latency low
    
    # Prioritize I/O tasks to reduce latency
    if last_action == "PROCESS_IO":
        reward += 15  # High reward for processing I/O task
        if state['IO_QUEUE_SIZE'] > 20:
            reward += 10  # Extra boost if I/O queue is large
        if state['IO_QUEUE_SIZE'] > 50:
            reward += 20  # Critical boost if I/O queue is overloaded

    # Reduce harsh penalties for high CPU queue (allow more processing)
    if state['CPU_QUEUE_SIZE'] > 500:
        reward -= 5  # Small penalty instead of harsh reduction
    
    # Still penalize excessive I/O backlog (since latency matters more)
    if state['IO_QUEUE_SIZE'] > 50:
        reward -= 15  # Stronger penalty for large I/O queue

    return reward

# RL Scheduling and Queue Management
def schedule_and_manage_queues():
    global running
    global last_action
    while running:
        state = {
            'CPU_QUEUE_SIZE': cpu_queue.qsize(),
            'IO_QUEUE_SIZE': io_queue.qsize()
        }

        action = rl_decision(state)
        
        if action == 'PROCESS_CPU' and not cpu_queue.empty():
            pid = cpu_queue.get()
            print(f"RL Action: Processing CPU task with PID {pid}")
            time.sleep(2)
            cpu_queue.task_done()
            reward = get_reward(state,last_action)
            rl_metrics["total_reward"] += reward
            rl_metrics["decision_steps"] += 1
            rl_metrics["action_counts"]["PROCESS_CPU"] += 1
            rl_metrics["completed_tasks"]["CPU"] += 1
            last_action = 'PROCESS_CPU'

        elif action == 'PROCESS_IO' and not io_queue.empty():
            pid = io_queue.get()
            print(f"RL Action: Processing I/O task with PID {pid}")
            time.sleep(1)
            io_queue.task_done()
            reward = get_reward(state,last_action)
            rl_metrics["total_reward"] += reward
            rl_metrics["decision_steps"] += 1
            rl_metrics["action_counts"]["PROCESS_IO"] += 1
            rl_metrics["completed_tasks"]["IO"] += 1
            last_action = 'PROCESS_IO'

        time.sleep(1)

# ✅ Integrated Log Metrics Function
def log_rl_metrics():
    """ Periodically logs RL agent metrics into a CSV file """
    with open("rl_metrics_log.csv", "a", newline="") as file:
        writer = csv.writer(file)
        writer.writerow(["Time", "Steps", "PROCESS_CPU", "PROCESS_IO", "Total Reward", "Avg Reward", "CPU Queue", "IO Queue"])

    while running:
        print("Logging metrics...")  # Debugging statement

        if rl_metrics["decision_steps"] > 0:
            avg_reward = rl_metrics["total_reward"] / rl_metrics["decision_steps"]
        else:
            avg_reward = 0

        with open("rl_metrics_log.csv", "a", newline="") as file:
            writer = csv.writer(file)
            writer.writerow([
                time.strftime("%H:%M:%S"), rl_metrics["decision_steps"], 
                rl_metrics["action_counts"]["PROCESS_CPU"], 
                rl_metrics["action_counts"]["PROCESS_IO"],
                rl_metrics["total_reward"], avg_reward,
                cpu_queue.qsize(), io_queue.qsize()
            ])
            file.flush()  # Force write to disk

        time.sleep(5)  # Log every 5 seconds

# ✅ Start Worker Threads
cpu_worker = threading.Thread(target=process_queue, args=(cpu_queue, "CPU-bound", 2), daemon=True)
io_worker = threading.Thread(target=process_queue, args=(io_queue, "I/O-bound", 1), daemon=True)
classifier_thread = threading.Thread(target=classify_and_enqueue, daemon=True)
scheduler_thread = threading.Thread(target=schedule_and_manage_queues, daemon=True)
log_metrics_thread = threading.Thread(target=log_rl_metrics, daemon=True)  # Logging thread

# Start all threads
cpu_worker.start()
io_worker.start()
classifier_thread.start()
scheduler_thread.start()
log_metrics_thread.start()

# Run for a fixed time or until user stops
try:
    while True:
        print(f"CPU Queue Size: {cpu_queue.qsize()} | IO Queue Size: {io_queue.qsize()}")
        time.sleep(5)
except KeyboardInterrupt:
    print("\nStopping program...")
    running = False  # Stop all threads
    classifier_thread.join()
    cpu_worker.join()
    io_worker.join()
    scheduler_thread.join()
    log_metrics_thread.join()
    print("All threads stopped.")


RL Action: Processing CPU task with PID 577
CPU Queue Size: 6 | IO Queue Size: 0
Logging metrics...
Processing CPU-bound task: PID 804Processing I/O-bound task: PID 824

Completed I/O-bound task: PID 824
Processing I/O-bound task: PID 835
Completed CPU-bound task: PID 804
Processing CPU-bound task: PID 805
Completed I/O-bound task: PID 835
Processing I/O-bound task: PID 843
RL Action: Processing CPU task with PID 810
Completed I/O-bound task: PID 843
Processing I/O-bound task: PID 844
Completed CPU-bound task: PID 805
Processing CPU-bound task: PID 811
Completed I/O-bound task: PID 844
Processing I/O-bound task: PID 857
CPU Queue Size: 443 | IO Queue Size: 56
Logging metrics...
Completed I/O-bound task: PID 857
Processing I/O-bound task: PID 861
RL Action: Processing CPU task with PID 814
Completed CPU-bound task: PID 811
Processing CPU-bound task: PID 816
Completed I/O-bound task: PID 861
Processing I/O-bound task: PID 877
Completed I/O-bound task: PID 877
Processing I/O-bound task: P

## Perform metrics 


In [None]:
import psutil
import pandas as pd

def get_windows_process_data():
    """ Collects process data from Windows for evaluation. """
    process_list = []
    
    for proc in psutil.process_iter(attrs=['pid', 'cpu_times', 'memory_info', 'nice']):
        try:
            info = proc.info
            process_list.append({
                "pid": info['pid'],
                "utime": info['cpu_times'].user if info['cpu_times'] else 0,  # User mode CPU time
                "stime": info['cpu_times'].system if info['cpu_times'] else 0,  # Kernel mode CPU time
                "priority": info['nice'],  # Process priority
                "mem_usage": info['memory_info'].rss if info['memory_info'] else 0  # Memory usage in bytes
            })
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            continue

    return pd.DataFrame(process_list)

# Get real-time Windows process data
df_test = get_windows_process_data()
df_test.to_csv("windows_test_data2.csv", index=False)  # Save for reuse


In [None]:
import joblib
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

# Load trained classifier
model = joblib.load("process_classifier.pkl")

# Load test data
df_test = pd.read_csv("windows_process_data.csv")

# Drop the 'pid' column (since it's not a feature)
X_test = df_test.drop(columns=["pid"])



In [None]:
y_pred = model.predict(X_test)
df_test["predicted_class"] = y_pred
print(df_test.head())  # Show some predictions


In [None]:
# Generate pseudo ground truth (approximate classification)
df_test["true_label"] = df_test.apply(lambda row: 1 if row["utime"] + row["stime"] > 0.5 else 0, axis=1)

# Compare model vs pseudo ground truth
y_test = df_test["true_label"]

print("Classification Report:")
print(classification_report(y_test, y_pred))

print("Confusion Matrix:")
print(confusion_matrix(y_test, y_pred))


In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

cm = confusion_matrix(y_test, y_pred)

plt.figure(figsize=(6, 4))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=["I/O-Bound", "CPU-Bound"], yticklabels=["I/O-Bound", "CPU-Bound"])
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.title("Confusion Matrix (Windows Process Classification)")
plt.show()


In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import time

def plot_rl_metrics():
    """ Live visualization of RL metrics """
    plt.ion()  # Interactive mode ON
    fig, ax = plt.subplots(3, 1, figsize=(10, 10))

    while True:
        try:
            # Read latest RL metrics
            df = pd.read_csv("rl_metrics_log.csv")

            if df.empty:
                continue  # Wait until some data is logged

            # Extract values for plotting
            time_labels = df["Time"]
            actions_cpu = df["PROCESS_CPU"]
            actions_io = df["PROCESS_IO"]
            avg_reward = df["Avg Reward per Step"]
            queue_balance = df["CPU Queue Size"] / (df["IO Queue Size"] + 1)  # Avoid division by zero

            # Clear previous plots
            ax[0].clear()
            ax[1].clear()
            ax[2].clear()

            # Plot action selection over time
            ax[0].plot(time_labels, actions_cpu, label="PROCESS_CPU", color="blue")
            ax[0].plot(time_labels, actions_io, label="PROCESS_IO", color="red")
            ax[0].set_title("Action Distribution Over Time")
            ax[0].set_ylabel("Action Count")
            ax[0].legend()

            # Plot reward progression
            ax[1].plot(time_labels, avg_reward, label="Avg Reward per Step", color="green")
            ax[1].set_title("Average Reward Over Time")
            ax[1].set_ylabel("Avg Reward")
            ax[1].legend()

            # Plot queue balance trend
            ax[2].plot(time_labels, queue_balance, label="CPU:IO Queue Ratio", color="purple")
            ax[2].set_title("Queue Balance Over Time")
            ax[2].set_ylabel("CPU/IO Ratio")
            ax[2].legend()

            # Improve layout
            plt.xticks(rotation=45)
            plt.tight_layout()
            plt.pause(5)  # Update plot every 5 seconds

        except Exception as e:
            print(f"Error in plotting: {e}")
        time.sleep(5)  # Refresh interval

# Run this in a separate Jupyter Notebook cell
plot_rl_metrics()


Error in plotting: 'Avg Reward per Step'
Error in plotting: 'Avg Reward per Step'
Error in plotting: 'Avg Reward per Step'
Error in plotting: 'Avg Reward per Step'
Error in plotting: 'Avg Reward per Step'
