Used for testing both HiScroll 12 pumps inside the pump locker.

# Dual Vacuum Pump Control System

This notebook provides control and monitoring capabilities for two vacuum pumps with pressure sensors.

## Features:
- Control for 2 HiScroll12 vacuum pumps
- Monitoring for 2 TPG366 pressure sensors
- Comprehensive logging using loguru
- Cancelable continuous monitoring
- Real-time data visualization

## 1. Import Required Libraries and Setup

In [1]:
import sys
import threading
import time
from datetime import datetime
import asyncio
from typing import Dict, Optional, Any
import pandas as pd
import matplotlib.pyplot as plt
from IPython.display import display, clear_output
import ipywidgets as widgets


from loguru import logger
import os

# Add path to custom modules
sys.path.append(r'C:\Users\ESIBDlab\PycharmProjects\esibd_pfeiffervacuum')

# Import pump and sensor modules
from scrollpump import HiScroll12
from pressure_readout import TPG366

## 2. Set Up Logging with Loguru

In [2]:
# Get the current working directory (repository root)
repo_root = os.getcwd()

# Create logs directory within the repository if it doesn't exist
log_dir = os.path.join(repo_root, "logs")
if not os.path.exists(log_dir):
    os.makedirs(log_dir)
    print(f"Created logs directory: {log_dir}")

# Configure logger with rotation and retention
log_file = os.path.join(log_dir, "pump_control_{time:YYYY-MM-DD}.log")

# Remove default logger
logger.remove()

# Add console logger with INFO level
logger.add(sys.stderr, level="INFO", format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}")

# Add file logger with DEBUG level
logger.add(
    log_file,
    level="DEBUG",
    format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {name}:{function}:{line} | {message}",
    rotation="1 day",
    retention="30 days",
    compression="zip"
)

logger.info("Dual pump control system initialized")
print(f"Repository root: {repo_root}")
print(f"Logs will be saved to: {os.path.abspath(log_file)}")

2025-07-14 10:49:02 | INFO | Dual pump control system initialized


Repository root: C:\Users\ESIBDlab\PycharmProjects\pump_locker_test
Logs will be saved to: C:\Users\ESIBDlab\PycharmProjects\pump_locker_test\logs\pump_control_{time:YYYY-MM-DD}.log


## 3. Pump Control Class Definition

In [3]:
class DualPumpController:
    """Controller class for managing two vacuum pumps and their pressure sensors"""
    
    def __init__(self):
        self.pump1: Optional[HiScroll12] = None
        self.pump2: Optional[HiScroll12] = None
        self.pressure_controller: Optional[TPG366] = None  # Single controller for both sensors
        
        # Monitoring control
        self.monitoring_active = False
        self.monitoring_thread = None
        self.monitoring_stop_event = threading.Event()
        
        # Data storage
        self.monitoring_data = []
        
        logger.info("DualPumpController initialized")
    
    def connect_pumps(self, pump1_com: str, pump1_addr: int, pump2_com: str, pump2_addr: int):
        """Connect to both vacuum pumps"""
        try:
            logger.info(f"Connecting to Pump 1 on {pump1_com}, address {pump1_addr}")
            self.pump1 = HiScroll12(pump1_com, pump1_addr)
            
            logger.info(f"Connecting to Pump 2 on {pump2_com}, address {pump2_addr}")
            self.pump2 = HiScroll12(pump2_com, pump2_addr)
            
            logger.success("Both pumps connected successfully")
            return True
            
        except Exception as e:
            logger.error(f"Failed to connect pumps: {e}")
            return False
    
    def connect_pressure_controller(self, com_port: str, address: int):
        """Connect to the pressure readout controller (handles both sensors)"""
        try:
            logger.info(f"Connecting to pressure controller on {com_port}, address {address}")
            self.pressure_controller = TPG366(com_port, address)
            
            logger.success("Pressure controller connected successfully")
            return True
            
        except Exception as e:
            logger.error(f"Failed to connect pressure controller: {e}")
            return False
    
    def disconnect_all(self):
        """Safely disconnect all devices"""
        logger.info("Disconnecting all devices...")
        
        # Stop monitoring first
        self.stop_monitoring()
        
        # Disconnect pumps
        if self.pump1:
            try:
                self.pump1.close()
                logger.info("Pump 1 disconnected")
            except Exception as e:
                logger.error(f"Error disconnecting Pump 1: {e}")
        
        if self.pump2:
            try:
                self.pump2.close()
                logger.info("Pump 2 disconnected")
            except Exception as e:
                logger.error(f"Error disconnecting Pump 2: {e}")
        
        # Disconnect pressure controller
        if self.pressure_controller:
            try:
                self.pressure_controller.close()
                logger.info("Pressure controller disconnected")
            except Exception as e:
                logger.error(f"Error disconnecting pressure controller: {e}")
        
        logger.success("All devices disconnected")

# Create global controller instance
controller = DualPumpController()

2025-07-14 10:49:04 | INFO | DualPumpController initialized
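
The connection logic can be dry-run without hardware. Note that `connect_pumps` returns `False` if either pump fails, even though Pump 1 may already be connected at that point. The sketch below connects each device independently instead. `FakePump` and `connect_devices` are hypothetical names introduced here for illustration; they are not part of `scrollpump` or of this notebook.

```python
from typing import Callable, Dict, Optional, Tuple


class FakePump:
    """Hypothetical stand-in for a pump driver (assumption, not the real HiScroll12)."""

    def __init__(self, com_port: str, address: int):
        # Simulate a failed serial connection for anything that is not a COM port
        if not com_port.startswith("COM"):
            raise ValueError(f"unexpected port name: {com_port}")
        self.com_port = com_port
        self.address = address
        self.closed = False

    def close(self) -> None:
        self.closed = True


def connect_devices(
    factory: Callable[[str, int], object],
    ports: Dict[str, Tuple[str, int]],
) -> Dict[str, Optional[object]]:
    """Connect each device on its own so one failure does not hide the others."""
    devices: Dict[str, Optional[object]] = {}
    for name, (com_port, address) in ports.items():
        try:
            devices[name] = factory(com_port, address)
        except Exception as exc:
            print(f"Failed to connect {name}: {exc}")
            devices[name] = None
    return devices


# The second port name is deliberately invalid to show the per-device result
devices = connect_devices(FakePump, {"pump1": ("COM19", 2), "pump2": ("bad", 2)})
```

With the real driver, the same function could presumably be called as `connect_devices(HiScroll12, {...})`, since `HiScroll12` is constructed with a COM port and an address in this notebook.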


## 4. Pump Monitoring Functions

In [5]:
def monitor_single_pump(pump: HiScroll12, pump_id: str) -> Dict[str, Any]:
    """
    Monitor a single pump and return its status parameters.
    
    Parameters:
    - pump: HiScroll12 pump instance
    - pump_id: Identifier string for the pump
    
    Returns:
    - Dictionary containing pump parameters
    
    Currently read from the pump:
    - Rotation speed (RPM)
    - Enable status
    - Drive power
    - Motor, electronics, and power-stage temperatures

    TODO: not read yet:
    - Error codes
    - Operating hours
    """
    
    try:
        logger.debug(f"Monitoring {pump_id}...")
        
        # Default record; fields left as None have no getter wired up yet
        pump_data = {
            'timestamp': datetime.now(),
            'pump_id': pump_id,
            'temp_Motor': None,  # Filled below from pump.get_TempMotor()
            'temp_Elec': None,  # Filled below from pump.get_TempElec()
            'temp_PwrStg': None,  # Filled below from pump.get_TempPwrStg()
            'rotation_speed_rpm': None,  # Filled below from pump.get_actRotSpd_rpm()
            'enable_status': None,  # Filled below from pump.get_Pmp_Enable()
            'error_codes': None,  # TODO: pump.get_error_codes()
            'operating_hours': None,  # TODO: pump.get_operating_hours()
            'power_consumption': None,  # Filled below from pump.get_drvPwr()
            'status': 'OK'  # An ERROR record is returned instead if any read fails
        }
        
        # Read live values from the pump (numeric replies are converted to int)
        pump_data['rotation_speed_rpm'] = int(pump.get_actRotSpd_rpm())
        pump_data['enable_status'] = pump.get_Pmp_Enable()
        pump_data['power_consumption'] = int(pump.get_drvPwr())
        pump_data['temp_Motor'] = int(pump.get_TempMotor())
        pump_data['temp_Elec'] = int(pump.get_TempElec())
        pump_data['temp_PwrStg'] = int(pump.get_TempPwrStg())

        logger.info(f"{pump_id} RotSpeed    = {pump_data['rotation_speed_rpm']} // rpm")
        logger.info(f"{pump_id} Enable_Stat = {pump_data['enable_status']}")
        logger.info(f"{pump_id} Power       = {pump_data['power_consumption']} // W")
        logger.info(f"{pump_id} Temp_Motor  = {pump_data['temp_Motor']} // degC")
        logger.info(f"{pump_id} Temp_Elec   = {pump_data['temp_Elec']} // degC")
        logger.info(f"{pump_id} Temp_PwrStg = {pump_data['temp_PwrStg']} // degC")
        logger.debug(f"{pump_id} monitoring completed successfully")
        return pump_data
        
    except Exception as e:
        logger.error(f"Error monitoring {pump_id}: {e}")
        return {
            'timestamp': datetime.now(),
            'pump_id': pump_id,
            'error': str(e),
            'status': 'ERROR'
        }

def monitor_pressure(pressure_controller: TPG366, sensor_id: str, channel: int) -> Dict[str, Any]:
    """
    Monitor pressure from a specific sensor channel.
    
    Parameters:
    - pressure_controller: TPG366 pressure controller instance
    - sensor_id: Identifier string for the sensor (e.g., "Sensor_1", "Sensor_2")
    - channel: Sensor channel to read from (1 or 2)
    
    Returns:
    - Dictionary containing pressure data with float pressure value
    """
    
    try:
        logger.debug(f"Reading pressure from {sensor_id}, channel {channel}...")
        
        # Get actual pressure reading (returns float)
        pressure_value = pressure_controller.measure_p(channel)
        
        # Get sensor name for this channel
        sensor_name = pressure_controller.get_sens_name(channel)
        
        # Create data structure with actual values
        pressure_data = {
            'timestamp': datetime.now(),
            'sensor_id': sensor_id,
            'channel': channel,
            'pressure_value': pressure_value,  # Float value from measure_p()
            'sensor_name': sensor_name,  # Sensor name from get_sens_name()
            'firmware_version': None,  # Will be set once during connection
            'status': 'OK'
        }
        
        logger.debug(f"{sensor_id} pressure reading: {pressure_value} (type: {type(pressure_value).__name__})")
        return pressure_data
        
    except Exception as e:
        logger.error(f"Error reading pressure from {sensor_id}: {e}")
        return {
            'timestamp': datetime.now(),
            'sensor_id': sensor_id,
            'channel': channel,
            'pressure_value': None,
            'error': str(e),
            'status': 'ERROR'
        }

print("Monitoring functions defined successfully")

Monitoring functions defined successfully
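
In `monitor_single_pump`, a single reply that `int()` cannot parse raises an exception and turns the whole reading into an ERROR record. A more forgiving conversion is sketched below. `parse_int_reply` is a hypothetical helper, not part of the pump driver; it assumes the getters return numeric strings such as the `000000` shown in the Pump 2 test output later in this notebook.

```python
from typing import Optional


def parse_int_reply(reply: object) -> Optional[int]:
    """Convert a raw reply such as '000000' or ' 1500 ' to int.

    Returns None instead of raising when the reply is not an integer string,
    so one bad field does not discard the other fields of the reading.
    """
    try:
        return int(str(reply).strip())
    except ValueError:
        return None


speed = parse_int_reply("000000")      # zero-padded reply
padded = parse_int_reply(" 1500 ")     # surrounding whitespace is ignored
garbled = parse_int_reply("abc")       # not numeric, so None
```

Inside the monitoring function this would replace calls like `int(pump.get_actRotSpd_rpm())` with `parse_int_reply(pump.get_actRotSpd_rpm())`.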


## 5. Continuous Monitoring System

In [6]:
def continuous_monitoring_worker(controller: DualPumpController, interval: float = 5.0):
    """
    Worker function for continuous monitoring that runs in a separate thread.
    This function can be cancelled without shutting down the kernel.
    """
    logger.info(f"Starting continuous monitoring with {interval}s interval")
    
    while not controller.monitoring_stop_event.is_set():
        try:
            monitoring_cycle_data = {
                'cycle_timestamp': datetime.now(),
                'pump1_data': None,
                'pump2_data': None,
                'sensor1_data': None,
                'sensor2_data': None
            }
            
            # Monitor pumps if connected
            if controller.pump1:
                monitoring_cycle_data['pump1_data'] = monitor_single_pump(controller.pump1, "Pump_1")
            
            if controller.pump2:
                monitoring_cycle_data['pump2_data'] = monitor_single_pump(controller.pump2, "Pump_2")
            
            # Monitor pressure sensors if controller is connected
            if controller.pressure_controller:
                # Read from sensor 1 (channel 1)
                monitoring_cycle_data['sensor1_data'] = monitor_pressure(
                    controller.pressure_controller, "Sensor_1", channel=1
                )
                
                # Read from sensor 2 (channel 2) 
                monitoring_cycle_data['sensor2_data'] = monitor_pressure(
                    controller.pressure_controller, "Sensor_2", channel=2
                )
            
            # Store data
            controller.monitoring_data.append(monitoring_cycle_data)
            
            # Log summary with pressure values if available
            log_msg = f"Monitoring cycle completed at {monitoring_cycle_data['cycle_timestamp']}"
            if monitoring_cycle_data['sensor1_data'] and monitoring_cycle_data['sensor2_data']:
                p1 = monitoring_cycle_data['sensor1_data'].get('pressure_value')
                p2 = monitoring_cycle_data['sensor2_data'].get('pressure_value')
                log_msg += f" | Pressures: S1={p1}, S2={p2}"
            logger.info(log_msg)
            
            # Wait for next cycle or stop signal
            if controller.monitoring_stop_event.wait(timeout=interval):
                break  # Stop event was set
                
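        except Exception as e:
            logger.error(f"Error in monitoring cycle: {e}")
            # Continue monitoring even if one cycle fails, but wait on the
            # stop event (not time.sleep) so a stop request is still honored promptly
            if controller.monitoring_stop_event.wait(timeout=interval):
                break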
    
    logger.info("Continuous monitoring stopped")

def start_monitoring(controller: DualPumpController, interval: float = 5.0):
    """
    Start continuous monitoring in a separate thread.
    
    Parameters:
    - controller: DualPumpController instance
    - interval: Monitoring interval in seconds
    """
    if controller.monitoring_active:
        logger.warning("Monitoring is already active")
        return False
    
    # Reset stop event and clear old data
    controller.monitoring_stop_event.clear()
    controller.monitoring_data.clear()
    
    # Start monitoring thread
    controller.monitoring_thread = threading.Thread(
        target=continuous_monitoring_worker,
        args=(controller, interval),
        daemon=True  # Dies when main thread dies
    )
    
    controller.monitoring_thread.start()
    controller.monitoring_active = True
    
    logger.success(f"Continuous monitoring started with {interval}s interval")
    return True

def stop_monitoring(controller: DualPumpController):
    """
    Stop continuous monitoring gracefully without affecting device connections.
    """
    if not controller.monitoring_active:
        logger.info("Monitoring is not currently active")
        return
    
    logger.info("Stopping continuous monitoring...")
    
    # Signal the monitoring thread to stop
    controller.monitoring_stop_event.set()
    
    # Wait for thread to finish (with timeout)
    if controller.monitoring_thread and controller.monitoring_thread.is_alive():
        controller.monitoring_thread.join(timeout=10.0)
        
        if controller.monitoring_thread.is_alive():
            logger.warning("Monitoring thread did not stop gracefully")
        else:
            logger.success("Monitoring thread stopped successfully")
    
    controller.monitoring_active = False
    controller.monitoring_thread = None

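# Attach the monitoring functions to the controller instance
# (disconnect_all() calls controller.stop_monitoring(), so run this cell first)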
controller.start_monitoring = lambda interval=5.0: start_monitoring(controller, interval)
controller.stop_monitoring = lambda: stop_monitoring(controller)

print("Continuous monitoring system ready")

Continuous monitoring system ready
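
The cancellation mechanism used above can be isolated into a few lines. This is a minimal sketch of the same pattern: waiting on a `threading.Event` with a timeout doubles as the sleep between cycles and as the stop check. `run_periodically` and the short 0.05 s interval are illustrative choices, not part of the notebook.

```python
import threading
import time
from typing import Callable, List


def run_periodically(task: Callable[[], None], interval: float,
                     stop_event: threading.Event) -> None:
    """Call task() every `interval` seconds until stop_event is set."""
    while not stop_event.is_set():
        try:
            task()
        except Exception as exc:
            # One failed cycle should not end the loop
            print(f"cycle failed: {exc}")
        # wait() returns as soon as the event is set, so stopping does not
        # have to wait for a full interval to elapse
        stop_event.wait(timeout=interval)


ticks: List[float] = []
stop_event = threading.Event()
worker = threading.Thread(
    target=run_periodically,
    args=(lambda: ticks.append(time.monotonic()), 0.05, stop_event),
    daemon=True,
)
worker.start()
time.sleep(0.2)           # let a few cycles run
stop_event.set()          # request a stop
worker.join(timeout=1.0)  # the worker should exit well within the timeout
```

Using `time.sleep(interval)` in place of `stop_event.wait(...)` would still work, but a stop request could then take up to one full interval to be noticed.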


## 6. Device Connection Setup

In [7]:
# Configuration for your devices
# Modify these values according to your setup

PUMP1_COM = 'COM19'    # Adjust to your pump 1 COM port
PUMP1_ADDR = 2         # Adjust to your pump 1 address

PUMP2_COM = 'COM21'    # Adjust to your pump 2 COM port
PUMP2_ADDR = 2         # Adjust to your pump 2 address

# Single pressure controller handles both sensors
PRESSURE_COM = 'COM22'  # Pressure controller COM port (based on your debugging notebook)
PRESSURE_ADDR = 1       # Pressure controller address (based on your debugging notebook)

print("Device configuration loaded:")
print(f"Pump 1: {PUMP1_COM}, Address {PUMP1_ADDR}")
print(f"Pump 2: {PUMP2_COM}, Address {PUMP2_ADDR}")
print(f"Pressure Controller: {PRESSURE_COM}, Address {PRESSURE_ADDR}")
print(f"  -> Sensor 1: Channel 1")
print(f"  -> Sensor 2: Channel 2")

Device configuration loaded:
Pump 1: COM19, Address 2
Pump 2: COM21, Address 2
Pressure Controller: COM22, Address 1
  -> Sensor 1: Channel 1
  -> Sensor 2: Channel 2


In [8]:
# Connect to pumps
pump_connection = controller.connect_pumps(PUMP1_COM, PUMP1_ADDR, PUMP2_COM, PUMP2_ADDR)

if pump_connection:
    print("✅ Pumps connected successfully")
else:
    print("❌ Failed to connect to pumps")

2025-07-14 10:49:16 | INFO | Connecting to Pump 1 on COM19, address 2
2025-07-14 10:49:16 | INFO | Connecting to Pump 2 on COM21, address 2
2025-07-14 10:49:16 | SUCCESS | Both pumps connected successfully


✅ Pumps connected successfully


In [9]:
# Connect to pressure controller
pressure_connection = controller.connect_pressure_controller(PRESSURE_COM, PRESSURE_ADDR)

if pressure_connection:
    print("✅ Pressure controller connected successfully")
    print("   Both sensors accessible via channels 1 and 2")
else:
    print("❌ Failed to connect to pressure controller")

2025-07-14 10:49:17 | INFO | Connecting to pressure controller on COM22, address 1
2025-07-14 10:49:17 | SUCCESS | Pressure controller connected successfully


✅ Pressure controller connected successfully
   Both sensors accessible via channels 1 and 2


## 7. Manual Testing and Device Status

In [10]:
# Test individual pump functions (similar to your debugging notebook)
if controller.pump1:
    try:
        print("Testing Pump 1:")
        # Add actual tests here similar to your debugging notebook
        sw_version = controller.pump1.get_swVers()
        print(f"  Software Version: {sw_version}")
        # 
        # enable_status = controller.pump1.get_Pmp_Enable()
        # print(f"  Pump Enable: {enable_status}")
        # 
        rotation_speed = int(controller.pump1.get_actRotSpd_rpm())
        print(f"  Rotation Speed: {rotation_speed} RPM")

        elecSN = controller.pump1.get_serial_nrr()
        print(f"  Electric Serial Number: {elecSN}")
        
        print("  Pump 1 tests completed")
        
    except Exception as e:
        logger.error(f"Error testing Pump 1: {e}")
        print(f"  ❌ Error testing Pump 1: {e}")
else:
    print("Pump 1 not connected")

Testing Pump 1:
  Software Version: 010700
  Rotation Speed: 0 RPM
  Electric Serial Number: 72163743        
  Pump 1 tests completed


In [11]:
# Test individual pump functions (similar to your debugging notebook)
if controller.pump2:
    try:
        print("Testing Pump 2:")
        # Add actual tests here similar to your debugging notebook
        sw_version = controller.pump2.get_swVers()
        print(f"  Software Version: {sw_version}")
        #
        # enable_status = controller.pump2.get_Pmp_Enable()
        # print(f"  Pump Enable: {enable_status}")
        #
        rotation_speed = controller.pump2.get_actRotSpd_rpm()
        print(f"  Rotation Speed: {rotation_speed} RPM")

        elecSN = controller.pump2.get_serial_nrr()
        print(f"  Electric Serial Number: {elecSN}")

        print("  Pump 2 tests completed")

    except Exception as e:
        logger.error(f"Error testing Pump 2: {e}")
        print(f"  ❌ Error testing Pump 2: {e}")
else:
    print("Pump 2 not connected")

Testing Pump 2:
  Software Version: 010700
  Rotation Speed: 000000 RPM
  Electric Serial Number: 72164481        
  Pump 2 tests completed


In [12]:
# Test pressure controller and sensors
if controller.pressure_controller:
    try:
        print("Testing Pressure Controller:")
        
        # Get firmware version
        fw_version = controller.pressure_controller.get_fw_vers()
        print(f"  Firmware Version: {fw_version}")
        
        # Test Sensor 1 (Channel 1)
        print("\n  Testing Sensor 1 (Channel 1):")
        sensor1_name = controller.pressure_controller.get_sens_name(1)
        print(f"    Sensor Name: {sensor1_name}")
        
        pressure1 = controller.pressure_controller.measure_p(1)
        print(f"    Pressure: {pressure1} (type: {type(pressure1).__name__})")
        
        # Test Sensor 2 (Channel 2) 
        print("\n  Testing Sensor 2 (Channel 2):")
        sensor2_name = controller.pressure_controller.get_sens_name(2)
        print(f"    Sensor Name: {sensor2_name}")
        
        pressure2 = controller.pressure_controller.measure_p(2)
        print(f"    Pressure: {pressure2} (type: {type(pressure2).__name__})")
        
        # Check if connection is still open
        if controller.pressure_controller.unit.isOpen():
            print("\n  ✅ Serial connection is open and working")
        
        print("\n  Pressure controller tests completed successfully")
        
    except Exception as e:
        logger.error(f"Error testing pressure controller: {e}")
        print(f"  ❌ Error testing pressure controller: {e}")
else:
    print("Pressure controller not connected")

Testing Pressure Controller:
  Firmware Version: 010400

  Testing Sensor 1 (Channel 1):
    Sensor Name: TPR   
    Pressure: 17.6 (type: float)

  Testing Sensor 2 (Channel 2):
    Sensor Name: TPR   
    Pressure: 1019.0 (type: float)

  ✅ Serial connection is open and working

  Pressure controller tests completed successfully


In [13]:
# Quick test: Read pressures individually (similar to your debugging notebook)
if controller.pressure_controller:
    try:
        print("Quick Pressure Test:")
        
        # Read pressure from sensor 1 (same as debugging notebook cell 11)
        p1 = controller.pressure_controller.measure_p(1)
        print(f"Sensor 1 pressure: {p1} (type: {type(p1).__name__})")
        
        # Read pressure from sensor 2 (same as debugging notebook cell 12)  
        p2 = controller.pressure_controller.measure_p(2)
        print(f"Sensor 2 pressure: {p2} (type: {type(p2).__name__})")
        
        print("✅ Pressure readings successful")
        
    except Exception as e:
        print(f"❌ Error reading pressures: {e}")
else:
    print("❌ Pressure controller not connected")

Quick Pressure Test:
Sensor 1 pressure: 17.64 (type: float)
Sensor 2 pressure: 1019.0 (type: float)
✅ Pressure readings successful


## 8. Monitoring Control Interface

In [14]:
# Create interactive controls for monitoring
start_button = widgets.Button(description="Start Monitoring", button_style='success')
stop_button = widgets.Button(description="Stop Monitoring", button_style='danger')
interval_slider = widgets.FloatSlider(
    value=5.0,
    min=1.0,
    max=15.0,
    step=1.0,
    description='Interval (s):',
    style={'description_width': 'initial'}
)
status_output = widgets.Output()

def on_start_clicked(b):
    with status_output:
        clear_output(wait=True)
        success = controller.start_monitoring(interval_slider.value)
        if success:
            print(f"✅ Monitoring started with {interval_slider.value}s interval")
        else:
            print("❌ Failed to start monitoring (already running?)")

def on_stop_clicked(b):
    with status_output:
        clear_output(wait=True)
        controller.stop_monitoring()
        print("🛑 Monitoring stopped")

start_button.on_click(on_start_clicked)
stop_button.on_click(on_stop_clicked)

# Display controls
controls = widgets.VBox([
    widgets.Label("Monitoring Controls:"),
    interval_slider,
    widgets.HBox([start_button, stop_button]),
    status_output
])

display(controls)


In [23]:
controller.pump1.enable_Pmp()
controller.pump2.enable_Pmp()


In [27]:
controller.pump2.enable_Pmp()

In [30]:
controller.pump1.disable_Pmp()

In [31]:
controller.pump2.disable_Pmp()

In [28]:
controller.pump1.disable_Pmp()
controller.pump2.disable_Pmp()

InvalidCharError: Cannot decode character. This issue may sometimes be resolved by ignoring invalid characters. Enable the filter globally by running the function `pfeiffer_vacuum_protocol.enable_valid_char_filter()` after the import statement.
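
The error message above points to `pfeiffer_vacuum_protocol.enable_valid_char_filter()` as one remedy. Another option, if the failure is intermittent, is to retry the command. The sketch below is a generic retry wrapper; `call_with_retry` is a hypothetical helper, and it assumes that repeating the command is safe for the device.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retry(
    func: Callable[[], T],
    attempts: int = 3,
    delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
) -> T:
    """Call func() up to `attempts` times, re-raising the last error if all fail."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except retry_on as exc:
            print(f"attempt {attempt}/{attempts} failed: {exc}")
            if attempt == attempts:
                raise
            time.sleep(delay)
    raise RuntimeError("unreachable")  # keeps type checkers satisfied


# Demonstration with a stand-in function that fails twice, then succeeds
calls = {"n": 0}


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("cannot decode character")
    return "ok"


result = call_with_retry(flaky, attempts=3, delay=0.0)
```

Applied to this notebook, a call might look like `call_with_retry(controller.pump1.disable_Pmp)`, ideally with `retry_on` narrowed to the specific exception class once its import path is confirmed.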

## 9. Data Visualization and Analysis

### 9.1 Static Plot

In [15]:
def plot_monitoring_data():
    """Plot the collected monitoring data"""
    if not controller.monitoring_data:
        print("No monitoring data available. Start monitoring first.")
        return
    
    # Collect each series into a plain list for plotting
    timestamps = []
    pump1_rpm = []
    pump2_rpm = []
    sensor1_pressure = []
    sensor2_pressure = []
    
    for cycle in controller.monitoring_data:
        timestamps.append(cycle['cycle_timestamp'])
        
        # Extract pump data; .get() tolerates ERROR records, which lack this key
        pump1_rpm.append(cycle['pump1_data'].get('rotation_speed_rpm') if cycle['pump1_data'] else None)
        pump2_rpm.append(cycle['pump2_data'].get('rotation_speed_rpm') if cycle['pump2_data'] else None)
        
        # Extract actual pressure data (float values)
        sensor1_pressure.append(cycle['sensor1_data']['pressure_value'] if cycle['sensor1_data'] else None)
        sensor2_pressure.append(cycle['sensor2_data']['pressure_value'] if cycle['sensor2_data'] else None)
    
    # Create subplots
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
    
    # Plot pump RPM
    ax1.plot(timestamps, pump1_rpm, 'b-', label='Pump 1 RPM', marker='o')
    ax1.plot(timestamps, pump2_rpm, 'r-', label='Pump 2 RPM', marker='s')
    ax1.set_ylabel('Rotation Speed (RPM)')
    ax1.set_title('Pump Rotation Speed Over Time')
    ax1.legend()
    ax1.grid(True)
    
    # Plot pressure (actual float values)
    ax2.plot(timestamps, sensor1_pressure, 'g-', label='Sensor 1 Pressure', marker='^')
    ax2.plot(timestamps, sensor2_pressure, 'm-', label='Sensor 2 Pressure', marker='v')
    ax2.set_ylabel('Pressure')
    ax2.set_xlabel('Time')
    ax2.set_title('Pressure Over Time')
    ax2.legend()
    ax2.grid(True)
    
    # Use a logarithmic pressure axis when at least one positive reading exists
    # (vacuum pressures typically span several orders of magnitude)
    if any(p is not None and p > 0 for p in sensor1_pressure + sensor2_pressure):
        ax2.set_yscale('log')
        ax2.set_ylabel('Pressure (log scale)')
    
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()
    
    # Print some statistics
    valid_p1 = [p for p in sensor1_pressure if p is not None]
    valid_p2 = [p for p in sensor2_pressure if p is not None]
    
    print(f"Plotted {len(controller.monitoring_data)} data points")
    if valid_p1:
        print(f"Sensor 1 pressure range: {min(valid_p1):.2e} to {max(valid_p1):.2e}")
    if valid_p2:
        print(f"Sensor 2 pressure range: {min(valid_p2):.2e} to {max(valid_p2):.2e}")

def get_monitoring_summary():
    """Get a summary of current monitoring status"""
    print(f"Monitoring Status: {'🟢 Active' if controller.monitoring_active else '🔴 Inactive'}")
    print(f"Data Points Collected: {len(controller.monitoring_data)}")
    print(f"Pump 1 Connected: {'✅' if controller.pump1 else '❌'}")
    print(f"Pump 2 Connected: {'✅' if controller.pump2 else '❌'}")
    print(f"Pressure Controller Connected: {'✅' if controller.pressure_controller else '❌'}")
    
    if controller.monitoring_data:
        latest = controller.monitoring_data[-1]
        print(f"\nLatest Data Point: {latest['cycle_timestamp']}")
        
        # Show latest pressure readings
        if latest['sensor1_data']:
            p1 = latest['sensor1_data'].get('pressure_value')
            print(f"  Sensor 1 Pressure: {p1}")
        
        if latest['sensor2_data']:
            p2 = latest['sensor2_data'].get('pressure_value') 
            print(f"  Sensor 2 Pressure: {p2}")

print("Data visualization functions ready")

Data visualization functions ready
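
Each monitoring cycle is stored as a nested dictionary, which is awkward to tabulate or export. The sketch below flattens one cycle into a single row. `flatten_cycle` is a hypothetical helper, and the values in `example_cycle` are illustrative, not measurements.

```python
from datetime import datetime
from typing import Any, Dict, List

DEVICE_KEYS = ("pump1_data", "pump2_data", "sensor1_data", "sensor2_data")


def flatten_cycle(cycle: Dict[str, Any]) -> Dict[str, Any]:
    """Turn one nested monitoring cycle into a single flat row."""
    row: Dict[str, Any] = {"cycle_timestamp": cycle["cycle_timestamp"]}
    for key in DEVICE_KEYS:
        prefix = key[: -len("_data")]        # 'pump1_data' -> 'pump1'
        # A disconnected device is stored as None; treat it as an empty record
        for field, value in (cycle.get(key) or {}).items():
            if field != "timestamp":          # per-device timestamps are dropped
                row[f"{prefix}_{field}"] = value
    return row


# Illustrative cycle: one healthy pump, one ERROR record, one missing sensor
example_cycle = {
    "cycle_timestamp": datetime(2025, 7, 14, 10, 50, 0),
    "pump1_data": {"pump_id": "Pump_1", "rotation_speed_rpm": 1500, "status": "OK"},
    "pump2_data": {"pump_id": "Pump_2", "error": "timeout", "status": "ERROR"},
    "sensor1_data": {"sensor_id": "Sensor_1", "pressure_value": 17.6, "status": "OK"},
    "sensor2_data": None,
}
rows: List[Dict[str, Any]] = [flatten_cycle(example_cycle)]
```

A list of such rows can be passed to `pd.DataFrame(rows)`; columns missing from a row (for example after an ERROR record) become NaN there.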


In [None]:
# Plot monitoring data (run this cell periodically to see updated plots)
plot_monitoring_data()

### 9.2 Live Plotting

In [20]:
# Live plot on four axes (ax1 to ax4)
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import numpy as np

class Plotter_locker:
    def __init__(self, controll, update_interval=1000):
        """
        Initialize live monitoring plot
        Args:
            controll: The controller object with monitoring_data
            update_interval: Update interval in milliseconds (default: 1000ms = 1 second)
        """
        self.controll = controll
        self.update_interval = update_interval
        self.window_duration = 60  # 60 seconds window

        # Initialize data lists
        self.timestamps = []
        self.pump1_rpm = []
        self.pump2_rpm = []
        self.sensor1_pressure = []
        self.sensor2_pressure = []
        self.pump1_power = []
        self.pump2_power = []
        self.pump1_temp_motor = []
        self.pump1_temp_pwrstg = []
        self.pump2_temp_motor = []
        self.pump2_temp_pwrstg = []

        # Create figure and subplots
        #self.fig, (self.ax1, self.ax2, self.ax3, self.ax4) = plt.subplots(4, 1, figsize=(12, 16))
        self.fig, ((self.ax1, self.ax2), (self.ax3, self.ax4)) = plt.subplots(2, 2, figsize=(16, 8))

        self.temp_log = []
        self.log_file_size = 0

        # Animation object
        self.animation = None
        self.rtn = self.controll.monitoring_data

    def update_plot(self, frame):
        # Get new data
        current_size = len(self.controll.monitoring_data)
        if current_size > self.log_file_size:
            new_entries = self.controll.monitoring_data[self.log_file_size:]
            self.temp_log.extend(new_entries)
            self.log_file_size = current_size

        # Process new entries. A missing device (None) or an ERROR record
        # (which lacks the measurement keys) yields None for that field.
        for entry in self.temp_log:
            pump1 = entry['pump1_data'] or {}
            pump2 = entry['pump2_data'] or {}
            sensor1 = entry['sensor1_data'] or {}
            sensor2 = entry['sensor2_data'] or {}
            self.timestamps.append(entry['cycle_timestamp'])
            self.pump1_rpm.append(pump1.get('rotation_speed_rpm'))
            self.pump2_rpm.append(pump2.get('rotation_speed_rpm'))
            self.sensor1_pressure.append(sensor1.get('pressure_value'))
            self.sensor2_pressure.append(sensor2.get('pressure_value'))
            self.pump1_power.append(pump1.get('power_consumption'))
            self.pump2_power.append(pump2.get('power_consumption'))
            self.pump1_temp_motor.append(pump1.get('temp_Motor'))
            self.pump1_temp_pwrstg.append(pump1.get('temp_PwrStg'))
            self.pump2_temp_motor.append(pump2.get('temp_Motor'))
            self.pump2_temp_pwrstg.append(pump2.get('temp_PwrStg'))

        # Clear temp_log after processing to avoid reprocessing
        self.temp_log = []

        # Clear only the axes, not the entire figure
        self.ax1.clear()
        self.ax2.clear()
        self.ax3.clear()
        self.ax4.clear()

        # Filter out None values for plotting
        valid_data1 = [(t, p1, p2) for t, p1, p2 in zip(self.timestamps, self.pump1_rpm, self.pump2_rpm)
                      if p1 is not None or p2 is not None]
        valid_data2 = [(t, s1, s2) for t, s1, s2 in zip(self.timestamps, self.sensor1_pressure, self.sensor2_pressure)
                      if s1 is not None or s2 is not None]
        valid_data3 = [(t, pw1, pw2) for t, pw1, pw2 in zip(self.timestamps, self.pump1_power, self.pump2_power)
                      if pw1 is not None or pw2 is not None]
        valid_data4 = [(t, tm1, tp1, tm2, tp2) for t, tm1, tp1, tm2, tp2 in zip(self.timestamps, self.pump1_temp_motor,
                      self.pump1_temp_pwrstg, self.pump2_temp_motor, self.pump2_temp_pwrstg)
                      if tm1 is not None or tp1 is not None or tm2 is not None or tp2 is not None]

        # Plot RPM data (ax1)
        if valid_data1:
            times1, pump1_vals, pump2_vals = zip(*valid_data1)

            # Plot pump1 data (filter None values) - BLUE with circle marker
            pump1_valid = [(t, p) for t, p in zip(times1, pump1_vals) if p is not None]
            if pump1_valid:
                t1, p1 = zip(*pump1_valid)
                if len(t1) > 120:
                    self.ax1.plot(t1[-119:], p1[-119:], 'b-', label='Pump 1 RPM', marker='o')
                else:
                    self.ax1.plot(t1, p1, 'b-', label='Pump 1 RPM', marker='o')

            # Plot pump2 data (filter None values) - RED with square marker
            pump2_valid = [(t, p) for t, p in zip(times1, pump2_vals) if p is not None]
            if pump2_valid:
                t2, p2 = zip(*pump2_valid)
                # Keep only the most recent 120 points (slicing also handles shorter series)
                self.ax1.plot(t2[-120:], p2[-120:], 'r-', label='Pump 2 RPM', marker='s')

        # Plot pressure data (ax2)
        if valid_data2:
            times2, sensor1_vals, sensor2_vals = zip(*valid_data2)

            # Plot sensor1 data (filter None values) - BLUE with circle marker (matches pump1)
            sensor1_valid = [(t, s) for t, s in zip(times2, sensor1_vals) if s is not None]
            if sensor1_valid:
                t1, s1 = zip(*sensor1_valid)
                # Keep only the most recent 120 points (slicing also handles shorter series)
                self.ax2.plot(t1[-120:], s1[-120:], 'b-', label='Sensor 1 Pressure', marker='o')

            # Plot sensor2 data (filter None values) - RED with square marker (matches pump2)
            sensor2_valid = [(t, s) for t, s in zip(times2, sensor2_vals) if s is not None]
            if sensor2_valid:
                t2, s2 = zip(*sensor2_valid)
                # Keep only the most recent 120 points (slicing also handles shorter series)
                self.ax2.plot(t2[-120:], s2[-120:], 'r-', label='Sensor 2 Pressure', marker='s')

        # Plot power data (ax3)
        if valid_data3:
            times3, power1_vals, power2_vals = zip(*valid_data3)

            # Plot pump1 power data (filter None values) - BLUE with circle marker
            power1_valid = [(t, p) for t, p in zip(times3, power1_vals) if p is not None]
            if power1_valid:
                t1, p1 = zip(*power1_valid)
                # Keep only the most recent 120 points (slicing also handles shorter series)
                self.ax3.plot(t1[-120:], p1[-120:], 'b-', label='Pump 1 Power', marker='o')

            # Plot pump2 power data (filter None values) - RED with square marker
            power2_valid = [(t, p) for t, p in zip(times3, power2_vals) if p is not None]
            if power2_valid:
                t2, p2 = zip(*power2_valid)
                # Keep only the most recent 120 points (slicing also handles shorter series)
                self.ax3.plot(t2[-120:], p2[-120:], 'r-', label='Pump 2 Power', marker='s')

        # Plot temperature data (ax4)
        if valid_data4:
            times4, temp_motor1_vals, temp_pwrstg1_vals, temp_motor2_vals, temp_pwrstg2_vals = zip(*valid_data4)

            # Plot pump1 motor temperature data (filter None values) - BLUE with circle marker
            temp_motor1_valid = [(t, tm) for t, tm in zip(times4, temp_motor1_vals) if tm is not None]
            if temp_motor1_valid:
                t1, tm1 = zip(*temp_motor1_valid)
                # Keep only the most recent 120 points (slicing also handles shorter series)
                self.ax4.plot(t1[-120:], tm1[-120:], 'b-', label='Pump 1 Motor Temp', marker='o')

            # Plot pump1 power stage temperature data (filter None values) - CYAN with triangle marker
            temp_pwrstg1_valid = [(t, tp) for t, tp in zip(times4, temp_pwrstg1_vals) if tp is not None]
            if temp_pwrstg1_valid:
                t1, tp1 = zip(*temp_pwrstg1_valid)
                # Keep only the most recent 120 points (slicing also handles shorter series)
                self.ax4.plot(t1[-120:], tp1[-120:], 'c-', label='Pump 1 Power Stage Temp', marker='^')

            # Plot pump2 motor temperature data (filter None values) - RED with square marker
            temp_motor2_valid = [(t, tm) for t, tm in zip(times4, temp_motor2_vals) if tm is not None]
            if temp_motor2_valid:
                t2, tm2 = zip(*temp_motor2_valid)
                # Keep only the most recent 120 points (slicing also handles shorter series)
                self.ax4.plot(t2[-120:], tm2[-120:], 'r-', label='Pump 2 Motor Temp', marker='s')

            # Plot pump2 power stage temperature data (filter None values) - MAGENTA with inverted triangle marker
            temp_pwrstg2_valid = [(t, tp) for t, tp in zip(times4, temp_pwrstg2_vals) if tp is not None]
            if temp_pwrstg2_valid:
                t2, tp2 = zip(*temp_pwrstg2_valid)
                # Keep only the most recent 120 points (slicing also handles shorter series)
                self.ax4.plot(t2[-120:], tp2[-120:], 'm-', label='Pump 2 Power Stage Temp', marker='v')

        # Set labels and formatting
        self.ax1.set_ylabel('Rotation Speed (RPM)')
        self.ax1.set_title('Pump Rotation Speed Over Time')
        self.ax1.legend()
        self.ax1.grid(True)

        # Use a logarithmic scale for pressure: vacuum readings span several orders of magnitude
        self.ax2.set_yscale('log')
        self.ax2.set_ylabel('Pressure (log scale)')
        self.ax2.set_title('Pressure Over Time')
        self.ax2.legend()
        self.ax2.grid(True)

        self.ax3.set_ylabel('Power Consumption')
        self.ax3.set_title('Pump Power Over Time')
        self.ax3.legend()
        self.ax3.grid(True)

        self.ax4.set_ylabel('Temperature')
        self.ax4.set_xlabel('Time')
        self.ax4.set_title('Motor and Power Stage Temperature Over Time')
        self.ax4.legend()
        self.ax4.grid(True)

        # Rotate x-axis labels for better readability
        for ax in [self.ax1, self.ax2, self.ax3, self.ax4]:
            ax.tick_params(axis='x', rotation=45)

        # Adjust layout
        self.fig.tight_layout()

    def start_live_plot(self):
        """Start the live plotting"""
        if not self.controll.monitoring_data:
            print("No monitoring data available. Start monitoring first.")
            return None, None

        print(f"Starting live plot with {self.update_interval/1000}s update interval...")
        print("Close the plot window to stop live plotting.")

        # Create animation
        self.ani = FuncAnimation(
            self.fig,
            self.update_plot,
            interval=self.update_interval,
            blit=False,  # update_plot clears and redraws the axes and returns no artists, so blitting is off
            cache_frame_data=False
        )

        # Show the plot
        plt.show()

        return self.timestamps, self.rtn

    def stop_live_plot(self):
        """Stop the live plotting"""
        if hasattr(self, 'ani'):
            self.ani.event_source.stop()
            print("Live plotting stopped.")
        else:
            print("No animation to stop.")

In [42]:
# Reduced plotter: only rotation speed (ax1) and pressure (ax2)
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import numpy as np

class Plotter_locker:
    def __init__(self, controll, update_interval=1000):
        """
        Initialize live monitoring plot
        Args:
            controll: The controller object with monitoring_data
            update_interval: Update interval in milliseconds (default: 1000ms = 1 second)
        """
        self.controll = controll
        self.update_interval = update_interval
        self.window_duration = 60  # 60 seconds window

        # Initialize data lists
        self.timestamps = []
        self.pump1_rpm = []
        self.pump2_rpm = []
        self.sensor1_pressure = []
        self.sensor2_pressure = []

        # Create figure and subplots
        self.fig, (self.ax1, self.ax2) = plt.subplots(2, 1, figsize=(12, 8))

        self.temp_log = []
        self.log_file_size = 0

        # Animation object
        self.animation = None
        self.rtn = self.controll.monitoring_data

    def update_plot(self, frame):
        # Get new data
        current_size = len(self.controll.monitoring_data)
        if current_size > self.log_file_size:
            new_entries = self.controll.monitoring_data[self.log_file_size:]
            self.temp_log.extend(new_entries)
            self.log_file_size = current_size

        # Process new entries
        for entry in self.temp_log:
            self.timestamps.append(entry['cycle_timestamp'])
            self.pump1_rpm.append(entry['pump1_data']['rotation_speed_rpm'] if entry['pump1_data'] else None)
            self.pump2_rpm.append(entry['pump2_data']['rotation_speed_rpm'] if entry['pump2_data'] else None)
            self.sensor1_pressure.append(entry['sensor1_data']['pressure_value'] if entry['sensor1_data'] else None)
            self.sensor2_pressure.append(entry['sensor2_data']['pressure_value'] if entry['sensor2_data'] else None)

        # Clear temp_log after processing to avoid reprocessing
        self.temp_log = []

        # Clear only the axes, not the entire figure
        self.ax1.clear()
        self.ax2.clear()

        # Filter out None values for plotting
        valid_data1 = [(t, p1, p2) for t, p1, p2 in zip(self.timestamps, self.pump1_rpm, self.pump2_rpm)
                      if p1 is not None or p2 is not None]
        valid_data2 = [(t, s1, s2) for t, s1, s2 in zip(self.timestamps, self.sensor1_pressure, self.sensor2_pressure)
                      if s1 is not None or s2 is not None]

        if valid_data1:
            times1, pump1_vals, pump2_vals = zip(*valid_data1)

            # Plot pump1 data (filter None values)
            pump1_valid = [(t, p) for t, p in zip(times1, pump1_vals) if p is not None]
            if pump1_valid:
                t1, p1 = zip(*pump1_valid)
                # Keep only the most recent 120 points (slicing also handles shorter series)
                self.ax1.plot(t1[-120:], p1[-120:], 'b-', label='Pump 1 RPM', marker='o')

            # Plot pump2 data (filter None values)
            pump2_valid = [(t, p) for t, p in zip(times1, pump2_vals) if p is not None]
            if pump2_valid:
                t2, p2 = zip(*pump2_valid)
                # Keep only the most recent 120 points (slicing also handles shorter series)
                self.ax1.plot(t2[-120:], p2[-120:], 'r-', label='Pump 2 RPM', marker='s')


        if valid_data2:
            times2, sensor1_vals, sensor2_vals = zip(*valid_data2)

            # Plot sensor1 data (filter None values)
            sensor1_valid = [(t, s) for t, s in zip(times2, sensor1_vals) if s is not None]
            if sensor1_valid:
                t1, s1 = zip(*sensor1_valid)
                # Keep only the most recent 120 points (slicing also handles shorter series)
                self.ax2.plot(t1[-120:], s1[-120:], 'g-', label='Sensor 1 Pressure', marker='^')

            # Plot sensor2 data (filter None values)
            sensor2_valid = [(t, s) for t, s in zip(times2, sensor2_vals) if s is not None]
            if sensor2_valid:
                t2, s2 = zip(*sensor2_valid)
                # Keep only the most recent 120 points (slicing also handles shorter series)
                self.ax2.plot(t2[-120:], s2[-120:], 'm-', label='Sensor 2 Pressure', marker='v')

        # Set labels and formatting
        self.ax1.set_ylabel('Rotation Speed (RPM)')
        self.ax1.set_title('Pump Rotation Speed Over Time')
        self.ax1.legend()
        self.ax1.grid(True)

        self.ax2.set_ylabel('Pressure')
        self.ax2.set_xlabel('Time')
        self.ax2.set_title('Pressure Over Time')
        self.ax2.legend()
        self.ax2.grid(True)

        # Rotate x-axis labels for better readability
        for ax in [self.ax1, self.ax2]:
            ax.tick_params(axis='x', rotation=45)

        # Adjust layout
        self.fig.tight_layout()

    def start_live_plot(self):
        """Start the live plotting"""
        if not self.controll.monitoring_data:
            print("No monitoring data available. Start monitoring first.")
            return None, None

        print(f"Starting live plot with {self.update_interval/1000}s update interval...")
        print("Close the plot window to stop live plotting.")

        # Create animation
        self.ani = FuncAnimation(
            self.fig,
            self.update_plot,
            interval=self.update_interval,
            blit=False,  # update_plot clears and redraws the axes and returns no artists, so blitting is off
            cache_frame_data=False
        )

        # Show the plot
        plt.show()

        return self.timestamps, self.rtn

    def stop_live_plot(self):
        """Stop the live plotting"""
        if hasattr(self, 'ani'):
            self.ani.event_source.stop()
            print("Live plotting stopped.")
        else:
            print("No animation to stop.")
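
Both plotting classes repeat the same two steps for every series: drop samples whose value is `None`, then keep only the most recent points. A minimal sketch of a shared helper follows; the name `recent_valid` and its `max_points` parameter are hypothetical and not part of the notebook, and the sample readings are invented.

```python
from typing import Optional, Sequence, Tuple


def recent_valid(
    times: Sequence,
    values: Sequence[Optional[float]],
    max_points: int = 120,
) -> Tuple[list, list]:
    """Drop samples whose value is None, then keep the newest max_points."""
    pairs = [(t, v) for t, v in zip(times, values) if v is not None]
    pairs = pairs[-max_points:]
    if not pairs:
        return [], []
    ts, vs = zip(*pairs)
    return list(ts), list(vs)


# Example with made-up readings: the None sample is skipped.
t, v = recent_valid([0, 1, 2, 3], [1500.0, None, 1498.0, 1501.0], max_points=2)
print(t, v)
```

With a helper like this, each series could be drawn in one call, for example `self.ax1.plot(*recent_valid(self.timestamps, self.pump1_rpm), 'b-', label='Pump 1 RPM', marker='o')`.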

In [18]:
%matplotlib notebook

In [34]:
live_plot = Plotter_locker(controller)
timestmps, rtn = live_plot.start_live_plot()

Starting live plot with 1.0s update interval...
Close the plot window to stop live plotting.


In [32]:
live_plot.stop_live_plot()

Live plotting stopped.


## 10. Data Export and Cleanup

In [39]:
def export_monitoring_data(filename: Optional[str] = None):
    """Export controller.monitoring_data to a CSV file (timestamped default name)."""
    if not controller.monitoring_data:
        print("No data to export")
        return
    
    if filename is None:
        filename = f"pump_monitoring_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    
    # Flatten the data for CSV export
    flattened_data = []
    for cycle in controller.monitoring_data:
        row = {'timestamp': cycle['cycle_timestamp']}
        
        # Add pump 1 data
        if cycle['pump1_data']:
            for key, value in cycle['pump1_data'].items():
                row[f'pump1_{key}'] = value
        
        # Add pump 2 data
        if cycle['pump2_data']:
            for key, value in cycle['pump2_data'].items():
                row[f'pump2_{key}'] = value
        
        # Add sensor 1 data
        if cycle['sensor1_data']:
            for key, value in cycle['sensor1_data'].items():
                row[f'sensor1_{key}'] = value
        
        # Add sensor 2 data
        if cycle['sensor2_data']:
            for key, value in cycle['sensor2_data'].items():
                row[f'sensor2_{key}'] = value
        
        flattened_data.append(row)
    
    df = pd.DataFrame(flattened_data)
    df.to_csv(filename, index=False)
    logger.info(f"Data exported to {filename}")
    print(f"✅ Data exported to {filename}")

print("Export function ready")

Export function ready


In [40]:
export_monitoring_data("14-07-2025_002_dataexport")

2025-07-14 13:41:45 | INFO | Data exported to 14-07-2025_002_dataexport


✅ Data exported to 14-07-2025_002_dataexport
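
The flattening step inside `export_monitoring_data` can be tried without any hardware. The sketch below applies the same key-prefixing to one sample cycle; `flatten_cycle` is a hypothetical helper name and the sample values are invented, while the key names (`cycle_timestamp`, `rotation_speed_rpm`, `pressure_value`) are the ones used elsewhere in this notebook.

```python
from typing import Any, Dict, Optional


def flatten_cycle(cycle: Dict[str, Any]) -> Dict[str, Any]:
    """Turn one nested monitoring cycle into a single flat row."""
    row: Dict[str, Any] = {"timestamp": cycle["cycle_timestamp"]}
    for device in ("pump1", "pump2", "sensor1", "sensor2"):
        data: Optional[Dict[str, Any]] = cycle.get(f"{device}_data")
        if data:  # missing or empty device data is skipped
            for key, value in data.items():
                row[f"{device}_{key}"] = value
    return row


# Hypothetical sample cycle; the values are invented for illustration.
sample = {
    "cycle_timestamp": "2025-07-14 13:41:00",
    "pump1_data": {"rotation_speed_rpm": 1500},
    "pump2_data": None,
    "sensor1_data": {"pressure_value": 2.3e-2},
    "sensor2_data": None,
}
print(flatten_cycle(sample))
```

Devices without data contribute no keys to a row, so their columns show up as empty cells for that cycle once the rows are combined into a DataFrame.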


In [None]:
# Save the raw monitoring data (a list of dicts) with pickle
import pickle

with open('14-07-2025_002', 'wb') as f:
    pickle.dump(controller.monitoring_data, f)

In [None]:
# Load previously pickled controller.monitoring_data
# Note: the file name must match the one used when saving
import pickle

with open('mylist', 'rb') as f:
    mylist = pickle.load(f)
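
The two pickle cells above only round-trip if they use the same file name. A minimal, hardware-free sketch of the full save and load cycle is shown below; the list stands in for `controller.monitoring_data`, and both its values and the file name `monitoring_data.pkl` are invented for illustration.

```python
import os
import pickle
import tempfile

# Stand-in for controller.monitoring_data (invented values).
monitoring_data = [
    {"cycle_timestamp": "2025-07-14 13:41:00", "pump1_data": {"rotation_speed_rpm": 1500}},
    {"cycle_timestamp": "2025-07-14 13:41:01", "pump1_data": None},
]

# Hypothetical file name inside a fresh temporary directory.
path = os.path.join(tempfile.mkdtemp(), "monitoring_data.pkl")

with open(path, "wb") as f:
    pickle.dump(monitoring_data, f)

with open(path, "rb") as f:  # same path as used for saving
    restored = pickle.load(f)

print(restored == monitoring_data)
```

Pickle files should only be loaded from sources you trust, since unpickling can execute arbitrary code.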

In [41]:
controller.disconnect_all()

2025-07-14 13:42:05 | INFO | Disconnecting all devices...
2025-07-14 13:42:05 | INFO | Stopping continuous monitoring...
2025-07-14 13:42:05 | SUCCESS | Monitoring thread stopped successfully
2025-07-14 13:42:05 | INFO | Pump 1 disconnected
2025-07-14 13:42:05 | INFO | Pump 2 disconnected
2025-07-14 13:42:05 | INFO | Pressure controller disconnected
2025-07-14 13:42:05 | SUCCESS | All devices disconnected


In [None]:
# Emergency stop and cleanup
# Run this cell if you need to stop everything and disconnect all devices

print("🛑 Emergency Stop - Disconnecting all devices...")
controller.disconnect_all()
print("✅ All devices disconnected safely")
print("\nTo reconnect, run the connection cells again.")

## Quick Reference

### Key Functions:
- **Connect devices**: Run cells in section 6
- **Start monitoring**: Use the interface in section 8 or call `controller.start_monitoring(interval)`
- **Stop monitoring**: Use the interface in section 8 or call `controller.stop_monitoring()`
- **View data**: Run `get_monitoring_summary()` and `plot_monitoring_data()`
- **Export data**: Run `export_monitoring_data()`
- **Emergency stop**: Run the final code cell ("Emergency stop and cleanup") to disconnect everything

### Important Notes:
- Monitoring runs in a separate thread and can be cancelled without losing device connections
- All actions are logged to files in the `logs/` directory
- The monitoring functions are currently placeholder implementations; replace them with the actual device methods
- Adjust COM ports and addresses in section 6 according to your setup
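
The first note above, that monitoring runs in a separate thread and can be cancelled independently of the device connections, can be illustrated with a `threading.Event`. This is a minimal sketch under stated assumptions, not the controller's actual implementation: the class `CancelableMonitor`, its `read_fn` argument and the dummy reader are all hypothetical.

```python
import threading
import time
from typing import Any, Callable, Dict, List


class CancelableMonitor:
    """Poll read_fn at a fixed interval on a background thread until stopped."""

    def __init__(self, read_fn: Callable[[], Dict[str, Any]]):
        self.read_fn = read_fn
        self.data: List[Dict[str, Any]] = []
        self._stop = threading.Event()
        self._thread = None

    def start(self, interval: float = 1.0) -> None:
        if self._thread is not None and self._thread.is_alive():
            return  # already running
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, args=(interval,), daemon=True)
        self._thread.start()

    def _run(self, interval: float) -> None:
        while not self._stop.is_set():
            self.data.append(self.read_fn())
            # wait() returns early once stop() sets the event
            self._stop.wait(interval)

    def stop(self, timeout: float = 5.0) -> None:
        self._stop.set()
        if self._thread is not None:
            self._thread.join(timeout)


# Dummy reader standing in for real pump and sensor queries.
monitor = CancelableMonitor(lambda: {"cycle_timestamp": time.time()})
monitor.start(interval=0.05)
time.sleep(0.2)
monitor.stop()
print(f"collected {len(monitor.data)} samples")
```

Because `stop()` only signals the polling loop and joins the thread, nothing in it touches the device objects, which is why connections can stay open while monitoring is restarted.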