<a href="https://colab.research.google.com/github/InannaxX07/Optimizer/blob/main/Optimizer1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Install the third-party packages that the imports below rely on.
# NOTE: the leading "!" is notebook (IPython) shell-escape syntax, not plain
# Python, so this line is only meaningful when run inside a notebook cell.
!pip install torch psutil numpy scikit-learn tqdm ipywidgets

import concurrent.futures
import logging
import numbers
import subprocess
import threading
import time
from collections import deque
from dataclasses import dataclass
from typing import List, Dict, Optional

import ipywidgets as widgets
import numpy as np
import psutil
import torch
from IPython.display import clear_output, display
from sklearn.ensemble import RandomForestRegressor
from tqdm.notebook import tqdm

# Send INFO-and-above records to a stream handler; every line is stamped
# with the time and the level name.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger shared by the classes defined in this file.
logger = logging.getLogger(__name__)

@dataclass
class TaskMetrics:
    """Measurements recorded for a single finished task.

    Instances are created in ``GPUTaskScheduler.process_completed_tasks`` and
    passed to ``ResourcePredictor.update``.
    """
    # GPU memory in use when the task was marked complete, as reported by
    # nvidia-smi's ``memory.used`` field (presumably MiB -- confirm).
    memory_usage: float
    # GPU utilization reported by nvidia-smi's ``utilization.gpu`` field at
    # the same moment (presumably a 0-100 percentage -- confirm).
    gpu_utilization: float
    # Seconds between the task's start time and the moment it was marked complete.
    duration: float
    # Free-form task label copied from the task dict (e.g. 'training', 'inference').
    task_type: str
    # Free-form model size label copied from the task dict (e.g. 'small', 'large').
    model_size: str
    # Batch size copied from the task dict; None when the task did not supply one.
    batch_size: Optional[int] = None

class ResourcePredictor:
    """Accumulates per-task resource observations next to a regressor.

    The regressor is created here but is not fitted anywhere in this class;
    ``update`` only stores observations in ``training_data``.
    """

    def __init__(self):
        self.training_data = []
        self.model = RandomForestRegressor()

    def update(self, metrics: TaskMetrics):
        """Append one observation built from ``metrics`` to ``training_data``.

        A missing (``None``) or zero batch size is stored as ``0``.
        """
        copied_fields = (
            'memory_usage',
            'gpu_utilization',
            'duration',
            'task_type',
            'model_size',
        )
        observation = {name: getattr(metrics, name) for name in copied_fields}
        observation['batch_size'] = metrics.batch_size or 0
        self.training_data.append(observation)

class ColabGPUMonitor:
    """GPU monitoring specifically for Colab environment.

    Samples the first GPU through ``nvidia-smi`` and renders GPU, running-task
    and queue status into an ``ipywidgets.Output`` area (``self.output``).
    """

    # nvidia-smi invocation; the field order must match parse_query_output.
    _QUERY_COMMAND = (
        'nvidia-smi',
        '--query-gpu=utilization.gpu,memory.used,memory.total',
        '--format=csv,noheader,nounits',
    )
    # Upper bound on one nvidia-smi call so a hung driver cannot block the caller forever.
    _QUERY_TIMEOUT_SECONDS = 10

    def __init__(self):
        self.output = widgets.Output()

    @staticmethod
    def parse_query_output(line):
        """Convert one CSV row of nvidia-smi query output into a dict.

        Args:
            line: A row such as ``"12, 345, 15360"`` holding utilization,
                used memory and total memory, in that order.

        Returns:
            Dict with float values under ``gpu_util``, ``memory_used`` and
            ``memory_total``.

        Raises:
            IndexError: If the row has fewer than three fields.
            ValueError: If a field is not numeric.
        """
        fields = line.split(',')
        return {
            'gpu_util': float(fields[0]),
            'memory_used': float(fields[1]),
            'memory_total': float(fields[2])
        }

    @staticmethod
    def _progress_percent(elapsed, expected_duration):
        """Return elapsed/expected as a percentage capped at 100.

        A non-positive expected duration is reported as 100 instead of
        dividing by zero.
        """
        if expected_duration <= 0:
            return 100
        return min(100, (elapsed / expected_duration) * 100)

    def get_gpu_info(self):
        """Get GPU info using nvidia-smi.

        Runs nvidia-smi as a subprocess (works both in notebooks and in plain
        Python) and parses the first row of its output, i.e. the first GPU.

        Returns:
            Dict with ``gpu_util``, ``memory_used`` and ``memory_total``. When
            the query fails for any reason the error is logged and a fallback
            of ``0 / 0 / 1`` is returned so callers can keep running.
        """
        try:
            completed = subprocess.run(
                list(self._QUERY_COMMAND),
                capture_output=True,
                text=True,
                check=True,
                timeout=self._QUERY_TIMEOUT_SECONDS,
            )
            # One row per GPU; only the first row is used.
            return self.parse_query_output(completed.stdout.splitlines()[0])
        except (OSError, subprocess.SubprocessError, ValueError, IndexError) as e:
            logger.error(f"Error getting GPU info: {str(e)}")
            return {
                'gpu_util': 0,
                'memory_used': 0,
                'memory_total': 1
            }

    def display_status(self, scheduler):
        """Display current GPU and task status.

        Args:
            scheduler: Object exposing ``running_tasks`` (dict of task dicts
                keyed by task id) and ``priority_queues`` (dict mapping a
                priority to a queue of task dicts).
        """
        with self.output:
            clear_output(wait=True)
            gpu_info = self.get_gpu_info()

            total = gpu_info['memory_total']
            memory_percent = (gpu_info['memory_used'] / total) * 100 if total else 0.0

            print("🖥️ GPU Status:")
            print(f"├─ Utilization: {gpu_info['gpu_util']}%")
            print(f"├─ Memory Used: {gpu_info['memory_used']}/{gpu_info['memory_total']} MB")
            print(f"└─ Memory Utilization: {memory_percent:.1f}%\n")

            print("📋 Running Tasks:")
            # Iterate over a snapshot: other threads may add or remove
            # running tasks while the status is being printed.
            running = list(scheduler.running_tasks.items())
            if running:
                now = time.time()
                for task_id, task in running:
                    if not task.get('start_time'):
                        continue
                    progress = self._progress_percent(
                        now - task['start_time'], task['expected_duration']
                    )
                    print(f"├─ Task {task_id} ({task['task_type']}, Priority {task['priority']})")
                    print(f"│  └─ Progress: {progress:.1f}% complete")
            else:
                print("└─ No tasks currently running")

            print("\n📊 Queue Status:")
            queue_sizes = [(priority, len(queue))
                           for priority, queue in scheduler.priority_queues.items()]
            if any(size for _, size in queue_sizes):
                for priority, size in queue_sizes:
                    if size > 0:
                        print(f"├─ Priority {priority}: {size} tasks waiting")
            else:
                print("└─ No tasks in queue")

class GPUTaskScheduler:
    """GPU Task Scheduler optimized for Google Colab.

    Tasks are plain dicts. ``add_task`` requires the keys ``id``,
    ``task_type``, ``model_size``, ``expected_duration`` and ``priority``
    (one of the keys of ``priority_queues``, 1-5); ``batch_size`` is optional.

    Queued tasks wait in one deque per priority. ``monitor_and_optimize``
    (meant to run in its own thread) moves them into ``running_tasks`` when
    the GPU is below the configured thresholds. Task execution is simulated:
    a running task is considered complete once 1.1 x ``expected_duration``
    seconds have passed since it started.

    ``self.lock`` guards ``running_tasks`` and ``priority_queues``. The public
    methods ``add_task``, ``get_task_status`` and ``cancel_task`` acquire it
    themselves; ``process_completed_tasks`` and ``schedule_queued_tasks``
    expect the caller to hold it (as ``monitor_and_optimize`` does).
    """

    def __init__(self, memory_threshold: float = 0.85, utilization_threshold: float = 0.80):
        """Create the scheduler and show the monitor widget.

        Args:
            memory_threshold: Fraction (0-1) of GPU memory in use above which
                no new task is started.
            utilization_threshold: Fraction (0-1) of GPU utilization above
                which no new task is started.

        Raises:
            RuntimeError: If ``torch`` reports that no CUDA GPU is available.
        """
        self.memory_threshold = memory_threshold
        self.utilization_threshold = utilization_threshold
        self.tasks = []
        self.running_tasks = {}
        self.resource_predictor = ResourcePredictor()
        self.lock = threading.Lock()
        self.priority_queues = {i: deque() for i in range(1, 6)}
        self.stop_event = threading.Event()
        self.monitor = ColabGPUMonitor()

        # Check Colab GPU availability
        try:
            self._print_nvidia_smi()
            if not torch.cuda.is_available():
                raise RuntimeError("No GPU available. Please enable GPU in Colab.")
            self.num_gpus = torch.cuda.device_count()
            logger.info(f"Initialized scheduler with {self.num_gpus} GPU(s)")
        except Exception as e:
            logger.error(f"Error initializing GPU: {str(e)}")
            raise

        # Display the monitoring widget
        display(self.monitor.output)

    @staticmethod
    def _print_nvidia_smi():
        """Print the nvidia-smi report; log a warning if it cannot be run."""
        try:
            report = subprocess.run(
                ['nvidia-smi'], capture_output=True, text=True, timeout=30
            )
        except (OSError, subprocess.SubprocessError) as e:
            # Not fatal here: the torch check that follows decides whether a
            # usable GPU exists.
            logger.warning(f"Could not run nvidia-smi: {e}")
            return
        print(report.stdout or report.stderr)

    @staticmethod
    def _progress_percent(elapsed, expected_duration):
        """Return elapsed/expected as a percentage capped at 100.

        A non-positive expected duration is reported as 100 instead of
        dividing by zero.
        """
        if expected_duration <= 0:
            return 100
        return min(100, (elapsed / expected_duration) * 100)

    def process_completed_tasks(self):
        """Process and remove completed tasks from running tasks.

        A task is complete once its elapsed time reaches 1.1 x its
        ``expected_duration``. For each completed task one ``TaskMetrics``
        sample is handed to the resource predictor. The caller is expected to
        hold ``self.lock``. Errors are logged, never raised.
        """
        try:
            current_time = time.time()

            for task_id, task in list(self.running_tasks.items()):
                if task.get('start_time') is None:
                    continue

                elapsed_time = current_time - task['start_time']
                if elapsed_time < task['expected_duration'] * 1.1:
                    continue

                task['completion_time'] = current_time
                task['status'] = 'completed'
                # Remove immediately so a failure on a later task cannot make
                # this one be reported as completed again on the next pass.
                self.running_tasks.pop(task_id, None)

                logger.info(f"Task {task_id} completed. Duration: {elapsed_time:.2f}s")

                try:
                    # Query once so both values come from the same sample.
                    gpu_info = self.monitor.get_gpu_info()
                    metrics = TaskMetrics(
                        memory_usage=gpu_info['memory_used'],
                        gpu_utilization=gpu_info['gpu_util'],
                        duration=elapsed_time,
                        task_type=task['task_type'],
                        model_size=task['model_size'],
                        batch_size=task.get('batch_size')
                    )
                    self.resource_predictor.update(metrics)
                except Exception as e:
                    logger.error(f"Error collecting metrics for task {task_id}: {str(e)}")

        except Exception as e:
            logger.error(f"Error processing completed tasks: {str(e)}")

    def can_schedule_task(self, task):
        """Check if there are sufficient resources to schedule a task.

        The decision currently depends only on the GPU sample, not on
        ``task``.

        Returns:
            False when memory use or utilization exceeds its threshold, or
            when the check itself fails; True otherwise.
        """
        try:
            gpu_info = self.monitor.get_gpu_info()

            memory_usage = gpu_info['memory_used'] / gpu_info['memory_total']
            if memory_usage > self.memory_threshold:
                return False

            # gpu_util is compared as a percentage, the threshold is a fraction.
            if gpu_info['gpu_util'] > self.utilization_threshold * 100:
                return False

            return True
        except Exception as e:
            logger.error(f"Error checking resource availability: {str(e)}")
            return False

    def schedule_queued_tasks(self):
        """Schedule queued tasks based on priority and resource availability.

        Walks the queues from the highest priority down and starts at most
        one task per priority level per call. The caller is expected to hold
        ``self.lock``. Errors are logged, never raised.
        """
        try:
            for priority in sorted(self.priority_queues, reverse=True):
                queue = self.priority_queues[priority]

                if queue and self.can_schedule_task(queue[0]):
                    task = queue.popleft()
                    task['start_time'] = time.time()
                    task['status'] = 'running'

                    self.running_tasks[task['id']] = task
                    logger.info(f"Started task {task['id']} (Priority {priority})")

        except Exception as e:
            logger.error(f"Error scheduling tasks: {str(e)}")

    def monitor_and_optimize(self, interval: float = 1.0):
        """Monitor GPU usage with Colab-specific display.

        Loops until ``stop_event`` is set: completes finished tasks, starts
        queued ones, refreshes the status widget, then waits ``interval``
        seconds. Waiting on the event (rather than sleeping) lets the loop
        exit promptly after ``stop()``.
        """
        while not self.stop_event.is_set():
            try:
                with self.lock:
                    self.process_completed_tasks()
                    self.schedule_queued_tasks()

                self.monitor.display_status(self)

            except Exception as e:
                logger.error(f"Error in monitoring loop: {str(e)}")

            self.stop_event.wait(interval)

    def stop(self):
        """Ask a running ``monitor_and_optimize`` loop to exit."""
        self.stop_event.set()

    def add_task(self, task: dict) -> bool:
        """Add a new task to the scheduler's priority queue.

        Args:
            task: Task dict; see the class docstring for the required keys.
                ``expected_duration`` must be a non-negative number.

        Returns:
            True if the task was queued, False if validation failed or an
            error occurred (the reason is logged).
        """
        try:
            if task.get('priority') not in self.priority_queues:
                logger.error(f"Invalid priority for task {task.get('id')}")
                return False

            required_fields = ['id', 'task_type', 'model_size', 'expected_duration']
            if not all(field in task for field in required_fields):
                logger.error(f"Missing required fields for task {task.get('id')}")
                return False

            # A non-numeric duration would make every later completion check
            # fail, so reject it up front.
            duration = task['expected_duration']
            if not isinstance(duration, numbers.Real) or not duration >= 0:
                logger.error(f"Invalid expected_duration for task {task.get('id')}")
                return False

            task_entry = {
                **task,
                'status': 'queued',
                'submit_time': time.time(),
                'start_time': None,
                'completion_time': None
            }

            with self.lock:
                self.priority_queues[task['priority']].append(task_entry)
                logger.info(f"Added task {task['id']} to priority queue {task['priority']}")

            return True

        except Exception as e:
            logger.error(f"Error adding task {task.get('id')}: {str(e)}")
            return False

    def get_task_status(self, task_id: int) -> dict:
        """Get the current status of a specific task.

        Returns:
            One of:
            ``{'status': 'running', 'progress': ..., 'elapsed_time': ...}``,
            ``{'status': 'queued', 'priority': ..., 'queue_position': ...}``,
            ``{'status': 'not_found'}`` (also used for completed or cancelled
            tasks, which are no longer tracked), or
            ``{'status': 'error', 'message': ...}``.
        """
        try:
            with self.lock:
                task = self.running_tasks.get(task_id)
                if task is not None and task.get('start_time'):
                    elapsed = time.time() - task['start_time']
                    return {
                        'status': 'running',
                        'progress': self._progress_percent(elapsed, task['expected_duration']),
                        'elapsed_time': elapsed
                    }

                for priority, queue in self.priority_queues.items():
                    for position, queued in enumerate(queue):
                        if queued['id'] == task_id:
                            return {
                                'status': 'queued',
                                'priority': priority,
                                'queue_position': position
                            }

            return {'status': 'not_found'}

        except Exception as e:
            logger.error(f"Error getting task status: {str(e)}")
            return {'status': 'error', 'message': str(e)}

    def cancel_task(self, task_id: int) -> bool:
        """Cancel a running or queued task.

        Returns:
            True if the task was found and removed, False if it is unknown
            (for example already completed) or an error occurred.
        """
        try:
            with self.lock:
                task = self.running_tasks.pop(task_id, None)
                if task is not None:
                    task['status'] = 'cancelled'
                    logger.info(f"Cancelled running task {task_id}")
                    return True

                for queue in self.priority_queues.values():
                    for queued in queue:
                        if queued['id'] == task_id:
                            # Safe: we return right after mutating the deque.
                            queue.remove(queued)
                            logger.info(f"Cancelled queued task {task_id}")
                            return True

            logger.warning(f"Task {task_id} not found for cancellation")
            return False

        except Exception as e:
            logger.error(f"Error cancelling task {task_id}: {str(e)}")
            return False

# Demo / smoke test
def test_scheduler():
    """Run a short end-to-end demo of the scheduler and return it.

    Creates a scheduler, starts its monitoring loop in a daemon thread,
    submits three sample tasks one second apart, prints their status every
    five seconds for thirty seconds, then attempts to cancel task 1. The
    scheduler is returned still running so more tasks can be added.
    """
    print("Initializing GPU Task Scheduler...")
    scheduler = GPUTaskScheduler()

    print("Starting monitoring thread...")
    # Daemon thread: it does not keep the interpreter alive on exit.
    threading.Thread(
        target=scheduler.monitor_and_optimize,
        args=(1.0,),
        daemon=True,
    ).start()

    # Sample tasks, one row per task (short durations keep the demo quick).
    columns = ('id', 'task_type', 'model_size', 'priority', 'expected_duration', 'batch_size')
    rows = [
        (1, 'training', 'large', 5, 20, 32),
        (2, 'inference', 'small', 3, 10, 16),
        (3, 'training', 'medium', 4, 15, 24),
    ]
    test_tasks = [dict(zip(columns, row)) for row in rows]

    print("\nAdding test tasks...")
    for sample in tqdm(test_tasks, desc="Adding tasks"):
        scheduler.add_task(sample)
        time.sleep(1)  # stagger the submissions

    print("\nMonitoring task status for 30 seconds...")
    began = time.time()
    try:
        while time.time() - began < 30:
            for sample in test_tasks:
                report = scheduler.get_task_status(sample['id'])
                if report['status'] == 'not_found':
                    continue
                print(f"\nTask {sample['id']} status: {report}")
            time.sleep(5)
    except KeyboardInterrupt:
        print("\nTest interrupted by user")

    print("\nTesting task cancellation...")
    was_cancelled = scheduler.cancel_task(1)
    print(
        "Successfully cancelled task 1"
        if was_cancelled
        else "Task 1 not found or already completed"
    )

    print("\nTest completed. Scheduler is still running and can accept new tasks.")
    return scheduler

if __name__ == "__main__":
    # Run the demo; the returned scheduler stays bound to `scheduler` so the
    # usage snippet printed below can be pasted as-is.
    scheduler = test_scheduler()

    print(
        "\nTo add more tasks, use:",
        """
    new_task = {
        'id': 4,  # Use a new unique ID
        'task_type': 'training',
        'model_size': 'small',
        'priority': 4,
        'expected_duration': 60,
        'batch_size': 16
    }
    scheduler.add_task(new_task)
    """,
        sep="\n",
    )

Initializing GPU Task Scheduler...
Tue Jan  7 14:42:26 2025       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 535.104.05             Driver Version: 535.104.05   CUDA Version: 12.2     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  Tesla T4                       Off | 00000000:00:04.0 Off |                    0 |
| N/A   39C    P8               9W /  70W |      0MiB / 15360MiB |      0%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                 

Output()

Starting monitoring thread...

Adding test tasks...

Monitoring task status for 30 seconds...

Task 1 status: {'status': 'running', 'progress': 8.97320032119751, 'elapsed_time': 1.794640064239502}

Task 2 status: {'status': 'running', 'progress': 16.86445713043213, 'elapsed_time': 1.686445713043213}

Task 3 status: {'status': 'running', 'progress': 3.1191444396972656, 'elapsed_time': 0.46787166595458984}

Task 1 status: {'status': 'running', 'progress': 33.99939775466919, 'elapsed_time': 6.799879550933838}

Task 2 status: {'status': 'running', 'progress': 66.91790103912354, 'elapsed_time': 6.6917901039123535}

Task 3 status: {'status': 'running', 'progress': 36.49155139923096, 'elapsed_time': 5.4737327098846436}

Task 1 status: {'status': 'running', 'progress': 59.02835249900817, 'elapsed_time': 11.805670499801636}

Task 3 status: {'status': 'running', 'progress': 69.8595142364502, 'elapsed_time': 10.47892713546753}

Task 1 status: {'status': 'running', 'progress': 84.05430197715759, '




Testing task cancellation...
Task 1 not found or already completed

Test completed. Scheduler is still running and can accept new tasks.

To add more tasks, use:

    new_task = {
        'id': 4,  # Use a new unique ID
        'task_type': 'training',
        'model_size': 'small',
        'priority': 4,
        'expected_duration': 60,
        'batch_size': 16
    }
    scheduler.add_task(new_task)
    
