In [1]:
# Basic data types and operations
import time
import platform
from typing import List, Dict, Optional, Union, Tuple
from pathlib import Path
from datetime import datetime, timedelta


# Variables and basic types
name: str = "Desktop Bot"
version: float = 1.0
is_active: bool =  True 

data: Optional[Dict] = None

# String operations (crucial for automation)
text = "Blog Post Title"
formatted_text = text.title()  # Capitalize each word
safe_filename = text.replace(" ", "_").lower()  # Create safe filename
print(f"Original: {text}")
print(f"Formatted: {formatted_text}")
print(f"Safe filename: {safe_filename}")

# F-strings and formatting (essential for dynamic content)
post_id = 1
user_id = 10
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

blog_content = f"""BLOG POST #{post_id}

TITLE: {formatted_text}

Content goes here...

---
Post ID: {post_id}
User ID: {user_id}
Generated: {timestamp}
"""

print(blog_content)


Original: Blog Post Title
Formatted: Blog Post Title
Safe filename: blog_post_title
BLOG POST #1

TITLE: Blog Post Title

Content goes here...

---
Post ID: 1
User ID: 10
Generated: 2025-07-20 20:57:59



In [2]:
# Lists and list comprehensions (common in automation)
posts = [
    {"id": 1, "title": "First Post", "userId": 1},
    {"id": 2, "title": "Second Post", "userId": 1},
    {"id": 3, "title": "Third Post", "userId": 2},
]

# List comprehensions for data processing
post_titles = [post["title"] for post in posts]
user_1_posts = [post for post in posts if post["userId"] == 1]
formatted_titles = [title.upper() for title in post_titles if len(title) > 5]

print("Post titles:", post_titles)
print("User 1 posts:", user_1_posts)
print("Formatted titles:", formatted_titles)

# Dictionary operations (API responses)
api_response = {
    "status": "success",
    "data": posts,
    "meta": {"total": 3, "page": 1}
}

# Dictionary comprehensions
post_lookup = {post["id"]: post["title"] for post in posts}
title_lengths = {title: len(title) for title in post_titles}

print("Post lookup:", post_lookup)
print("Title lengths:", title_lengths)

# Sets for unique operations
all_user_ids = {post["userId"] for post in posts}
unique_words = set(" ".join(post_titles).lower().split())

print("Unique user IDs:", all_user_ids)
print("Unique words:", unique_words)

# Advanced slicing and unpacking
first_post, *rest_posts, last_post = posts if len(posts) >= 2 else (posts[0], [], posts[0])
print(f"First: {first_post['title']}, Last: {last_post['title']}")

# Enumerate and zip (useful for automation loops)
for index, post in enumerate(posts, 1):
    print(f"Processing post {index}: {post['title']}")

filenames = ["post_1.txt", "post_2.txt", "post_3.txt"]
for post, filename in zip(posts, filenames):
    print(f"Saving {post['title']} to {filename}")


Post titles: ['First Post', 'Second Post', 'Third Post']
User 1 posts: [{'id': 1, 'title': 'First Post', 'userId': 1}, {'id': 2, 'title': 'Second Post', 'userId': 1}]
Formatted titles: ['FIRST POST', 'SECOND POST', 'THIRD POST']
Post lookup: {1: 'First Post', 2: 'Second Post', 3: 'Third Post'}
Title lengths: {'First Post': 10, 'Second Post': 11, 'Third Post': 10}
Unique user IDs: {1, 2}
Unique words: {'first', 'third', 'post', 'second'}
First: First Post, Last: Third Post
Processing post 1: First Post
Processing post 2: Second Post
Processing post 3: Third Post
Saving First Post to post_1.txt
Saving Second Post to post_2.txt
Saving Third Post to post_3.txt


In [None]:
# Base automation bot class
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum

class BotStatus(Enum):
    IDLE = "idle"
    RUNNING = "running"
    COMPLETED = "completed"
    ERROR = "error"

@dataclass
class PostData:
    """Data class for blog posts"""
    id: int
    title: str
    body: str
    user_id: int
    
    def to_filename(self) -> str:
        """Convert to safe filename"""
        return f"post_{self.id}.txt"
    
    def format_content(self) -> str:
        """Format as blog post"""
        return f"""BLOG POST #{self.id}

TITLE: {self.title.title()}

CONTENT:
{self.body}

---
User ID: {self.user_id}
"""

class BaseBot(ABC):
    """Abstract base class for automation bots"""
    
    def __init__(self, name: str):
        self.name = name
        self.status = BotStatus.IDLE
        self.processed_count = 0
        self.start_time = None
    
    @abstractmethod
    def process_item(self, item) -> bool:
        """Process a single item - must be implemented by subclasses"""
        pass
    
    def start(self):
        """Start the automation process"""
        self.status = BotStatus.RUNNING
        self.start_time = datetime.now()
        print(f"🤖 {self.name} started at {self.start_time}")
    
    def finish(self):
        """Finish the automation process"""
        self.status = BotStatus.COMPLETED
        duration = datetime.now() - self.start_time
        print(f"✅ {self.name} completed. Processed {self.processed_count} items in {duration}")

class DesktopBot(BaseBot):
    """Desktop automation bot implementation"""
    
    def __init__(self, name: str, output_dir: Path):
        super().__init__(name)
        self.output_dir = output_dir
        self.output_dir.mkdir(exist_ok=True)
    
    def process_item(self, post_data: PostData) -> bool:
        """Process a single post"""
        try:
            # Simulate processing time
            time.sleep(0.1)
            
            # Create file
            filename = self.output_dir / post_data.to_filename()
            content = post_data.format_content()
            
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(content)
            
            self.processed_count += 1
            print(f"📝 Processed post {post_data.id}: {post_data.title[:30]}...")
            return True
            
        except Exception as e:
            print(f"❌ Error processing post {post_data.id}: {e}")
            return False

# Usage example
posts_data = [
    PostData(1, "First Post", "Content of first post", 1),
    PostData(2, "Second Post", "Content of second post", 1),
    PostData(3, "Third Post", "Content of third post", 2),
]

# Create and run bot
output_path = Path("test_output")
bot = DesktopBot("TestBot", output_path)
bot.start()

for post in posts_data:
    bot.process_item(post)

bot.finish()

# Demonstrate polymorphism and method overriding
class EnhancedBot(DesktopBot):
    """Enhanced bot with additional features"""
    
    def __init__(self, name: str, output_dir: Path):
        super().__init__(name, output_dir)
        self.failed_items = []
    
    def process_item(self, post_data: PostData) -> bool:
        """Enhanced processing with retry logic"""
        success = super().process_item(post_data)
        if not success:
            self.failed_items.append(post_data)
        return success
    
    def retry_failed(self):
        """Retry failed items"""
        if self.failed_items:
            print(f"🔄 Retrying {len(self.failed_items)} failed items...")
            for item in self.failed_items[:]:  # Copy list to avoid modification during iteration
                if self.process_item(item):
                    self.failed_items.remove(item)

# Property decorators and private methods
class ConfigurableBot:
    """Bot with configuration properties"""
    
    def __init__(self):
        self._delay = 0.1
        self._max_retries = 3
    
    @property
    def delay(self) -> float:
        """Get processing delay"""
        return self._delay
    
    @delay.setter
    def delay(self, value: float):
        """Set processing delay with validation"""
        if value < 0:
            raise ValueError("Delay cannot be negative")
        self._delay = value
    
    @property
    def max_retries(self) -> int:
        return self._max_retries
    
    def _validate_input(self, data) -> bool:
        """Private method for input validation"""
        return data is not None and hasattr(data, 'id')

# Context managers for resource management
class BotContext:
    """Context manager for bot operations"""
    
    def __init__(self, bot: BaseBot):
        self.bot = bot
    
    def __enter__(self):
        self.bot.start()
        return self.bot
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            print(f"❌ Error occurred: {exc_val}")
            self.bot.status = BotStatus.ERROR
        else:
            self.bot.finish()
        return False  # Don't suppress exceptions

# Usage with context manager
with BotContext(EnhancedBot("ContextBot", Path("context_output"))) as bot:
    for post in posts_data[:2]:  # Process first 2 posts
        bot.process_item(post)


In [None]:
# Custom exceptions for automation
class AutomationError(Exception):
    """Base exception for automation errors"""
    pass

class ApplicationNotFoundError(AutomationError):
    """Raised when desktop application cannot be found"""
    def __init__(self, app_name: str):
        self.app_name = app_name
        super().__init__(f"Application '{app_name}' not found or cannot be launched")

class SaveDialogError(AutomationError):
    """Raised when save dialog operations fail"""
    def __init__(self, operation: str, details: str = ""):
        self.operation = operation
        self.details = details
        super().__init__(f"Save dialog {operation} failed: {details}")

class APIError(AutomationError):
    """Raised when API operations fail"""
    def __init__(self, status_code: int, message: str):
        self.status_code = status_code
        self.message = message
        super().__init__(f"API Error {status_code}: {message}")

# Comprehensive error handling patterns
import requests
import json
from functools import wraps
import logging

# Setup logging for error tracking
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def retry_on_failure(max_retries: int = 3, delay: float = 1.0):
    """Decorator for retry logic"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    logger.warning(f"Attempt {attempt + 1} failed: {e}")
                    if attempt < max_retries - 1:
                        time.sleep(delay)
                    
            logger.error(f"All {max_retries} attempts failed")
            raise last_exception
        return wrapper
    return decorator

class RobustAutomationBot:
    """Bot with comprehensive error handling"""
    
    def __init__(self, name: str):
        self.name = name
        self.error_count = 0
        self.success_count = 0
    
    @retry_on_failure(max_retries=3, delay=0.5)
    def fetch_api_data(self, url: str) -> dict:
        """Fetch data from API with error handling"""
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()  # Raises HTTPError for bad status codes
            
            data = response.json()
            logger.info(f"✅ Successfully fetched data from {url}")
            return data
            
        except requests.exceptions.Timeout:
            raise APIError(408, "Request timeout")
        except requests.exceptions.ConnectionError:
            raise APIError(0, "Connection error")
        except requests.exceptions.HTTPError as e:
            raise APIError(response.status_code, str(e))
        except json.JSONDecodeError:
            raise APIError(200, "Invalid JSON response")
    
    def safe_file_operation(self, filepath: Path, content: str) -> bool:
        """Safely write file with error handling"""
        try:
            # Ensure directory exists
            filepath.parent.mkdir(parents=True, exist_ok=True)
            
            # Write file with backup
            backup_path = filepath.with_suffix(filepath.suffix + '.bak')
            
            # Create backup if file exists
            if filepath.exists():
                filepath.rename(backup_path)
            
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(content)
            
            # Remove backup on success
            if backup_path.exists():
                backup_path.unlink()
            
            logger.info(f"✅ File saved successfully: {filepath}")
            self.success_count += 1
            return True
            
        except PermissionError:
            logger.error(f"❌ Permission denied: {filepath}")
            self.error_count += 1
            return False
        except OSError as e:
            logger.error(f"❌ OS error: {e}")
            self.error_count += 1
            return False
        except Exception as e:
            logger.error(f"❌ Unexpected error: {e}")
            self.error_count += 1
            return False
    
    def process_with_validation(self, data: dict) -> bool:
        """Process data with comprehensive validation"""
        try:
            # Input validation
            required_fields = ['id', 'title', 'body', 'userId']
            missing_fields = [field for field in required_fields if field not in data]
            
            if missing_fields:
                raise ValueError(f"Missing required fields: {missing_fields}")
            
            # Type validation
            if not isinstance(data['id'], int) or data['id'] <= 0:
                raise ValueError(f"Invalid ID: {data['id']}")
            
            if not isinstance(data['title'], str) or not data['title'].strip():
                raise ValueError(f"Invalid title: {data['title']}")
            
            # Process the data
            filename = f"post_{data['id']}.txt"
            content = f"""BLOG POST #{data['id']}

TITLE: {data['title']}

CONTENT:
{data['body']}

---
User ID: {data['userId']}
"""
            
            filepath = Path("validated_output") / filename
            return self.safe_file_operation(filepath, content)
            
        except ValueError as e:
            logger.error(f"❌ Validation error: {e}")
            self.error_count += 1
            return False
        except Exception as e:
            logger.error(f"❌ Processing error: {e}")
            self.error_count += 1
            return False
    
    def get_status_report(self) -> dict:
        """Get detailed status report"""
        total = self.success_count + self.error_count
        success_rate = (self.success_count / total * 100) if total > 0 else 0
        
        return {
            "total_processed": total,
            "successful": self.success_count,
            "failed": self.error_count,
            "success_rate": f"{success_rate:.1f}%"
        }

# Test error handling
bot = RobustAutomationBot("ErrorBot")

# Test with valid data
valid_data = {"id": 1, "title": "Test Post", "body": "Content", "userId": 1}
bot.process_with_validation(valid_data)

# Test with invalid data
invalid_data = {"id": -1, "title": "", "body": "Content"}  # Missing userId, invalid id and title
bot.process_with_validation(invalid_data)

# Test API with invalid URL
try:
    bot.fetch_api_data("https://invalid-url-that-does-not-exist.com")
except APIError as e:
    print(f"Caught API error: {e}")

print("Status report:", bot.get_status_report())

# Context manager for error handling
class ErrorHandlingContext:
    """Context manager that handles and logs errors"""
    
    def __init__(self, operation_name: str):
        self.operation_name = operation_name
        self.start_time = None
    
    def __enter__(self):
        self.start_time = datetime.now()
        logger.info(f"🚀 Starting {self.operation_name}")
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        duration = datetime.now() - self.start_time
        
        if exc_type is None:
            logger.info(f"✅ {self.operation_name} completed successfully in {duration}")
        else:
            logger.error(f"❌ {self.operation_name} failed after {duration}: {exc_val}")
            
            # Log different exception types differently
            if isinstance(exc_val, AutomationError):
                logger.error(f"Automation error details: {exc_val}")
            elif isinstance(exc_val, (requests.RequestException, APIError)):
                logger.error(f"Network/API error: {exc_val}")
            else:
                logger.error(f"Unexpected error: {exc_val}")
        
        return False  # Don't suppress exceptions

# Usage example
with ErrorHandlingContext("File Processing"):
    # This will succeed
    bot.safe_file_operation(Path("test_file.txt"), "Test content")

try:
    with ErrorHandlingContext("Risky Operation"):
        raise ValueError("Something went wrong!")
except ValueError:
    print("Exception was properly logged and re-raised")


In [None]:
# Modern path handling with pathlib
import shutil
import tempfile
import json
import csv
from pathlib import Path
from typing import Generator, Iterator
import os

# Path operations (essential for desktop automation)
def demo_path_operations():
    """Demonstrate modern path handling"""
    
    # Create paths cross-platform
    home = Path.home()
    desktop = home / "Desktop"
    project_dir = desktop / "tjm-project"
    
    print(f"Home: {home}")
    print(f"Desktop: {desktop}")
    print(f"Project dir: {project_dir}")
    
    # Path properties and methods
    sample_file = project_dir / "post_1.txt"
    print(f"Filename: {sample_file.name}")
    print(f"Stem: {sample_file.stem}")
    print(f"Suffix: {sample_file.suffix}")
    print(f"Parent: {sample_file.parent}")
    print(f"Parts: {sample_file.parts}")
    print(f"Is absolute: {sample_file.is_absolute()}")
    
    # Path manipulation
    backup_file = sample_file.with_suffix('.bak')
    numbered_file = sample_file.with_stem(f"{sample_file.stem}_v2")
    
    print(f"Backup file: {backup_file}")
    print(f"Numbered file: {numbered_file}")
    
    return project_dir

# File operations with context managers
class FileManager:
    """Comprehensive file management for automation"""
    
    def __init__(self, base_dir: Path):
        self.base_dir = Path(base_dir)
        self.ensure_directory()
    
    def ensure_directory(self):
        """Create directory if it doesn't exist"""
        self.base_dir.mkdir(parents=True, exist_ok=True)
        print(f"📁 Directory ensured: {self.base_dir}")
    
    def write_text_file(self, filename: str, content: str, encoding: str = 'utf-8') -> bool:
        """Write text file with error handling"""
        try:
            filepath = self.base_dir / filename
            with open(filepath, 'w', encoding=encoding) as f:
                f.write(content)
            print(f"✅ File written: {filepath}")
            return True
        except Exception as e:
            print(f"❌ Error writing file: {e}")
            return False
    
    def read_text_file(self, filename: str, encoding: str = 'utf-8') -> Optional[str]:
        """Read text file with error handling"""
        try:
            filepath = self.base_dir / filename
            if not filepath.exists():
                print(f"⚠️ File not found: {filepath}")
                return None
                
            with open(filepath, 'r', encoding=encoding) as f:
                content = f.read()
            print(f"✅ File read: {filepath}")
            return content
        except Exception as e:
            print(f"❌ Error reading file: {e}")
            return None
    
    def write_json_file(self, filename: str, data: dict) -> bool:
        """Write JSON file"""
        try:
            filepath = self.base_dir / filename
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            print(f"✅ JSON file written: {filepath}")
            return True
        except Exception as e:
            print(f"❌ Error writing JSON: {e}")
            return False
    
    def read_json_file(self, filename: str) -> Optional[dict]:
        """Read JSON file"""
        try:
            filepath = self.base_dir / filename
            if not filepath.exists():
                return None
                
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
            print(f"✅ JSON file read: {filepath}")
            return data
        except Exception as e:
            print(f"❌ Error reading JSON: {e}")
            return None
    
    def write_csv_file(self, filename: str, data: List[dict], fieldnames: List[str]) -> bool:
        """Write CSV file"""
        try:
            filepath = self.base_dir / filename
            with open(filepath, 'w', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(data)
            print(f"✅ CSV file written: {filepath}")
            return True
        except Exception as e:
            print(f"❌ Error writing CSV: {e}")
            return False
    
    def backup_file(self, filename: str) -> bool:
        """Create backup of file"""
        try:
            original = self.base_dir / filename
            if not original.exists():
                print(f"⚠️ Original file not found: {original}")
                return False
            
            backup = original.with_suffix(original.suffix + '.bak')
            shutil.copy2(original, backup)
            print(f"✅ Backup created: {backup}")
            return True
        except Exception as e:
            print(f"❌ Error creating backup: {e}")
            return False
    
    def list_files(self, pattern: str = "*") -> List[Path]:
        """List files matching pattern"""
        files = list(self.base_dir.glob(pattern))
        print(f"📄 Found {len(files)} files matching '{pattern}'")
        return files
    
    def cleanup_old_files(self, days: int = 7) -> int:
        """Remove files older than specified days"""
        cutoff_time = datetime.now() - timedelta(days=days)
        removed_count = 0
        
        for file in self.base_dir.iterdir():
            if file.is_file():
                file_time = datetime.fromtimestamp(file.stat().st_mtime)
                if file_time < cutoff_time:
                    try:
                        file.unlink()
                        print(f"🗑️ Removed old file: {file}")
                        removed_count += 1
                    except Exception as e:
                        print(f"❌ Error removing file {file}: {e}")
        
        return removed_count

# File iteration and processing
def process_files_generator(directory: Path, pattern: str = "*.txt") -> Generator[Tuple[Path, str], None, None]:
    """Generator that yields file paths and content"""
    for filepath in directory.glob(pattern):
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                content = f.read()
            yield filepath, content
        except Exception as e:
            print(f"❌ Error reading {filepath}: {e}")

def batch_process_files(directory: Path, processor_func, batch_size: int = 5):
    """Process files in batches"""
    files = list(directory.glob("*.txt"))
    
    for i in range(0, len(files), batch_size):
        batch = files[i:i + batch_size]
        print(f"Processing batch {i//batch_size + 1}: {len(batch)} files")
        
        for file in batch:
            try:
                processor_func(file)
            except Exception as e:
                print(f"❌ Error processing {file}: {e}")

# Temporary files for testing
class TempFileManager:
    """Manage temporary files for testing automation"""
    
    def __init__(self):
        self.temp_dir = Path(tempfile.mkdtemp(prefix="automation_test_"))
        print(f"📁 Created temp directory: {self.temp_dir}")
    
    def create_test_files(self, count: int = 5) -> List[Path]:
        """Create test files for automation testing"""
        files = []
        for i in range(1, count + 1):
            content = f"""BLOG POST #{i}

TITLE: Test Post {i}

CONTENT:
This is the content of test post number {i}.
It contains multiple paragraphs.

This is the second paragraph with some sample text.

---
Post ID: {i}
User ID: {i % 3 + 1}
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
            filepath = self.temp_dir / f"post_{i}.txt"
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(content)
            files.append(filepath)
        
        print(f"✅ Created {len(files)} test files")
        return files
    
    def cleanup(self):
        """Remove temporary directory"""
        try:
            shutil.rmtree(self.temp_dir)
            print(f"🗑️ Cleaned up temp directory: {self.temp_dir}")
        except Exception as e:
            print(f"❌ Error cleaning up: {e}")

# Demo usage
print("=== Path Operations Demo ===")
project_dir = demo_path_operations()

print("\n=== File Manager Demo ===")
file_manager = FileManager("demo_files")

# Write different file types
sample_data = {"posts": [{"id": 1, "title": "Sample", "content": "Test"}]}
file_manager.write_json_file("config.json", sample_data)

blog_content = """BLOG POST #1

TITLE: Sample Blog Post

CONTENT:
This is a sample blog post content.
"""
file_manager.write_text_file("sample_post.txt", blog_content)

# CSV example
csv_data = [
    {"post_id": 1, "title": "First Post", "status": "published"},
    {"post_id": 2, "title": "Second Post", "status": "draft"}
]
file_manager.write_csv_file("posts.csv", csv_data, ["post_id", "title", "status"])

# List and backup files
files = file_manager.list_files("*.txt")
if files:
    file_manager.backup_file("sample_post.txt")

print("\n=== Temporary Files Demo ===")
temp_manager = TempFileManager()
test_files = temp_manager.create_test_files(3)

# Process files with generator
print("\n=== File Processing Demo ===")
for filepath, content in process_files_generator(temp_manager.temp_dir):
    print(f"📄 {filepath.name}: {len(content)} characters")

# Clean up
temp_manager.cleanup()

# Advanced file operations
print("\n=== Advanced File Operations ===")

def safe_atomic_write(filepath: Path, content: str) -> bool:
    """Atomic write operation (write to temp, then rename)"""
    temp_path = filepath.with_suffix(filepath.suffix + '.tmp')
    try:
        with open(temp_path, 'w', encoding='utf-8') as f:
            f.write(content)
        temp_path.replace(filepath)  # Atomic rename
        print(f"✅ Atomic write successful: {filepath}")
        return True
    except Exception as e:
        if temp_path.exists():
            temp_path.unlink()
        print(f"❌ Atomic write failed: {e}")
        return False

# File watching simulation
def monitor_directory_changes(directory: Path, duration: int = 5):
    """Monitor directory for changes (simplified)"""
    initial_files = set(directory.glob("*"))
    print(f"👀 Monitoring {directory} for {duration} seconds...")
    
    start_time = time.time()
    while time.time() - start_time < duration:
        current_files = set(directory.glob("*"))
        
        new_files = current_files - initial_files
        deleted_files = initial_files - current_files
        
        if new_files:
            print(f"📄 New files: {[f.name for f in new_files]}")
        if deleted_files:
            print(f"🗑️ Deleted files: {[f.name for f in deleted_files]}")
        
        initial_files = current_files
        time.sleep(1)
    
    print("👀 Monitoring stopped")

# Test atomic write
test_dir = Path("atomic_test")
test_dir.mkdir(exist_ok=True)
safe_atomic_write(test_dir / "atomic_file.txt", "This was written atomically!")

# Cleanup test directory
try:
    shutil.rmtree(test_dir)
    shutil.rmtree("demo_files")
except:
    pass


In [None]:
# Comprehensive API handling for automation
import requests
import json
from urllib.parse import urljoin, urlparse
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

class APIClient:
    """Robust API client for automation tasks"""
    
    def __init__(self, base_url: str, timeout: int = 10):
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.session = self._create_session()
    
    def _create_session(self) -> requests.Session:
        """Create session with retry strategy"""
        session = requests.Session()
        
        # Retry strategy
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            method_whitelist=["HEAD", "GET", "OPTIONS"]
        )
        
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        
        # Default headers
        session.headers.update({
            'User-Agent': 'Python-Automation-Bot/1.0',
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        })
        
        return session
    
    def get(self, endpoint: str, params: Optional[dict] = None) -> dict:
        """GET request with error handling"""
        url = urljoin(self.base_url + '/', endpoint.lstrip('/'))
        
        try:
            response = self.session.get(url, params=params, timeout=self.timeout)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.Timeout:
            raise APIError(408, f"Request timeout for {url}")
        except requests.exceptions.ConnectionError:
            raise APIError(0, f"Connection error for {url}")
        except requests.exceptions.HTTPError as e:
            raise APIError(response.status_code, f"HTTP error: {e}")
        except json.JSONDecodeError:
            raise APIError(200, f"Invalid JSON response from {url}")
    
    def post(self, endpoint: str, data: dict) -> dict:
        """POST request with error handling"""
        url = urljoin(self.base_url + '/', endpoint.lstrip('/'))
        
        try:
            response = self.session.post(url, json=data, timeout=self.timeout)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            raise APIError(getattr(e.response, 'status_code', 0), str(e))

class JSONPlaceholderClient(APIClient):
    """Specific client for JSONPlaceholder API"""
    
    def __init__(self):
        super().__init__("https://jsonplaceholder.typicode.com")
    
    def get_posts(self, limit: Optional[int] = None, user_id: Optional[int] = None) -> List[dict]:
        """Get posts with optional filtering"""
        params = {}
        if limit:
            params['_limit'] = limit
        if user_id:
            params['userId'] = user_id
        
        return self.get('posts', params)
    
    def get_post(self, post_id: int) -> dict:
        """Get single post"""
        return self.get(f'posts/{post_id}')
    
    def get_users(self) -> List[dict]:
        """Get all users"""
        return self.get('users')
    
    def get_comments(self, post_id: Optional[int] = None) -> List[dict]:
        """Get comments, optionally for specific post"""
        params = {'postId': post_id} if post_id else {}
        return self.get('comments', params)

# Advanced request patterns
class BatchAPIClient:
    """Handle batch API requests efficiently"""
    
    def __init__(self, client: APIClient):
        self.client = client
    
    def fetch_posts_batch(self, post_ids: List[int]) -> Dict[int, dict]:
        """Fetch multiple posts, handling failures gracefully"""
        results = {}
        failed_ids = []
        
        for post_id in post_ids:
            try:
                post = self.client.get(f'posts/{post_id}')
                results[post_id] = post
                print(f"✅ Fetched post {post_id}")
            except APIError as e:
                failed_ids.append(post_id)
                print(f"❌ Failed to fetch post {post_id}: {e}")
            except Exception as e:
                failed_ids.append(post_id)
                print(f"❌ Unexpected error for post {post_id}: {e}")
        
        return results, failed_ids
    
    def fetch_with_dependencies(self, post_id: int) -> dict:
        """Fetch post with related data (user, comments)"""
        try:
            # Fetch post
            post = self.client.get(f'posts/{post_id}')
            
            # Fetch user info
            user = self.client.get(f'users/{post["userId"]}')
            
            # Fetch comments
            comments = self.client.get('comments', {'postId': post_id})
            
            return {
                'post': post,
                'author': user,
                'comments': comments,
                'meta': {
                    'fetched_at': datetime.now().isoformat(),
                    'comment_count': len(comments)
                }
            }
        except APIError as e:
            print(f"❌ Error fetching dependencies for post {post_id}: {e}")
            return {}

# Caching and rate limiting
from functools import lru_cache
import hashlib

class CachedAPIClient:
    """API client with caching capabilities"""
    
    def __init__(self, client: APIClient, cache_duration: int = 300):
        self.client = client
        self.cache_duration = cache_duration
        self._cache = {}
    
    def _cache_key(self, method: str, endpoint: str, params: dict = None) -> str:
        """Generate cache key"""
        cache_data = f"{method}:{endpoint}:{params or {}}"
        return hashlib.md5(cache_data.encode()).hexdigest()
    
    def _is_cache_valid(self, timestamp: datetime) -> bool:
        """Check if cache entry is still valid"""
        return (datetime.now() - timestamp).seconds < self.cache_duration
    
    def get_cached(self, endpoint: str, params: dict = None) -> dict:
        """Get with caching"""
        cache_key = self._cache_key('GET', endpoint, params)
        
        # Check cache
        if cache_key in self._cache:
            data, timestamp = self._cache[cache_key]
            if self._is_cache_valid(timestamp):
                print(f"🎯 Cache hit for {endpoint}")
                return data
        
        # Fetch from API
        print(f"🌐 Fetching from API: {endpoint}")
        data = self.client.get(endpoint, params)
        
        # Cache result
        self._cache[cache_key] = (data, datetime.now())
        return data

# Demo usage
print("=== Basic API Client Demo ===")
client = JSONPlaceholderClient()

try:
    # Fetch posts
    posts = client.get_posts(limit=3)
    print(f"📋 Fetched {len(posts)} posts")
    
    for post in posts:
        print(f"  - Post {post['id']}: {post['title'][:50]}...")
    
    # Fetch specific post
    single_post = client.get_post(1)
    print(f"📄 Single post: {single_post['title']}")
    
    # Fetch users
    users = client.get_users()
    print(f"👥 Found {len(users)} users")
    
except APIError as e:
    print(f"❌ API Error: {e}")

print("\n=== Batch Processing Demo ===")
batch_client = BatchAPIClient(client)
post_ids = [1, 2, 3, 999]  # 999 doesn't exist
results, failed = batch_client.fetch_posts_batch(post_ids)
print(f"✅ Successfully fetched: {list(results.keys())}")
print(f"❌ Failed: {failed}")

print("\n=== Dependencies Demo ===")
rich_post = batch_client.fetch_with_dependencies(1)
if rich_post:
    print(f"📄 Post: {rich_post['post']['title']}")
    print(f"👤 Author: {rich_post['author']['name']}")
    print(f"💬 Comments: {rich_post['meta']['comment_count']}")

print("\n=== Caching Demo ===")
cached_client = CachedAPIClient(client, cache_duration=60)

# First call - will fetch from API
posts1 = cached_client.get_cached('posts', {'_limit': 2})
print(f"First call: {len(posts1)} posts")

# Second call - will use cache
posts2 = cached_client.get_cached('posts', {'_limit': 2})
print(f"Second call: {len(posts2)} posts")

# API response validation
def validate_post_data(post: dict) -> bool:
    """Validate post data structure"""
    required_fields = ['id', 'title', 'body', 'userId']
    
    # Check required fields
    if not all(field in post for field in required_fields):
        return False
    
    # Type validation
    if not isinstance(post['id'], int) or post['id'] <= 0:
        return False
    
    if not isinstance(post['title'], str) or not post['title'].strip():
        return False
    
    if not isinstance(post['body'], str):
        return False
    
    if not isinstance(post['userId'], int) or post['userId'] <= 0:
        return False
    
    return True

# Test validation
for post in posts[:2]:
    is_valid = validate_post_data(post)
    print(f"Post {post['id']} validation: {'✅ Valid' if is_valid else '❌ Invalid'}")

# Rate limiting simulation
import time
from collections import deque

class RateLimitedClient:
    """Client with rate limiting"""
    
    def __init__(self, client: APIClient, requests_per_minute: int = 60):
        self.client = client
        self.requests_per_minute = requests_per_minute
        self.request_times = deque()
    
    def _can_make_request(self) -> bool:
        """Check if we can make a request without exceeding rate limit"""
        now = time.time()
        # Remove requests older than 1 minute
        while self.request_times and now - self.request_times[0] > 60:
            self.request_times.popleft()
        
        return len(self.request_times) < self.requests_per_minute
    
    def _wait_for_rate_limit(self):
        """Wait until we can make another request"""
        while not self._can_make_request():
            print("⏳ Rate limit reached, waiting...")
            time.sleep(1)
    
    def get(self, endpoint: str, params: dict = None) -> dict:
        """GET with rate limiting"""
        self._wait_for_rate_limit()
        self.request_times.append(time.time())
        return self.client.get(endpoint, params)

print("\n=== Rate Limiting Demo ===")
rate_limited = RateLimitedClient(client, requests_per_minute=5)  # Very low limit for demo

# This would normally trigger rate limiting with many requests
try:
    for i in range(3):
        post = rate_limited.get(f'posts/{i+1}')
        print(f"📄 Rate-limited fetch {i+1}: {post['title'][:30]}...")
except Exception as e:
    print(f"❌ Rate limiting error: {e}")


In [None]:
# Desktop automation patterns and best practices
import pyautogui
import subprocess
import platform
from typing import Callable, Any
import functools

# Configure PyAutoGUI for safety and reliability
pyautogui.PAUSE = 0.1  # Small pause between actions
pyautogui.FAILSAFE = True  # Move mouse to top-left to stop

class AutomationSafety:
    """Safety wrapper for automation operations"""
    
    def __init__(self, pause_time: float = 0.1):
        self.original_pause = pyautogui.PAUSE
        pyautogui.PAUSE = pause_time
        
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        pyautogui.PAUSE = self.original_pause

def safe_automation(func: Callable) -> Callable:
    """Decorator for safe automation operations"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            with AutomationSafety():
                return func(*args, **kwargs)
        except pyautogui.FailSafeException:
            print("🛑 Automation stopped by failsafe (mouse moved to corner)")
            return False
        except Exception as e:
            print(f"❌ Automation error: {e}")
            return False
    return wrapper

class CrossPlatformAutomation:
    """Cross-platform desktop automation handler"""
    
    def __init__(self):
        self.os_type = platform.system()
        self.shortcuts = self._get_platform_shortcuts()
    
    def _get_platform_shortcuts(self) -> dict:
        """Get platform-specific keyboard shortcuts"""
        if self.os_type == "Darwin":  # macOS
            return {
                'new': ['cmd', 'n'],
                'save': ['cmd', 's'],
                'save_as': ['cmd', 'shift', 's'],
                'select_all': ['cmd', 'a'],
                'copy': ['cmd', 'c'],
                'paste': ['cmd', 'v'],
                'close': ['cmd', 'w'],
                'quit': ['cmd', 'q'],
                'plain_text': ['cmd', 'shift', 't']
            }
        elif self.os_type == "Windows":
            return {
                'new': ['ctrl', 'n'],
                'save': ['ctrl', 's'],
                'save_as': ['ctrl', 'shift', 's'],
                'select_all': ['ctrl', 'a'],
                'copy': ['ctrl', 'c'],
                'paste': ['ctrl', 'v'],
                'close': ['alt', 'f4'],
                'quit': ['alt', 'f4'],
                'plain_text': []  # Not applicable for Notepad
            }
        else:  # Linux
            return {
                'new': ['ctrl', 'n'],
                'save': ['ctrl', 's'],
                'save_as': ['ctrl', 'shift', 's'],
                'select_all': ['ctrl', 'a'],
                'copy': ['ctrl', 'c'],
                'paste': ['ctrl', 'v'],
                'close': ['alt', 'f4'],
                'quit': ['ctrl', 'q'],
                'plain_text': []
            }
    
    @safe_automation
    def execute_shortcut(self, action: str) -> bool:
        """Execute platform-specific keyboard shortcut"""
        if action not in self.shortcuts:
            print(f"❌ Unknown action: {action}")
            return False
        
        keys = self.shortcuts[action]
        if not keys:
            print(f"⚠️ Action '{action}' not supported on {self.os_type}")
            return True  # Not an error, just not applicable
        
        try:
            if self.os_type == "Darwin" and 'cmd' in keys:
                # Use .hold() method for macOS command key
                with pyautogui.hold(['command']):
                    time.sleep(0.1)
                    for key in keys[1:]:  # Skip 'cmd'
                        pyautogui.press(key)
            else:
                pyautogui.hotkey(*keys)
            
            print(f"✅ Executed {action}: {'+'.join(keys)}")
            return True
        except Exception as e:
            print(f"❌ Failed to execute {action}: {e}")
            return False
    
    @safe_automation
    def launch_text_editor(self) -> bool:
        """Launch appropriate text editor for platform"""
        try:
            if self.os_type == "Darwin":
                subprocess.Popen(['open', '-a', 'TextEdit'])
                time.sleep(3)
                # Set to plain text mode
                self.execute_shortcut('plain_text')
            elif self.os_type == "Windows":
                subprocess.Popen(['notepad.exe'])
                time.sleep(2)
            else:  # Linux
                editors = ['gedit', 'kate', 'mousepad', 'leafpad']
                for editor in editors:
                    try:
                        subprocess.Popen([editor])
                        time.sleep(2)
                        return True
                    except FileNotFoundError:
                        continue
                return False
            
            return True
        except Exception as e:
            print(f"❌ Failed to launch text editor: {e}")
            return False

class TypewriterEffect:
    """Advanced text typing with various effects"""
    
    def __init__(self, base_delay: float = 0.01):
        self.base_delay = base_delay
    
    @safe_automation
    def type_naturally(self, text: str, variation: float = 0.5) -> bool:
        """Type text with natural human-like variations"""
        import random
        
        for char in text:
            # Add slight random variation to timing
            delay = self.base_delay * random.uniform(1 - variation, 1 + variation)
            
            # Longer pauses for punctuation
            if char in '.,!?;:':
                delay *= 2
            
            # Very short pause for spaces
            elif char == ' ':
                delay *= 0.5
            
            pyautogui.write(char)
            time.sleep(delay)
        
        return True
    
    @safe_automation
    def type_with_corrections(self, text: str, error_rate: float = 0.02) -> bool:
        """Type text with occasional 'mistakes' and corrections"""
        import random
        import string
        
        for i, char in enumerate(text):
            # Occasionally make a 'mistake'
            if random.random() < error_rate:
                # Type wrong character
                wrong_char = random.choice(string.ascii_lowercase)
                pyautogui.write(wrong_char)
                time.sleep(self.base_delay * 2)
                
                # Backspace and correct
                pyautogui.press('backspace')
                time.sleep(self.base_delay)
            
            # Type correct character
            pyautogui.write(char)
            time.sleep(self.base_delay)
        
        return True
    
    @safe_automation
    def type_progressively_faster(self, text: str, speed_increase: float = 0.9) -> bool:
        """Type text starting slow and getting faster"""
        words = text.split()
        current_delay = self.base_delay * 3  # Start slow
        
        for word_idx, word in enumerate(words):
            for char in word:
                pyautogui.write(char)
                time.sleep(current_delay)
            
            # Add space after word (except last word)
            if word_idx < len(words) - 1:
                pyautogui.write(' ')
                time.sleep(current_delay)
            
            # Increase speed (decrease delay)
            current_delay *= speed_increase
            current_delay = max(current_delay, self.base_delay * 0.1)  # Minimum delay
        
        return True

class AdvancedFileOperations:
    """Advanced file dialog and save operations"""
    
    def __init__(self, automation: CrossPlatformAutomation):
        self.automation = automation
    
    @safe_automation
    def save_with_retry(self, filepath: str, max_retries: int = 3) -> bool:
        """Save file with multiple retry strategies"""
        for attempt in range(max_retries):
            print(f"💾 Save attempt {attempt + 1}")
            
            # Try primary save method
            if self._try_save_method(filepath, 'save'):
                return True
            
            time.sleep(1)
            
            # Try save-as method
            if self._try_save_method(filepath, 'save_as'):
                return True
            
            time.sleep(1)
        
        print(f"❌ All save attempts failed for {filepath}")
        return False
    
    def _try_save_method(self, filepath: str, method: str) -> bool:
        """Try a specific save method"""
        try:
            # Open save dialog
            if not self.automation.execute_shortcut(method):
                return False
            
            time.sleep(2)  # Wait for dialog
            
            # Clear existing path
            self.automation.execute_shortcut('select_all')
            time.sleep(0.5)
            
            # Type new path character by character for reliability
            for char in filepath:
                pyautogui.write(char)
                time.sleep(0.001)
            
            time.sleep(1)
            
            # Confirm save
            pyautogui.press('enter')
            time.sleep(2)
            
            # Handle potential overwrite dialog
            pyautogui.press('enter')
            time.sleep(0.5)
            
            print(f"✅ Save successful using {method}")
            return True
            
        except Exception as e:
            print(f"❌ Save method {method} failed: {e}")
            return False

# Screen interaction and verification
class ScreenVerification:
    """Verify screen state during automation"""
    
    @staticmethod
    def wait_for_window(title_contains: str, timeout: int = 10) -> bool:
        """Wait for window with specific title to appear"""
        start_time = time.time()
        
        while time.time() - start_time < timeout:
            try:
                # This is a simplified version - in real implementation,
                # you'd use platform-specific window detection
                print(f"🔍 Looking for window containing '{title_contains}'...")
                time.sleep(1)
                
                # Simulate window found after a few seconds
                if time.time() - start_time > 3:
                    print(f"✅ Window found: {title_contains}")
                    return True
                    
            except Exception as e:
                print(f"❌ Error checking for window: {e}")
                
        print(f"⏰ Timeout waiting for window: {title_contains}")
        return False
    
    @staticmethod
    def capture_screenshot(filename: str = None) -> str:
        """Capture screenshot for debugging"""
        if filename is None:
            filename = f"automation_screenshot_{int(time.time())}.png"
        
        try:
            screenshot = pyautogui.screenshot()
            screenshot.save(filename)
            print(f"📸 Screenshot saved: {filename}")
            return filename
        except Exception as e:
            print(f"❌ Failed to capture screenshot: {e}")
            return ""

# Demo usage
print("=== Cross-Platform Automation Demo ===")
automation = CrossPlatformAutomation()
print(f"Platform: {automation.os_type}")
print(f"Available shortcuts: {list(automation.shortcuts.keys())}")

# Test shortcuts (without actually executing them)
print("\n=== Shortcut Testing ===")
test_actions = ['new', 'save', 'select_all']
for action in test_actions:
    if action in automation.shortcuts:
        keys = automation.shortcuts[action]
        print(f"{action}: {'+'.join(keys) if keys else 'Not supported'}")

print("\n=== Typewriter Effects Demo ===")
typewriter = TypewriterEffect(base_delay=0.001)  # Very fast for demo

# Simulate typing without actually typing
print("Demo text that would be typed with effects:")
demo_text = "Hello, this is a test of automated typing!"
print(f"📝 Natural typing: {demo_text}")
print(f"📝 With corrections: {demo_text}")
print(f"📝 Progressive speed: {demo_text}")

print("\n=== File Operations Demo ===")
file_ops = AdvancedFileOperations(automation)
print("💾 Would attempt to save file with retry logic")

print("\n=== Screen Verification Demo ===")
screen_verify = ScreenVerification()
# Simulate window detection
print("🔍 Simulating window detection...")

# Automation workflow example
class AutomationWorkflow:
    """Complete automation workflow orchestrator"""
    
    def __init__(self):
        self.automation = CrossPlatformAutomation()
        self.typewriter = TypewriterEffect()
        self.file_ops = AdvancedFileOperations(self.automation)
        self.screen = ScreenVerification()
    
    def execute_blog_post_workflow(self, post_data: dict, output_path: str) -> bool:
        """Execute complete blog post creation workflow"""
        try:
            print(f"🚀 Starting workflow for post {post_data['id']}")
            
            # 1. Launch text editor
            if not self.automation.launch_text_editor():
                return False
            
            # 2. Create new document
            self.automation.execute_shortcut('new')
            time.sleep(1)
            
            # 3. Type content with natural timing
            content = self._format_blog_content(post_data)
            self.typewriter.type_naturally(content)
            
            # 4. Save document
            if not self.file_ops.save_with_retry(output_path):
                return False
            
            # 5. Close editor
            self.automation.execute_shortcut('close')
            
            print(f"✅ Workflow completed for post {post_data['id']}")
            return True
            
        except Exception as e:
            print(f"❌ Workflow failed: {e}")
            return False
    
    def _format_blog_content(self, post_data: dict) -> str:
        """Format post data as blog content"""
        return f"""BLOG POST #{post_data['id']}

TITLE: {post_data['title'].title()}

CONTENT:
{post_data['body']}

---
Post ID: {post_data['id']}
User ID: {post_data['userId']}
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""

print("\n=== Complete Workflow Demo ===")
workflow = AutomationWorkflow()

# Sample post data
sample_post = {
    'id': 1,
    'title': 'sample blog post',
    'body': 'This is the content of the blog post.',
    'userId': 1
}

print("🔧 Workflow configured and ready")
print("💡 In real execution, this would:")
print("  1. Launch text editor")
print("  2. Create new document")
print("  3. Type formatted content")
print("  4. Save to specified path")
print("  5. Close editor")

# Error handling patterns for automation
class AutomationRecovery:
    """Handle and recover from automation errors"""
    
    def __init__(self, workflow: AutomationWorkflow):
        self.workflow = workflow
        self.recovery_strategies = {
            'save_failed': self._recover_save_failure,
            'app_crash': self._recover_app_crash,
            'dialog_stuck': self._recover_dialog_stuck
        }
    
    def _recover_save_failure(self):
        """Recover from save operation failure"""
        print("🔧 Recovering from save failure...")
        # Try alternative save method
        self.workflow.automation.execute_shortcut('save_as')
        time.sleep(2)
        return True
    
    def _recover_app_crash(self):
        """Recover from application crash"""
        print("🔧 Recovering from app crash...")
        # Relaunch application
        return self.workflow.automation.launch_text_editor()
    
    def _recover_dialog_stuck(self):
        """Recover from stuck dialog"""
        print("🔧 Recovering from stuck dialog...")
        # Press escape to close dialogs
        pyautogui.press('escape')
        time.sleep(1)
        return True

print("\n=== Error Recovery Demo ===")
recovery = AutomationRecovery(workflow)
print("🛠️ Recovery strategies available:")
for strategy in recovery.recovery_strategies:
    print(f"  - {strategy}")

print("\n✅ Desktop automation concepts covered successfully!")


In [None]:
# Advanced Python concepts for sophisticated automation

# 1. DECORATORS - Essential for automation frameworks
from functools import wraps, lru_cache
import time
from typing import Any, Callable, TypeVar, ParamSpec

F = TypeVar('F', bound=Callable[..., Any])
P = ParamSpec('P')
T = TypeVar('T')

# Performance monitoring decorator
def performance_monitor(func: F) -> F:
    """Monitor function execution time and calls"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        end_time = time.perf_counter()
        execution_time = end_time - start_time
        
        print(f"⏱️ {func.__name__} executed in {execution_time:.4f} seconds")
        return result
    return wrapper

# Retry decorator with exponential backoff
def retry_with_backoff(max_retries: int = 3, base_delay: float = 1.0, backoff_factor: float = 2.0):
    """Retry function with exponential backoff"""
    def decorator(func: F) -> F:
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_retries - 1:
                        delay = base_delay * (backoff_factor ** attempt)
                        print(f"🔄 Retry {attempt + 1}/{max_retries} after {delay:.1f}s: {e}")
                        time.sleep(delay)
                    
            print(f"❌ All {max_retries} attempts failed")
            raise last_exception
        return wrapper
    return decorator

# Caching decorator with TTL
def timed_cache(ttl_seconds: int = 300):
    """Cache with time-to-live"""
    def decorator(func: F) -> F:
        cache = {}
        cache_times = {}
        
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Create cache key
            key = str(args) + str(sorted(kwargs.items()))
            current_time = time.time()
            
            # Check if cached and not expired
            if key in cache and (current_time - cache_times[key]) < ttl_seconds:
                print(f"🎯 Cache hit for {func.__name__}")
                return cache[key]
            
            # Execute function and cache result
            result = func(*args, **kwargs)
            cache[key] = result
            cache_times[key] = current_time
            print(f"💾 Cached result for {func.__name__}")
            return result
        return wrapper
    return decorator

# Validation decorator
def validate_types(**type_checks):
    """Validate function argument types"""
    def decorator(func: F) -> F:
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Get function signature
            import inspect
            sig = inspect.signature(func)
            bound_args = sig.bind(*args, **kwargs)
            bound_args.apply_defaults()
            
            # Check types
            for param_name, expected_type in type_checks.items():
                if param_name in bound_args.arguments:
                    value = bound_args.arguments[param_name]
                    if not isinstance(value, expected_type):
                        raise TypeError(f"Parameter '{param_name}' must be {expected_type.__name__}, got {type(value).__name__}")
            
            return func(*args, **kwargs)
        return wrapper
    return decorator

# Class decorator for automation
def automation_component(name: str):
    """Mark class as automation component"""
    def decorator(cls):
        cls._automation_name = name
        cls._is_automation_component = True
        
        # Add logging to all methods
        for attr_name in dir(cls):
            attr = getattr(cls, attr_name)
            if callable(attr) and not attr_name.startswith('_'):
                setattr(cls, attr_name, performance_monitor(attr))
        
        return cls
    return decorator

# Demo decorator usage
@performance_monitor
@retry_with_backoff(max_retries=2, base_delay=0.1)
@timed_cache(ttl_seconds=10)
def fetch_api_data(url: str) -> dict:
    """Simulate API call with decorators"""
    print(f"🌐 Fetching data from {url}")
    import random
    if random.random() < 0.3:  # 30% chance of failure
        raise ConnectionError("Simulated network error")
    return {"data": f"Content from {url}", "timestamp": time.time()}

@validate_types(post_id=int, content=str)
def process_blog_post(post_id: int, content: str) -> bool:
    """Process blog post with type validation"""
    print(f"📝 Processing post {post_id}: {content[:30]}...")
    return True

# 2. GENERATORS - Memory efficient data processing
def blog_post_generator(api_client, batch_size: int = 5):
    """Generate blog posts in batches"""
    offset = 0
    while True:
        batch = api_client.get_posts(limit=batch_size, offset=offset)
        if not batch:
            break
        for post in batch:
            yield post
        offset += batch_size

def file_processor_generator(directory: Path, pattern: str = "*.txt"):
    """Generate file contents one at a time"""
    for file_path in directory.glob(pattern):
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            yield file_path, content
        except Exception as e:
            print(f"❌ Error reading {file_path}: {e}")

def chunked_processor(items, chunk_size: int = 10):
    """Process items in chunks"""
    for i in range(0, len(items), chunk_size):
        chunk = items[i:i + chunk_size]
        yield chunk

# Generator expressions and pipeline
def process_posts_pipeline(posts):
    """Process posts using generator pipeline"""
    # Filter valid posts
    valid_posts = (post for post in posts if 'title' in post and 'body' in post)
    
    # Transform titles
    formatted_posts = (
        {**post, 'title': post['title'].title(), 'word_count': len(post['body'].split())}
        for post in valid_posts
    )
    
    # Filter by word count
    substantial_posts = (post for post in formatted_posts if post['word_count'] > 10)
    
    return substantial_posts

# 3. CONTEXT MANAGERS - Resource management
from contextlib import contextmanager, ExitStack

class AutomationSession:
    """Context manager for automation sessions"""
    
    def __init__(self, session_name: str):
        self.session_name = session_name
        self.start_time = None
        self.resources = []
    
    def __enter__(self):
        self.start_time = time.time()
        print(f"🚀 Starting automation session: {self.session_name}")
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        duration = time.time() - self.start_time
        if exc_type:
            print(f"❌ Session '{self.session_name}' failed after {duration:.2f}s: {exc_val}")
        else:
            print(f"✅ Session '{self.session_name}' completed in {duration:.2f}s")
        
        # Cleanup resources
        for resource in self.resources:
            try:
                resource.close()
            except:
                pass
    
    def add_resource(self, resource):
        """Add resource for automatic cleanup"""
        self.resources.append(resource)

@contextmanager
def temporary_directory():
    """Create and cleanup temporary directory"""
    import tempfile
    import shutil
    
    temp_dir = Path(tempfile.mkdtemp())
    try:
        print(f"📁 Created temp directory: {temp_dir}")
        yield temp_dir
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
        print(f"🗑️ Cleaned up temp directory: {temp_dir}")

@contextmanager
def error_logging(operation_name: str):
    """Context manager for error logging"""
    try:
        print(f"🔄 Starting: {operation_name}")
        yield
        print(f"✅ Completed: {operation_name}")
    except Exception as e:
        print(f"❌ Failed: {operation_name} - {e}")
        raise

# 4. METACLASSES AND DESCRIPTORS
class AutoProperty:
    """Descriptor for automatic property validation"""
    
    def __init__(self, name: str, validator: Callable = None):
        self.name = name
        self.validator = validator
        self.private_name = f'_{name}'
    
    def __get__(self, obj, objtype=None):
        if obj is None:
            return self
        return getattr(obj, self.private_name, None)
    
    def __set__(self, obj, value):
        if self.validator:
            value = self.validator(value)
        setattr(obj, self.private_name, value)
    
    def __delete__(self, obj):
        delattr(obj, self.private_name)

def positive_number(value):
    """Validator for positive numbers"""
    if not isinstance(value, (int, float)) or value <= 0:
        raise ValueError("Must be a positive number")
    return value

def non_empty_string(value):
    """Validator for non-empty strings"""
    if not isinstance(value, str) or not value.strip():
        raise ValueError("Must be a non-empty string")
    return value.strip()

class AutomationConfigMeta(type):
    """Metaclass for automation configuration classes"""
    
    def __new__(mcs, name, bases, namespace):
        # Add automatic __init__ method
        def __init__(self, **kwargs):
            for key, value in kwargs.items():
                if hasattr(self, key):
                    setattr(self, key, value)
        
        namespace['__init__'] = __init__
        
        # Add validation method
        def validate(self):
            errors = []
            for attr_name in dir(self):
                attr = getattr(type(self), attr_name, None)
                if isinstance(attr, AutoProperty):
                    try:
                        value = getattr(self, attr_name)
                        if value is None:
                            errors.append(f"Required property '{attr_name}' is missing")
                    except Exception as e:
                        errors.append(f"Property '{attr_name}' validation failed: {e}")
            
            if errors:
                raise ValueError("Configuration validation failed: " + "; ".join(errors))
            return True
        
        namespace['validate'] = validate
        return super().__new__(mcs, name, bases, namespace)

class BotConfiguration(metaclass=AutomationConfigMeta):
    """Automation bot configuration with automatic validation"""
    
    typing_speed = AutoProperty("typing_speed", positive_number)
    save_timeout = AutoProperty("save_timeout", positive_number)
    output_directory = AutoProperty("output_directory", non_empty_string)
    retry_attempts = AutoProperty("retry_attempts", lambda x: max(1, int(x)))

# 5. ADVANCED FUNCTION FEATURES
def partial_application_demo():
    """Demonstrate partial application and currying"""
    from functools import partial
    
    def save_file(directory: str, filename: str, content: str, encoding: str = 'utf-8'):
        filepath = Path(directory) / filename
        with open(filepath, 'w', encoding=encoding) as f:
            f.write(content)
        return filepath
    
    # Create specialized functions
    save_to_output = partial(save_file, "/tmp/output")
    save_utf8_to_output = partial(save_to_output, encoding='utf-8')
    
    return save_to_output, save_utf8_to_output

# Demo usage
print("=== Decorator Demo ===")
try:
    result = fetch_api_data("https://api.example.com/posts")
    print(f"📄 Result: {result}")
except Exception as e:
    print(f"❌ Final error: {e}")

# Type validation demo
try:
    process_blog_post(123, "Valid content")
    process_blog_post("invalid", "Content")  # This will fail
except TypeError as e:
    print(f"❌ Type validation error: {e}")

print("\n=== Generator Demo ===")
# Simulate posts for generator demo
sample_posts = [
    {"id": 1, "title": "first post", "body": "Short content"},
    {"id": 2, "title": "second post", "body": "This is a much longer post with substantial content"},
    {"id": 3, "title": "third post", "body": "Another post with good length and meaningful content"}
]

processed = list(process_posts_pipeline(sample_posts))
for post in processed:
    print(f"📝 {post['title']}: {post['word_count']} words")

# Chunked processing demo
chunks = list(chunked_processor(range(25), chunk_size=7))
print(f"🔢 Processed {len(chunks)} chunks: {[len(chunk) for chunk in chunks]}")

print("\n=== Context Manager Demo ===")
with AutomationSession("Demo Session") as session:
    with temporary_directory() as temp_dir:
        with error_logging("File Creation"):
            test_file = temp_dir / "test.txt"
            test_file.write_text("Demo content")
            print(f"📄 Created: {test_file}")

print("\n=== Metaclass Demo ===")
try:
    config = BotConfiguration(
        typing_speed=0.01,
        save_timeout=5,
        output_directory="/tmp/output",
        retry_attempts=3
    )
    config.validate()
    print("✅ Configuration valid")
    print(f"Typing speed: {config.typing_speed}")
except ValueError as e:
    print(f"❌ Configuration error: {e}")

# Invalid configuration demo
try:
    bad_config = BotConfiguration(typing_speed=-1)  # Invalid negative speed
    bad_config.validate()
except ValueError as e:
    print(f"❌ Expected validation error: {e}")

print("\n=== Partial Application Demo ===")
save_to_output, save_utf8_to_output = partial_application_demo()
print("🔧 Created specialized save functions")

# Advanced iteration patterns
def advanced_iteration_patterns():
    """Demonstrate advanced iteration techniques"""
    from itertools import chain, groupby, islice, cycle, combinations
    
    # Chain multiple iterables
    list1 = [1, 2, 3]
    list2 = [4, 5, 6]
    list3 = [7, 8, 9]
    chained = list(chain(list1, list2, list3))
    print(f"🔗 Chained: {chained}")
    
    # Group by key
    posts = [
        {"userId": 1, "title": "Post A"},
        {"userId": 1, "title": "Post B"},
        {"userId": 2, "title": "Post C"},
        {"userId": 2, "title": "Post D"},
    ]
    grouped = groupby(posts, key=lambda x: x["userId"])
    for user_id, user_posts in grouped:
        post_titles = [p["title"] for p in user_posts]
        print(f"👤 User {user_id}: {post_titles}")
    
    # Take first N items
    infinite_counter = cycle([1, 2, 3])
    first_10 = list(islice(infinite_counter, 10))
    print(f"🔄 First 10 from cycle: {first_10}")
    
    # Combinations
    items = ['A', 'B', 'C', 'D']
    pairs = list(combinations(items, 2))
    print(f"🔀 Combinations of 2: {pairs}")

print("\n=== Advanced Iteration Demo ===")
advanced_iteration_patterns()

# Memory efficient data processing
def memory_efficient_processing():
    """Demonstrate memory-efficient techniques"""
    
    # Generator expression vs list comprehension
    import sys
    
    # Memory usage comparison (conceptual)
    numbers = range(1000000)
    
    # Generator expression - lazy evaluation
    squares_gen = (x**2 for x in numbers if x % 2 == 0)
    print(f"🧠 Generator created (minimal memory)")
    
    # Process first 5 items only
    first_5_squares = list(islice(squares_gen, 5))
    print(f"🔢 First 5 even squares: {first_5_squares}")
    
    # Demonstrate __slots__ for memory efficiency
    class EfficientPost:
        __slots__ = ['id', 'title', 'body', 'user_id']
        
        def __init__(self, id, title, body, user_id):
            self.id = id
            self.title = title
            self.body = body
            self.user_id = user_id
    
    efficient_post = EfficientPost(1, "Test", "Content", 1)
    print(f"💾 Efficient post created with __slots__")

print("\n=== Memory Efficiency Demo ===")
memory_efficient_processing()

print("\n✅ Advanced Python features demonstration complete!")
