# 🛠️ Colab Utilities and Helper Functions

## AI Coding Assistant - Utility Functions

This notebook contains utility functions and helper tools for managing your AI coding assistant in Google Colab.

### 🎯 What's Included

- **🔧 Environment Management**: Setup and configuration utilities
- **📊 Model Management**: Load, save, and manage AI models
- **🔄 Data Sync**: Synchronize data between local and cloud
- **📱 Service Management**: Start, stop, and monitor services
- **🐛 Debugging Tools**: Logging and error handling utilities
- **📈 Performance Monitoring**: Track resource usage and performance

---


In [None]:
# 📦 Import Required Libraries

import os
import sys
import json
import time
import psutil
import logging
import subprocess
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Any, Optional

# Install additional utilities if needed
try:
    import psutil
except ImportError:
    !pip install -q psutil
    import psutil

try:
    import GPUtil
except ImportError:
    !pip install -q GPUtil
    import GPUtil

print('✅ Utilities imported successfully!')

## 🔧 Environment Management Utilities


In [None]:
# 🔧 Environment Management Class

class ColabEnvironmentManager:
    """Manage the notebook's project workspace.

    Responsibilities: logging setup, JSON configuration persistence,
    project directory scaffolding, package installation and system/GPU
    inspection. All paths are derived from ``project_root``.
    """

    # Layout shared by the file and console log handlers.
    LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

    def __init__(self, project_root: str = '/content/ai_assistant'):
        """Derive config/log paths from ``project_root`` and set up logging."""
        self.project_root = Path(project_root)
        self.config_file = self.project_root / 'config' / 'colab_config.json'
        self.log_file = self.project_root / 'logs' / 'colab.log'
        self.setup_logging()

    def setup_logging(self):
        """Attach a file handler and a stdout handler to the root logger.

        ``logging.basicConfig`` silently does nothing when the root logger
        already has handlers, while the ``FileHandler`` passed to it would
        still be created (opening the file and leaking the handle). The
        handlers are therefore attached explicitly, and only when an
        equivalent one is not already present, so calling this repeatedly
        (e.g. re-running a cell) neither leaks nor duplicates output to the
        log file.
        """
        self.log_file.parent.mkdir(parents=True, exist_ok=True)

        root = logging.getLogger()
        # Only ever make the root logger more verbose, never less.
        if root.level > logging.INFO:
            root.setLevel(logging.INFO)

        formatter = logging.Formatter(self.LOG_FORMAT)

        # FileHandler stores its target as an absolute path in baseFilename.
        log_path = os.path.abspath(self.log_file)
        file_attached = any(
            isinstance(handler, logging.FileHandler)
            and handler.baseFilename == log_path
            for handler in root.handlers
        )
        if not file_attached:
            file_handler = logging.FileHandler(log_path, encoding='utf-8')
            file_handler.setFormatter(formatter)
            root.addHandler(file_handler)

        # The console handler is tagged so later calls can recognise it.
        console_attached = any(
            getattr(handler, '_colab_env_console', False)
            for handler in root.handlers
        )
        if not console_attached:
            console_handler = logging.StreamHandler(sys.stdout)
            console_handler.setFormatter(formatter)
            console_handler._colab_env_console = True
            root.addHandler(console_handler)

        self.logger = logging.getLogger('ColabEnvironment')

    def get_system_info(self) -> Dict[str, Any]:
        """Return a snapshot of CPU, memory, disk and GPU state.

        Blocks for about one second because ``psutil.cpu_percent`` is
        sampled with ``interval=1``. The ``'gpu'`` entry is a list of
        per-GPU dicts, or an explanatory string when GPU details cannot be
        read.
        """
        # Sample each source once so the reported figures are consistent.
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')

        info = {
            'timestamp': datetime.now().isoformat(),
            'python_version': sys.version,
            'platform': sys.platform,
            'cpu_count': psutil.cpu_count(),
            'cpu_percent': psutil.cpu_percent(interval=1),
            'memory': {
                'total': memory.total,
                'available': memory.available,
                'percent': memory.percent
            },
            'disk': {
                'total': disk.total,
                'free': disk.free,
                'percent': disk.percent
            }
        }

        # GPU information is best-effort: any failure is reported as text.
        try:
            info['gpu'] = [
                {
                    'name': gpu.name,
                    'memory_total': gpu.memoryTotal,
                    'memory_used': gpu.memoryUsed,
                    'memory_free': gpu.memoryFree,
                    'temperature': gpu.temperature,
                    'load': gpu.load
                }
                for gpu in GPUtil.getGPUs()
            ]
        except Exception as e:
            info['gpu'] = f'GPU info unavailable: {str(e)}'

        return info

    def save_config(self, config: Dict[str, Any]):
        """Write ``config`` as indented JSON to ``self.config_file``.

        The parent directory is created when missing.
        """
        self.config_file.parent.mkdir(parents=True, exist_ok=True)
        with open(self.config_file, 'w', encoding='utf-8') as f:
            json.dump(config, f, indent=2)
        self.logger.info(f'Configuration saved to {self.config_file}')

    def load_config(self) -> Dict[str, Any]:
        """Return the stored configuration, or ``{}`` when no file exists.

        Raises:
            json.JSONDecodeError: if the file exists but is not valid JSON.
        """
        if not self.config_file.exists():
            return {}
        with open(self.config_file, 'r', encoding='utf-8') as f:
            return json.load(f)

    def create_project_structure(self):
        """Create the project directory tree under ``self.project_root``."""
        directories = [
            'backend/app/api',
            'backend/app/core',
            'backend/app/models',
            'backend/app/services',
            'backend/app/utils',
            'frontend/src/components',
            'frontend/src/pages',
            'frontend/src/utils',
            'data/training',
            'data/models',
            'data/datasets',
            'logs',
            'config',
            'scripts',
            'tests'
        ]

        for directory in directories:
            dir_path = self.project_root / directory
            dir_path.mkdir(parents=True, exist_ok=True)

            # Listed backend/frontend directories get an __init__.py.
            # NOTE(review): frontend directories are included because the
            # condition matches them; kept for backward compatibility.
            if 'backend' in directory or 'frontend' in directory:
                (dir_path / '__init__.py').touch(exist_ok=True)

        self.logger.info(f'Project structure created at {self.project_root}')

    def install_requirements(self, requirements: List[str]):
        """Install each package with pip, logging success or failure.

        A failed install is logged (including pip's stderr when available)
        and does not stop the remaining packages from being attempted.
        """
        for package in requirements:
            command = [sys.executable, '-m', 'pip', 'install', '-q', package]
            try:
                subprocess.run(command, check=True, capture_output=True, text=True)
            except subprocess.CalledProcessError as e:
                message = f'❌ Failed to install {package}: {e}'
                # pip explains the actual cause on stderr; surface it.
                detail = (e.stderr or '').strip()
                if detail:
                    message = f'{message}\n{detail}'
                self.logger.error(message)
            else:
                self.logger.info(f'✅ Installed {package}')

    def check_gpu_availability(self) -> Dict[str, Any]:
        """Report CUDA availability as seen by PyTorch.

        Returns ``{'error': 'PyTorch not available'}`` when torch cannot be
        imported. Device details are included only when CUDA is available.
        """
        try:
            import torch
        except ImportError:
            return {'error': 'PyTorch not available'}

        cuda_available = torch.cuda.is_available()
        gpu_info = {
            'torch_cuda_available': cuda_available,
            'torch_cuda_device_count': torch.cuda.device_count() if cuda_available else 0
        }

        if cuda_available:
            gpu_info['current_device'] = torch.cuda.current_device()
            gpu_info['device_name'] = torch.cuda.get_device_name()
            gpu_info['memory_allocated'] = torch.cuda.memory_allocated()
            gpu_info['memory_reserved'] = torch.cuda.memory_reserved()

        return gpu_info

# Create the environment manager shared by the rest of the notebook
env_manager = ColabEnvironmentManager()
print('✅ Environment manager initialized!')

# Summarise host resources (CPU, memory, disk)
system_info = env_manager.get_system_info()
print('\n📊 System Information:')
print(f"CPU: {system_info['cpu_count']} cores, {system_info['cpu_percent']:.1f}% usage")
print(f"Memory: {system_info['memory']['percent']:.1f}% used")
print(f"Disk: {system_info['disk']['percent']:.1f}% used")

# Report whether PyTorch can see a CUDA device
gpu_info = env_manager.check_gpu_availability()
if not gpu_info.get('torch_cuda_available'):
    print('⚠️ No GPU available')
else:
    print(f"🚀 GPU Available: {gpu_info['device_name']}")

## 🤖 Model Management Utilities


In [None]:
# 🤖 Model Management Class

class ColabModelManager:
    """Download, list, inspect and delete models stored under ``models_dir``."""

    def __init__(self, models_dir: str = '/content/ai_assistant/data/models'):
        """Create ``models_dir`` when missing and bind the class logger."""
        self.models_dir = Path(models_dir)
        os.makedirs(self.models_dir, exist_ok=True)
        self.logger = logging.getLogger('ModelManager')

    def _safe_model_path(self, model_name: str) -> Optional[Path]:
        """Return ``models_dir / model_name`` if it lies strictly inside ``models_dir``.

        Names such as ``''``, ``'.'``, ``'..'`` or absolute paths would
        otherwise address the model store itself or a location outside it;
        for those ``None`` is returned. The check is lexical (symlinks are
        not followed).
        """
        root = Path(os.path.abspath(self.models_dir))
        candidate = Path(os.path.abspath(self.models_dir / model_name))
        # parents excludes the path itself, so the store root is rejected too.
        if root not in candidate.parents:
            return None
        return self.models_dir / model_name

    @staticmethod
    def _dir_size_mb(directory: Path) -> float:
        """Return the total size of all files below ``directory`` in MiB."""
        total_bytes = sum(f.stat().st_size for f in directory.rglob('*') if f.is_file())
        return total_bytes / (1024 * 1024)

    def download_model(self, model_name: str, model_type: str = 'huggingface') -> bool:
        """Download a tokenizer and causal-LM model and save them locally.

        Only ``model_type='huggingface'`` is supported. The tokenizer and
        model are saved to ``<name>_tokenizer`` and ``<name>_model`` under
        ``models_dir``, with ``/`` in the name replaced by ``_``.

        Returns:
            True on success; False for an unsupported type or on any
            failure (which is logged).
        """
        try:
            if model_type != 'huggingface':
                self.logger.error(f'Unsupported model type: {model_type}')
                return False

            from transformers import AutoTokenizer, AutoModelForCausalLM

            self.logger.info(f'Downloading {model_name} from Hugging Face...')
            safe_name = model_name.replace("/", "_")

            # Download tokenizer
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            tokenizer.save_pretrained(self.models_dir / f'{safe_name}_tokenizer')

            # Download model
            model = AutoModelForCausalLM.from_pretrained(model_name)
            model.save_pretrained(self.models_dir / f'{safe_name}_model')

            self.logger.info(f'✅ Model {model_name} downloaded successfully')
            return True

        except Exception as e:
            self.logger.error(f'Failed to download {model_name}: {str(e)}')
            return False

    def list_local_models(self) -> List[Dict[str, Any]]:
        """Return name, path, size and timestamp for each directory in ``models_dir``.

        NOTE(review): ``'created'`` is taken from ``st_ctime``, which on
        Unix is the last metadata change rather than the creation time.
        """
        return [
            {
                'name': model_dir.name,
                'path': str(model_dir),
                'size_mb': self._dir_size_mb(model_dir),
                'created': datetime.fromtimestamp(model_dir.stat().st_ctime).isoformat()
            }
            for model_dir in self.models_dir.iterdir()
            if model_dir.is_dir()
        ]

    def delete_model(self, model_name: str) -> bool:
        """Delete the directory of a local model.

        Names that do not resolve to a location strictly inside
        ``models_dir`` are rejected, so an empty name or ``'..'`` can never
        remove the whole store or anything outside it.

        Returns:
            True when the model was removed; False when the name is
            invalid, the model does not exist, or removal failed.
        """
        try:
            model_path = self._safe_model_path(model_name)
            if model_path is None:
                self.logger.warning(f'Invalid model name: {model_name!r}')
                return False
            if not model_path.exists():
                self.logger.warning(f'Model {model_name} not found')
                return False

            import shutil
            shutil.rmtree(model_path)
            self.logger.info(f'✅ Model {model_name} deleted')
            return True
        except Exception as e:
            self.logger.error(f'Failed to delete {model_name}: {str(e)}')
            return False

    def get_model_info(self, model_name: str) -> Optional[Dict[str, Any]]:
        """Return path, parsed ``config.json``, file names and size of a model.

        Returns ``None`` when the name is invalid (outside ``models_dir``),
        the model does not exist, or its contents cannot be read.
        """
        model_path = self._safe_model_path(model_name)
        if model_path is None or not model_path.exists():
            return None

        try:
            # config.json is optional; an absent file yields an empty dict.
            config_file = model_path / 'config.json'
            config = {}
            if config_file.exists():
                with open(config_file, 'r', encoding='utf-8') as f:
                    config = json.load(f)

            return {
                'name': model_name,
                'path': str(model_path),
                'config': config,
                'files': [f.name for f in model_path.iterdir()],
                'size_mb': self._dir_size_mb(model_path)
            }
        except Exception as e:
            self.logger.error(f'Error getting model info: {str(e)}')
            return None

# Initialize the model manager; constructing it also creates the default
# models directory when it does not exist yet
model_manager = ColabModelManager()
print('✅ Model manager initialized!')

# List the model directories currently stored locally, one line per model
# with its on-disk size in MB
local_models = model_manager.list_local_models()
print(f'\n📦 Local Models: {len(local_models)} found')
for model in local_models:
    print(f"  - {model['name']} ({model['size_mb']:.1f} MB)")

## 🔄 Service Management Utilities


In [None]:
# 🔄 Service Management Class

import threading
import queue
from contextlib import contextmanager

class ColabServiceManager:
    """Start, track and stop the notebook's backend/frontend services.

    ``self.services`` maps a service name to a dict with at least
    ``'thread'``, ``'port'`` and ``'status'``. Services launched as child
    processes additionally carry a ``'process'`` entry (a
    ``subprocess.Popen``) so they can really be terminated.
    """

    def __init__(self):
        """Start with an empty service registry."""
        self.services = {}
        self.logger = logging.getLogger('ServiceManager')

    def start_backend(self, port: int = 8000, host: str = '0.0.0.0') -> bool:
        """Start the FastAPI backend with uvicorn in a daemon thread.

        Returns:
            True once the thread has been started; False when uvicorn or
            the application cannot be imported. Errors raised later inside
            the server thread (for example a busy port) are not reflected
            in the return value.
        """
        try:
            import uvicorn
            from backend.app.main import app

            # Start in a separate thread so the notebook cell returns
            def run_server():
                uvicorn.run(app, host=host, port=port, log_level="info")

            backend_thread = threading.Thread(target=run_server, daemon=True)
            backend_thread.start()

            self.services['backend'] = {
                'thread': backend_thread,
                'port': port,
                'host': host,
                'status': 'running'
            }

            self.logger.info(f'✅ Backend started on {host}:{port}')
            return True

        except Exception as e:
            self.logger.error(f'Failed to start backend: {str(e)}')
            return False

    def start_frontend(self, port: int = 8501) -> bool:
        """Write the Streamlit app to disk and launch it as a child process.

        The process is started from an argument list (no shell), and its
        handle is stored under ``'process'`` so ``stop_service`` can
        terminate it. A watcher thread is stored under ``'thread'``; it
        stays alive exactly as long as the process does.

        Returns:
            True when the process was launched; False on any failure,
            including a missing ``streamlit`` executable.
        """
        try:
            # Create a simple Streamlit app
            frontend_code = '''
import streamlit as st
import requests
import json

st.set_page_config(page_title="AI Coding Assistant", page_icon="🤖")

st.title("🤖 AI Coding Assistant")
st.markdown("### Powered by Google Colab")

# API endpoint
API_BASE = "http://localhost:8000"

# Chat interface
if "messages" not in st.session_state:
    st.session_state.messages = []

for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

if prompt := st.chat_input("Ask me to generate code..."):
    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.markdown(prompt)
    
    with st.chat_message("assistant"):
        try:
            response = requests.post(
                f"{API_BASE}/api/code/generate",
                json={"prompt": prompt, "language": "python"}
            )
            if response.status_code == 200:
                result = response.json()
                code = result.get("code", "No code generated")
                st.code(code, language="python")
                st.session_state.messages.append({"role": "assistant", "content": code})
            else:
                st.error("Failed to generate code")
        except Exception as e:
            st.error(f"Error: {str(e)}")
            '''

            # Save frontend app (UTF-8: the source contains emoji)
            frontend_file = '/content/ai_assistant/frontend/streamlit_app.py'
            os.makedirs(os.path.dirname(frontend_file), exist_ok=True)
            with open(frontend_file, 'w', encoding='utf-8') as f:
                f.write(frontend_code.strip())

            # Start Streamlit without a shell; raises immediately if the
            # executable is missing instead of failing silently.
            process = subprocess.Popen([
                'streamlit', 'run', frontend_file,
                '--server.port', str(port),
                '--server.headless', 'true',
            ])

            # The watcher blocks in wait(), so its liveness mirrors the process.
            frontend_thread = threading.Thread(target=process.wait, daemon=True)
            frontend_thread.start()

            self.services['frontend'] = {
                'thread': frontend_thread,
                'process': process,
                'port': port,
                'status': 'running'
            }

            self.logger.info(f'✅ Frontend started on port {port}')
            return True

        except Exception as e:
            self.logger.error(f'Failed to start frontend: {str(e)}')
            return False

    def setup_ngrok(self, port: int, auth_token: Optional[str] = None) -> Optional[str]:
        """Open an ngrok tunnel to ``port`` and return its public URL.

        ``auth_token`` is registered with ngrok first when provided.
        Returns ``None`` when pyngrok is unavailable or the tunnel cannot
        be created (the error is logged).
        """
        try:
            from pyngrok import ngrok

            if auth_token:
                ngrok.set_auth_token(auth_token)

            # Create tunnel
            tunnel = ngrok.connect(port)
            public_url = tunnel.public_url

            self.logger.info(f'✅ Ngrok tunnel created: {public_url}')
            return public_url

        except Exception as e:
            self.logger.error(f'Failed to setup ngrok: {str(e)}')
            return None

    def get_service_status(self) -> Dict[str, Any]:
        """Return recorded status, port and thread liveness per service."""
        status = {}

        for service_name, service_info in self.services.items():
            thread = service_info.get('thread')
            status[service_name] = {
                'status': service_info['status'],
                'port': service_info.get('port'),
                'thread_alive': thread.is_alive() if thread is not None else False
            }

        return status

    def stop_service(self, service_name: str) -> bool:
        """Stop a registered service and mark it as stopped.

        Services with a ``'process'`` entry are terminated (and killed if
        they do not exit within five seconds). Thread-only services cannot
        be interrupted from here and are only marked as stopped.

        Returns:
            True when the service was known and handled; False when it is
            unknown or stopping raised an error.
        """
        service = self.services.get(service_name)
        if service is None:
            self.logger.warning(f'Service {service_name} not found')
            return False

        try:
            process = service.get('process')
            if process is not None and process.poll() is None:
                process.terminate()
                try:
                    process.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    # Did not honour terminate(); force it down.
                    process.kill()
                    process.wait()
                self.logger.info(f'Service {service_name} process terminated')

            # Note: threads cannot be stopped from outside; for thread-only
            # services this flag is all that changes.
            service['status'] = 'stopped'
            self.logger.info(f'Service {service_name} marked as stopped')
            return True
        except Exception as e:
            self.logger.error(f'Error stopping {service_name}: {str(e)}')
            return False

# Initialize the service manager; no service is started here, the registry
# begins empty until start_backend()/start_frontend() are called
service_manager = ColabServiceManager()
print('✅ Service manager initialized!')