In [1]:
import matplotlib.pyplot as plt
%matplotlib inline
# Adaptive Trade-off Model (ATM) Implementation for Federated Distributed Environment
# Jupyter Notebook Version

import time
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple, Any
import threading
import queue
import logging
import json
from concurrent.futures import ThreadPoolExecutor
from IPython.display import display, clear_output
import matplotlib.pyplot as plt
import seaborn as sns

# Configure logging for Jupyter: INFO level, timestamped records. basicConfig
# is a no-op when the root logger already has handlers (common in notebooks).
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("ATM_Federated")

# Create a handler that will store logs for display
class NotebookLogHandler(logging.Handler):
    """Logging handler that keeps the most recent formatted records in memory.

    Formatted lines accumulate in ``self.logs`` (oldest first) so a notebook
    cell can print them later.

    Args:
        max_logs: Maximum number of formatted records to retain; the oldest
            are discarded first. Values <= 0 retain nothing.
    """

    def __init__(self, max_logs=100):
        super().__init__()
        self.logs = []
        self.max_logs = max_logs

    def emit(self, record):
        """Format ``record``, append it, and trim the buffer to ``max_logs``."""
        try:
            log_entry = self.format(record)
        except Exception:
            # logging.Handler contract: report formatting failures through
            # handleError instead of raising into the code that logged.
            self.handleError(record)
            return
        self.logs.append(log_entry)
        # Trim every excess entry (not just one) so lowering max_logs at
        # runtime takes effect on the next record.
        excess = len(self.logs) - max(self.max_logs, 0)
        if excess > 0:
            del self.logs[:excess]

# Attach exactly one NotebookLogHandler. Re-running this cell redefines the
# class, so stale handlers from earlier runs are matched by class name and
# removed; otherwise every record would be captured once per re-run.
for stale_handler in [h for h in logger.handlers
                      if type(h).__name__ == NotebookLogHandler.__name__]:
    logger.removeHandler(stale_handler)
notebook_handler = NotebookLogHandler()
logger.addHandler(notebook_handler)

In [2]:
# Display function for logs
def display_logs():
    """Replace the cell's output with every buffered log line, oldest first."""
    clear_output(wait=True)
    buffered = notebook_handler.logs
    if buffered:
        # One line per entry, exactly as printing them individually would.
        print(*buffered, sep="\n")

In [3]:
class MetricsCollector:
    """Simulates Prometheus metrics collection in a distributed environment.

    Args:
        node_id: Identifier stamped onto every metrics snapshot.
        rng: Optional random source exposing ``uniform(low, high)`` (e.g.
            ``np.random.default_rng(seed)``) so simulations can be reproduced.
            Defaults to NumPy's global ``np.random`` state, as before.
    """

    def __init__(self, node_id: str, rng=None):
        self.node_id = node_id
        self.metrics = {}
        self._rng = np.random if rng is None else rng

    def collect_node_metrics(self) -> Dict[str, Any]:
        """Draw a fresh simulated metrics snapshot for this node.

        The snapshot is also stored on ``self.metrics``. In a real system this
        would query the Prometheus API.

        Returns:
            Dict with utilisation fractions (cpu, memory, disk I/O), network
            latency in ms, throughput in Mbps, a ``time.time()`` timestamp and
            the node id.
        """
        uniform = self._rng.uniform
        # Draw order is unchanged so the default global-RNG stream matches.
        self.metrics = {
            "cpu_utilization": uniform(0.3, 0.95),  # 30-95%
            "memory_utilization": uniform(0.4, 0.8),  # 40-80%
            "network_latency_ms": uniform(5, 200),  # 5-200ms
            "network_throughput_mbps": uniform(50, 1000),  # 50-1000 Mbps
            "disk_io_utilization": uniform(0.1, 0.7),  # 10-70%
            "timestamp": time.time(),
            "node_id": self.node_id
        }
        return self.metrics

In [4]:
class KafkaProducer:
    """Stand-in for a Kafka producer: echoes each message and counts the sends.

    No broker connection is made; ``broker_address`` is only stored and
    reported.
    """

    def __init__(self, broker_address: str = "kafka:9092"):
        self.broker_address, self.message_count = broker_address, 0
        print(f"Initialized Kafka producer connecting to {broker_address}")

    def send_message(self, topic: str, message: Dict) -> bool:
        """Pretend to publish ``message`` on ``topic``; always reports success.

        A real implementation would hand the payload to a Kafka client
        (e.g. kafka-python).
        """
        print(f"Sending to topic {topic}: {message}")
        self.message_count = self.message_count + 1
        return True

In [5]:
# KafkaProducer is defined once, in cell [4]. This cell previously re-declared
# a byte-identical copy of the class, which silently shadowed the first
# definition (so any later fix made in cell [4] would be undone here).



In [6]:
class KafkaConsumer:
    """Simulates a Kafka consumer for receiving metrics and commands.

    No broker is contacted. Instead, a daemon thread fabricates messages: about
    once per second it enqueues, with 70% probability, a dict shaped like
    ``{"type": "metrics" | "command", "content": {"value": float, "timestamp": float}}``.
    ``poll`` drains everything queued so far.

    Args:
        topics: Topic names; recorded and printed only.
        group_id: Consumer group id; recorded and printed only.
        broker_address: Broker address; recorded only.
    """

    def __init__(self, topics: List[str], group_id: str, broker_address: str = "kafka:9092"):
        self.topics = topics
        self.group_id = group_id
        self.broker_address = broker_address
        self.message_queue = queue.Queue()
        self.running = True
        # close() sets this so the background thread wakes immediately instead
        # of finishing its current one-second sleep.
        self._stop_event = threading.Event()
        print(f"Initialized Kafka consumer for topics {topics}, group {group_id}")

        # Start simulated message consumption thread (daemon, so it never
        # keeps the interpreter alive).
        self.consumer_thread = threading.Thread(target=self._consume_messages, daemon=True)
        self.consumer_thread.start()

    def _consume_messages(self):
        """Background loop that fabricates incoming messages until stopped."""
        while self.running:
            # In a real system, this would poll Kafka topics.
            if self._stop_event.wait(timeout=1.0):
                break  # close() was called during the wait
            # Simulate receiving random messages: 70% chance per one-second tick.
            if np.random.random() < 0.7:
                self.message_queue.put({
                    "type": np.random.choice(["metrics", "command"]),
                    "content": {
                        "value": np.random.random(),
                        "timestamp": time.time()
                    }
                })

    def poll(self, timeout_ms: int = 100) -> List[Dict]:
        """Remove and return every message queued so far, oldest first.

        Never blocks. ``timeout_ms`` is accepted for API compatibility with
        real Kafka clients but is not used.
        """
        messages = []
        # Draining with get_nowait() until Empty needs no separate empty() check.
        while True:
            try:
                messages.append(self.message_queue.get_nowait())
            except queue.Empty:
                return messages

    def close(self):
        """Stop the background thread, waiting up to 2 s for it to exit.

        Safe to call more than once.
        """
        self.running = False
        self._stop_event.set()
        if self.consumer_thread.is_alive():
            self.consumer_thread.join(timeout=2.0)


In [7]:
class ReinforcementLearningEngine:
    """Tabular Q-learning engine that chooses a workload-distribution strategy.

    Node metrics are bucketed into a discrete state key (``_get_state_key``).
    The engine keeps one Q-value per (state, action) pair. It picks actions
    epsilon-greedily, and the exploration rate decays after every decision.

    Args:
        rng: Optional random source exposing ``random()`` and ``choice(seq)``
            (e.g. ``np.random.default_rng(seed)``) for reproducible runs.
            Defaults to NumPy's global ``np.random`` state.
    """

    def __init__(self, rng=None):
        # Initialize Q-learning parameters
        self.states = []  # NOTE(review): not read or written by any method here
        self.actions = ["balanced", "distribution_heavy", "local_heavy"]
        self.q_table = {}  # state key -> {action: Q-value}
        self.learning_rate = 0.1      # alpha
        self.discount_factor = 0.9    # gamma
        self.exploration_rate = 0.2   # epsilon; multiplied by exploration_decay per decision
        self.exploration_decay = 0.995
        self._rng = np.random if rng is None else rng

    def _get_state_key(self, metrics: Dict[str, Any]) -> str:
        """Bucket ``metrics`` into a ``"cpu_mem_latency_throughput"`` state key.

        CPU and memory utilisation use 0.1-wide buckets, latency 20 ms buckets,
        and throughput 100 Mbps buckets. Every bucket index is capped at 9, so
        out-of-range readings (e.g. 100% CPU) share the top bucket instead of
        creating extra states.
        """
        cpu_bin = min(9, int(metrics["cpu_utilization"] * 10))
        mem_bin = min(9, int(metrics["memory_utilization"] * 10))
        latency_bin = min(9, int(metrics["network_latency_ms"] / 20))
        throughput_bin = min(9, int(metrics["network_throughput_mbps"] / 100))

        return f"{cpu_bin}_{mem_bin}_{latency_bin}_{throughput_bin}"

    def decide_strategy(self, metrics: Dict[str, Any]) -> str:
        """Pick an action for the state described by ``metrics`` (epsilon-greedy).

        Unseen states are added with all-zero Q-values. Ties go to the earliest
        action in ``self.actions``. The exploration rate decays after every call.
        """
        state_key = self._get_state_key(metrics)

        # Add state to Q-table if not seen before
        if state_key not in self.q_table:
            self.q_table[state_key] = {action: 0.0 for action in self.actions}

        if self._rng.random() < self.exploration_rate:
            # Explore. str() turns NumPy's np.str_ into a plain str so the
            # "-> str" contract holds and printed dicts show the bare name.
            chosen_action = str(self._rng.choice(self.actions))
            print(f"Exploring with action: {chosen_action}")
        else:
            # Exploit: best known action for this state.
            chosen_action = max(self.q_table[state_key], key=self.q_table[state_key].get)
            print(f"Exploiting with action: {chosen_action}")

        # Decay exploration rate
        self.exploration_rate *= self.exploration_decay

        return chosen_action

    def update_q_table(self, prev_state: str, action: str, reward: float, new_state: str) -> None:
        """Apply one Q-learning update.

        The update rule is:
        Q(s,a) <- Q(s,a) + alpha * [r + gamma * max_a' Q(s',a') - Q(s,a)]

        Unseen states (previous or new) are added with all-zero Q-values;
        previously an unseen ``prev_state`` raised KeyError. An ``action``
        missing from the state's row still raises KeyError.
        """
        next_row = self.q_table.setdefault(new_state, {act: 0.0 for act in self.actions})
        prev_row = self.q_table.setdefault(prev_state, {act: 0.0 for act in self.actions})

        old_value = prev_row[action]
        next_max = max(next_row.values())

        new_value = old_value + self.learning_rate * (
            reward + self.discount_factor * next_max - old_value
        )

        prev_row[action] = new_value
        print(f"Updated Q-value for {prev_state}, {action}: {old_value} -> {new_value}")


In [8]:
def update_q_table(self, prev_state: str, action: str, reward: float, new_state: str) -> None:
    """Module-level alias for ``ReinforcementLearningEngine.update_q_table``.

    This cell used to repeat the method body verbatim. It now delegates, so the
    Q-learning update rule lives in exactly one place. ``self`` should be a
    ReinforcementLearningEngine instance.
    """
    ReinforcementLearningEngine.update_q_table(self, prev_state, action, reward, new_state)

In [9]:
class ExecutionController:
    """Holds a node's current execution mode and its parameter preset.

    Three presets are defined: "balanced", "distribution_heavy" and
    "local_heavy". Switching modes is simulated: the mode is recorded and its
    parameters are returned.
    """

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.current_mode = "balanced"  # Default mode
        self.mode_params = {
            "balanced": {
                "local_compute_ratio": 0.5,
                "network_batch_size": 1000,
                "consistency_level": "strong"
            },
            "distribution_heavy": {
                "local_compute_ratio": 0.2,
                "network_batch_size": 5000,
                "consistency_level": "eventual"
            },
            "local_heavy": {
                "local_compute_ratio": 0.8,
                "network_batch_size": 100,
                "consistency_level": "strong"
            }
        }

    def apply_strategy(self, mode: str) -> Dict[str, Any]:
        """Switch to ``mode`` and return a copy of its parameters.

        An unknown mode is reported and ignored: the current mode is kept, and
        a copy of the current mode's parameters is returned. Previously this
        case returned ``None``, contradicting the annotation. Copies are
        returned so callers cannot mutate the stored presets.
        """
        if mode not in self.mode_params:
            print(f"Unknown mode: {mode}, keeping current mode")
            return dict(self.mode_params[self.current_mode])

        self.current_mode = mode
        params = self.mode_params[mode]

        print(f"Node {self.node_id} switching to {mode} mode with params: {params}")

        # Here we would actually adjust system parameters; the simulation just
        # reports them.
        return dict(params)

    def get_current_config(self) -> Dict[str, Any]:
        """Return the current mode, a copy of its parameters, and the node id."""
        return {
            "mode": self.current_mode,
            "params": dict(self.mode_params[self.current_mode]),
            "node_id": self.node_id
        }

In [10]:
class FeedbackCollector:
    """Collects performance metrics after a strategy change and scores them.

    Args:
        node_id: Identifier stamped onto every snapshot.
        rng: Optional random source exposing ``uniform(low, high)`` (e.g.
            ``np.random.default_rng(seed)``); defaults to NumPy's global state.
    """

    def __init__(self, node_id: str, rng=None):
        self.node_id = node_id
        self.baseline_metrics = {}
        self.current_metrics = {}
        self.performance_history = []
        self._rng = np.random if rng is None else rng

    def set_baseline(self, metrics: Dict[str, Any]) -> None:
        """Store a shallow copy of ``metrics`` as the reference for rewards.

        ``calculate_reward`` reads ``response_time_ms`` from it, so pass a
        snapshot from ``collect_current_metrics``.
        """
        self.baseline_metrics = metrics.copy()

    def collect_current_metrics(self) -> Dict[str, Any]:
        """Draw a simulated performance snapshot and store it as current.

        Returns:
            Dict with ``response_time_ms``, ``throughput_qps``, ``error_rate``,
            a ``time.time()`` timestamp and the node id.
        """
        uniform = self._rng.uniform
        # In a real system this would query the monitoring stack.
        self.current_metrics = {
            "response_time_ms": uniform(10, 500),
            "throughput_qps": uniform(100, 10000),
            "error_rate": uniform(0, 0.05),
            "timestamp": time.time(),
            "node_id": self.node_id
        }
        return self.current_metrics

    def calculate_reward(self) -> float:
        """Score the response-time change from the baseline to the current snapshot.

        The reward is ``10 * (baseline_rt - current_rt) / baseline_rt``,
        clipped to [-5, 5]; positive means faster. Each scored call appends a
        record to ``performance_history``.

        Returns 0.0 without recording anything in these cases:
        - there is no baseline;
        - the baseline lacks a positive ``response_time_ms``;
        - no current ``response_time_ms`` is available.
        """
        if not self.baseline_metrics:
            return 0.0

        baseline_rt = self.baseline_metrics.get("response_time_ms", 0)
        if baseline_rt <= 0:
            return 0.0
        if "response_time_ms" not in self.current_metrics:
            # Previously defaulted to 0 ms, i.e. a maximal +5 reward for a
            # measurement that was never taken.
            return 0.0
        current_rt = self.current_metrics["response_time_ms"]

        # Improvement ratio: lower response time is better
        improvement = (baseline_rt - current_rt) / baseline_rt
        reward = min(5.0, max(-5.0, improvement * 10))

        self.performance_history.append({
            "timestamp": time.time(),
            "baseline_rt": baseline_rt,
            "current_rt": current_rt,
            "improvement": improvement,
            "reward": reward
        })

        return reward
    
    


In [11]:
class FederatedNode:
    """Represents a node in the federated distributed system.

    A node created without ``coordinator_address`` is the coordinator. It owns
    the Q-learning decision engine, publishes strategy changes to
    ``atm_commands``, and updates its Q-table from response-time feedback.
    Any other node is a worker: it publishes its metrics to ``atm_metrics``
    and applies ``strategy_change`` commands found in its consumer's messages.

    Args:
        node_id: Identifier for this node.
        coordinator_address: ``None`` makes this node the coordinator;
            otherwise it is only recorded.
        coordinator_interval_s: Seconds the coordinator lets a strategy run
            before measuring its effect (previously hard-coded to 10).
        worker_interval_s: Seconds between worker cycles (previously 5).
    """

    def __init__(self, node_id: str, coordinator_address: str = None,
                 coordinator_interval_s: float = 10.0,
                 worker_interval_s: float = 5.0):
        self.node_id = node_id
        self.coordinator_address = coordinator_address
        self.is_coordinator = coordinator_address is None
        self.coordinator_interval_s = coordinator_interval_s
        self.worker_interval_s = worker_interval_s

        # Initialize components
        self.metrics_collector = MetricsCollector(node_id)
        self.kafka_producer = KafkaProducer()
        self.kafka_consumer = KafkaConsumer(
            topics=["atm_commands", "atm_metrics"],
            group_id=f"node_{node_id}"
        )
        self.execution_controller = ExecutionController(node_id)
        self.feedback_collector = FeedbackCollector(node_id)

        # Only coordinator nodes have decision engine
        self.decision_engine = ReinforcementLearningEngine() if self.is_coordinator else None

        # State tracking
        self.prev_state = None
        self.current_action = "balanced"
        self.running = True
        # Set by shutdown(). The loops wait on it between cycles, so a shutdown
        # request takes effect immediately instead of after a full interval.
        self._stop_event = threading.Event()

        # Performance tracking
        self.strategy_history = []
        self.metrics_history = []

        print(f"Initialized {'coordinator' if self.is_coordinator else 'worker'} node {node_id}")

    def start(self):
        """Run this node's loop in the calling thread until it is shut down.

        ``KeyboardInterrupt`` is caught and reported; ``shutdown`` always runs
        on exit.
        """
        print(f"Starting node {self.node_id}")

        try:
            if self.is_coordinator:
                self._run_coordinator()
            else:
                self._run_worker()
        except KeyboardInterrupt:
            print("Shutdown signal received")
        finally:
            self.shutdown()

    def _wait(self, seconds: float) -> bool:
        """Pause between cycles; return True if the node should keep running.

        Returns False as soon as ``shutdown`` is called, even mid-wait.
        """
        stopped = self._stop_event.wait(timeout=seconds)
        return self.running and not stopped

    def _run_coordinator(self):
        """Coordinator loop: decide, broadcast, apply, wait, then learn from feedback."""
        print(f"Running as coordinator node {self.node_id}")

        while self.running:
            # 1. Collect local metrics
            local_metrics = self.metrics_collector.collect_node_metrics()
            self.metrics_history.append(local_metrics)

            # 2. Collect metrics from all worker nodes (simulated aggregation)
            cluster_metrics = self._aggregate_cluster_metrics()

            # 3. Make optimization decision
            if cluster_metrics:
                state_key = self.decision_engine._get_state_key(local_metrics)
                new_action = self.decision_engine.decide_strategy(local_metrics)

                # Track strategy changes
                self.strategy_history.append({
                    "timestamp": time.time(),
                    "strategy": new_action,
                    "state": state_key
                })

                # 4. Record the performance baseline before the new strategy
                # takes effect. calculate_reward() compares response_time_ms,
                # which only the feedback collector's snapshot contains. The
                # node metrics used here previously lack it, so every reward
                # was 0 and the Q-table never learned.
                self.feedback_collector.set_baseline(
                    self.feedback_collector.collect_current_metrics()
                )

                # 5. Distribute decision to all nodes
                command = {
                    "type": "strategy_change",
                    "mode": new_action,
                    "timestamp": time.time(),
                    "source_node": self.node_id
                }
                self.kafka_producer.send_message("atm_commands", command)

                # 6. Apply strategy locally too
                self.execution_controller.apply_strategy(new_action)

                # 7. Remember state/action for the Q-update below
                self.prev_state = state_key
                self.current_action = new_action

            # 8. Let the strategy run for one feedback cycle (interruptible)
            if not self._wait(self.coordinator_interval_s):
                break

            # 9. Collect feedback and update model
            self.feedback_collector.collect_current_metrics()
            reward = self.feedback_collector.calculate_reward()

            if self.prev_state:
                new_state = self.decision_engine._get_state_key(
                    self.metrics_collector.collect_node_metrics()
                )
                self.decision_engine.update_q_table(
                    self.prev_state, self.current_action, reward, new_state
                )
                print(f"Updated model with reward {reward:.2f} for action {self.current_action}")

    def _run_worker(self):
        """Worker loop: publish metrics, apply coordinator commands, wait."""
        print(f"Running as worker node {self.node_id}")

        while self.running:
            # 1. Collect and publish metrics
            metrics = self.metrics_collector.collect_node_metrics()
            self.metrics_history.append(metrics)
            self.kafka_producer.send_message("atm_metrics", metrics)

            # 2. Check for commands from coordinator.
            # NOTE(review): the simulated KafkaConsumer's "command" messages
            # carry no "type" key in their content, so this branch does not
            # fire in simulation.
            messages = self.kafka_consumer.poll()
            for message in messages:
                if message.get("type") == "command" and "content" in message:
                    command = message["content"]
                    if command.get("type") == "strategy_change":
                        # Apply requested strategy
                        mode = command.get("mode", "balanced")
                        self.execution_controller.apply_strategy(mode)

                        # Track strategy changes
                        self.strategy_history.append({
                            "timestamp": time.time(),
                            "strategy": mode,
                            "source": "coordinator"
                        })

                        print(f"Applied strategy {mode} based on coordinator command")

            # 3. Collect feedback for local performance monitoring
            self.feedback_collector.collect_current_metrics()

            # 4. Wait before next cycle (interruptible)
            if not self._wait(self.worker_interval_s):
                break

    def _aggregate_cluster_metrics(self) -> Dict[str, List[Dict[str, Any]]]:
        """Return simulated per-node metrics for 5 cluster nodes.

        In a real system this would aggregate metrics consumed from Kafka.
        """
        return {
            "nodes": [
                {"node_id": f"node_{i}",
                 "cpu": np.random.uniform(0.3, 0.9),
                 "memory": np.random.uniform(0.4, 0.8),
                 "network_latency": np.random.uniform(5, 200)
                } for i in range(1, 6)  # Simulate 5 nodes
            ]
        }

    def shutdown(self):
        """Stop the node's loop and close its Kafka consumer.

        Safe to call more than once.
        """
        print(f"Shutting down node {self.node_id}")
        self.running = False
        self._stop_event.set()  # wake a loop that is waiting between cycles
        self.kafka_consumer.close()
        # Additional cleanup as needed

In [12]:
class ATMCluster:
    """Manages a cluster of ATM nodes: one coordinator plus ``num_nodes - 1`` workers.

    Each node's ``start`` loop runs on its own thread of a ThreadPoolExecutor
    that lives until ``shutdown_cluster``.
    """

    def __init__(self, num_nodes: int = 5):
        self.num_nodes = num_nodes
        self.nodes = []
        self._executor = None
        self._futures = []

    def start_cluster(self, wait: bool = False):
        """Create the nodes and launch each node's loop on a background thread.

        Node loops run until shut down. The original version waited on them
        inside a ``with ThreadPoolExecutor`` block, so it never returned, and a
        KeyboardInterrupt deadlocked in the executor's exit. This method now
        returns immediately by default so the caller can later call
        ``shutdown_cluster``.

        Args:
            wait: If True, block until every node loop finishes, re-raising the
                first node exception (the old behaviour, minus the deadlock).
        """
        print(f"Starting ATM cluster with {self.num_nodes} nodes")

        # Create coordinator node
        coordinator = FederatedNode(node_id="coordinator")
        self.nodes.append(coordinator)

        # Create worker nodes
        for i in range(1, self.num_nodes):
            worker = FederatedNode(
                node_id=f"worker_{i}",
                coordinator_address="coordinator:8080"  # In real system, use actual address
            )
            self.nodes.append(worker)

        # Size the pool from the nodes actually created: the coordinator always
        # exists, so num_nodes <= 0 would otherwise mean max_workers=0 (ValueError).
        self._executor = ThreadPoolExecutor(max_workers=len(self.nodes),
                                            thread_name_prefix="atm-node")
        self._futures = [self._executor.submit(node.start) for node in self.nodes]

        if wait:
            for future in self._futures:
                future.result()

    def shutdown_cluster(self):
        """Signal every node to stop, then wait for their threads to finish.

        Exceptions raised by node threads are reported. Safe to call before
        ``start_cluster`` or more than once.
        """
        print("Shutting down ATM cluster")
        for node in self.nodes:
            node.shutdown()

        executor, self._executor = self._executor, None
        if executor is None:
            return
        # Node loops exit once they observe running == False; this may take up
        # to one node cycle if a node's wait between cycles is not interruptible.
        executor.shutdown(wait=True)
        for node, future in zip(self.nodes, self._futures):
            error = future.exception()
            if error is not None:
                print(f"Node {node.node_id} stopped with error: {error!r}")
        self._futures = []
    
   

In [None]:
def main(num_nodes: int = 5, run_seconds: float = 60.0):
    """Run the ATM cluster for ``run_seconds`` seconds, then shut it down.

    Args:
        num_nodes: Total nodes (one coordinator plus ``num_nodes - 1`` workers).
        run_seconds: How long to let the cluster run before shutting it down.
    """
    print("Starting Adaptive Trade-off Model (ATM) in federated mode")

    # Create and start the cluster
    cluster = ATMCluster(num_nodes=num_nodes)

    try:
        cluster.start_cluster()
        # Only reached once start_cluster() returns with the nodes running in
        # the background.
        time.sleep(run_seconds)
    except KeyboardInterrupt:
        print("Shutdown signal received")
    finally:
        cluster.shutdown_cluster()
        print("ATM system shutdown complete")

if __name__ == "__main__":
    main()

Starting Adaptive Trade-off Model (ATM) in federated mode
Starting ATM cluster with 5 nodes
Initialized Kafka producer connecting to kafka:9092
Initialized Kafka consumer for topics ['atm_commands', 'atm_metrics'], group node_coordinator
Initialized coordinator node coordinator
Initialized Kafka producer connecting to kafka:9092
Initialized Kafka consumer for topics ['atm_commands', 'atm_metrics'], group node_worker_1
Initialized worker node worker_1
Initialized Kafka producer connecting to kafka:9092
Initialized Kafka consumer for topics ['atm_commands', 'atm_metrics'], group node_worker_2
Initialized worker node worker_2
Initialized Kafka producer connecting to kafka:9092
Initialized Kafka consumer for topics ['atm_commands', 'atm_metrics'], group node_worker_3
Initialized worker node worker_3
Initialized Kafka producer connecting to kafka:9092
Initialized Kafka consumer for topics ['atm_commands', 'atm_metrics'], group node_worker_4
Initialized worker node worker_4
Starting node coo