In [1]:
import tkinter as tk # Imports the standard GUI library, aliased as tk
from tkinter import filedialog, scrolledtext, messagebox # Imports specific useful sub-modules
import pandas as pd # Imports the data analysis library for handling CSV files
import numpy as np # Imports the numerical library (though not heavily used here)
import socket # Imports the networking module for UDP communication
import threading # Imports the module for running code in the background (concurrently)
import struct # Imports the module for converting Python values to C structs (for UDP data)
import time # Imports the module for time-related tasks (like logging timestamps)
import random # Imports the module for generating random numbers (for PV deviation)
import math # For the sender simulation, not strictly needed in the GUI

In [2]:
# --- Configuration (Constants used throughout the code) ---
UDP_IP = "127.0.0.1"    # Localhost IP address for testing on the same machine
UDP_PORT = 5005         # The port number the listener will bind to
# Size in bytes of one control packet. It is derived from the struct format
# that the listener unpacks ('>dd' = two big-endian doubles) rather than being
# hand-computed, so the length check cannot drift away from the wire format.
EXPECTED_DATA_LEN = struct.calcsize(">dd")
MAX_DEVIATION = 0.10    # Max fractional deviation applied to the forecast to get Real_PV (10%)

In [3]:
# --- Main Application Class (Object-Oriented Programming) ---
class GridSimulatorApp:
    # 1. Constructor Method: Called automatically when you create a new GridSimulatorApp object
    def __init__(self, master):
        """Build the emulator window and initialise the simulation state.

        Args:
            master: Root Tk window; every widget created here is placed in it.
        """
        self.master = master
        master.title("ARIES Emulator")

        # --- Simulation state ---
        self.df = None                     # DataFrame loaded from the CSV (None until a file is loaded)
        self.current_minute = 0            # Running step counter used to pick the CSV row
        self.udp_thread = None             # Background listener thread, once started
        self.running = threading.Event()   # Set while the listener loop should keep going
        self.last_grid_data = "Waiting for CSV load and UDP controls."
        self.log_data = []                 # One result dict per processed UDP packet

        # --- Root layout: a single column; only row 3 (the log) stretches vertically ---
        master.grid_columnconfigure(0, weight=1)
        for row_index, row_weight in enumerate((0, 0, 0, 1)):
            master.grid_rowconfigure(row_index, weight=row_weight)

        # Grid options shared by the three framed sections (rows 0-2)
        section_placement = dict(column=0, padx=10, pady=5, sticky="ew")

        # --- Row 0: power-flow diagram ---
        self.diagram_frame = tk.LabelFrame(
            master,
            text="Power Flow Visualization (Grid is on the left)",
            padx=5,
            pady=5,
        )
        self.diagram_frame.grid(row=0, **section_placement)

        self.canvas = tk.Canvas(
            self.diagram_frame,
            width=780,
            height=200,
            bg="white",
            bd=2,
            relief="groove",
        )
        self.canvas.pack(fill="both", expand=True)

        # Canvas item ids of the labels/arrows that are restyled at runtime
        self.diagram_elements = {}
        self.setup_diagram_canvas()

        # --- Row 1: CSV input ---
        csv_section = tk.LabelFrame(
            master,
            text="1. Load Input Data (Required: time, forecast, load)",
            padx=10,
            pady=10,
        )
        csv_section.grid(row=1, **section_placement)

        self.select_button = tk.Button(csv_section, text="Select CSV File", command=self.load_csv_file)
        self.select_button.pack(side="left", padx=5)

        self.csv_status_label = tk.Label(csv_section, text="Load CSV to start.", fg="blue")
        self.csv_status_label.pack(side="left", padx=10)

        # --- Row 2: listener control and log export (both buttons start disabled) ---
        control_section = tk.LabelFrame(master, text="2. Control & Output", padx=10, pady=10)
        control_section.grid(row=2, **section_placement)

        self.udp_button = tk.Button(
            control_section,
            text="Start UDP Listener",
            command=self.toggle_udp_listener,
            state=tk.DISABLED,
        )
        self.udp_button.pack(side="left", padx=5)

        self.log_button = tk.Button(
            control_section,
            text="Save Simulation Log",
            command=self.save_log_to_csv,
            state=tk.DISABLED,
        )
        self.log_button.pack(side="right", padx=5)

        self.udp_status_label = tk.Label(control_section, text="UDP Listener stopped.", fg="red")
        self.udp_status_label.pack(side="left", padx=10)

        # --- Row 3: read-only scrolling log ---
        self.data_display = scrolledtext.ScrolledText(master, wrap=tk.WORD, width=80, height=8)
        self.data_display.grid(row=3, column=0, padx=10, pady=10, sticky="nsew")
        self.data_display.insert(tk.END, "Status: Waiting for CSV file...")
        self.data_display.config(state=tk.DISABLED)  # the widget is only editable while update_display writes to it

        # Route the window-close button through on_closing so the listener is stopped first
        master.protocol("WM_DELETE_WINDOW", self.on_closing)

    # 2. Method to Draw the Static Diagram on the Canvas
    def setup_diagram_canvas(self):
        """Draws the static elements (boxes, bus line) of the power flow diagram."""
        canvas = self.canvas
        self.diagram_elements = {} 

        # --- Define Node Layout ---
        NODE_WIDTH, NODE_HEIGHT = 100, 40
        X_GRID, X_COMPONENT = 100, 400
        Y_PV, Y_BATT, Y_LOAD = 30, 100, 170
        CENTER_X = (X_GRID + X_COMPONENT) / 2
        
        # Grid Power Box (Left)
        grid_coords = (X_GRID, Y_BATT - NODE_HEIGHT/2, X_GRID + NODE_WIDTH, Y_BATT + NODE_HEIGHT/2)
        canvas.create_rectangle(grid_coords, fill='lightblue', outline='black', width=2)
        canvas.create_text(X_GRID + NODE_WIDTH/2, Y_BATT - 10, text="GRID POWER", font=('Arial', 10, 'bold'))
        self.diagram_elements['grid_val_text'] = canvas.create_text(X_GRID + NODE_WIDTH/2, Y_BATT + 10, text="---", font=('Arial', 9, 'bold'))
        
        # PV System Box (Top Right)
        pv_coords = (X_COMPONENT, Y_PV - NODE_HEIGHT/2, X_COMPONENT + NODE_WIDTH, Y_PV + NODE_HEIGHT/2)
        canvas.create_rectangle(pv_coords, fill='lightgreen', outline='black')
        canvas.create_text(X_COMPONENT + NODE_WIDTH/2, Y_PV - 10, text="PV OUTPUT", font=('Arial', 9, 'bold'))
        self.diagram_elements['pv_val_text'] = canvas.create_text(X_COMPONENT + NODE_WIDTH/2, Y_PV + 10, text="---", font=('Arial', 8))
        
        # Battery Box (Middle Right)
        batt_coords = (X_COMPONENT, Y_BATT - NODE_HEIGHT/2, X_COMPONENT + NODE_WIDTH, Y_BATT + NODE_HEIGHT/2)
        canvas.create_rectangle(batt_coords, fill='yellow', outline='black')
        canvas.create_text(X_COMPONENT + NODE_WIDTH/2, Y_BATT - 10, text="BATTERY (B_set)", font=('Arial', 9, 'bold'))
        self.diagram_elements['batt_val_text'] = canvas.create_text(X_COMPONENT + NODE_WIDTH/2, Y_BATT + 10, text="---", font=('Arial', 8))

        # Load Box (Bottom Right)
        load_coords = (X_COMPONENT, Y_LOAD - NODE_HEIGHT/2, X_COMPONENT + NODE_WIDTH, Y_LOAD + NODE_HEIGHT/2)
        canvas.create_rectangle(load_coords, fill='lightgray', outline='black')
        canvas.create_text(X_COMPONENT + NODE_WIDTH/2, Y_LOAD - 10, text="LOAD", font=('Arial', 9, 'bold'))
        self.diagram_elements['load_val_text'] = canvas.create_text(X_COMPONENT + NODE_WIDTH/2, Y_LOAD + 10, text="---", font=('Arial', 8))

        # --- Central Power Bus ---
        canvas.create_line(CENTER_X, 0, CENTER_X, 200, fill='gray', dash=(4, 2))
        
        # --- Dynamic Arrows (Only the dynamic element, replacing static lines) ---
        
        # Grid Arrow (Horizontal from Grid to Bus)
        self.diagram_elements['grid_arrow'] = canvas.create_line(grid_coords[2], Y_BATT, CENTER_X - 10, Y_BATT, arrow=tk.NONE, width=3, fill='gray')
        
        # PV Arrow (REVERSED: Positive value means arrow ENTERS PV box, points RIGHT/LAST) - KEEPING LOGIC SAME
        self.diagram_elements['pv_arrow'] = canvas.create_line(pv_coords[0], Y_PV, CENTER_X + 10, Y_PV, arrow=tk.LAST, width=3, fill='gray')
        
        # Battery Arrow (Dynamic direction, REVERSED logic) - KEEPING LOGIC SAME
        self.diagram_elements['batt_arrow'] = canvas.create_line(batt_coords[0], Y_BATT, CENTER_X + 10, Y_BATT, arrow=tk.NONE, width=3, fill='gray')

        # Load Arrow (REVERSED: Load positive means arrow LEAVES Load box, points LEFT/FIRST)
        self.diagram_elements['load_arrow'] = canvas.create_line(load_coords[0], Y_LOAD, CENTER_X + 10, Y_LOAD, arrow=tk.FIRST, width=3, fill='gray')


    # 3. Method to Update the Diagram's Values and Arrows
    def update_diagram_values(self, result):
        """Updates the numerical values and arrow directions/colors on the canvas based on user requirements."""
        canvas = self.canvas
        
        # Calculate the actual PV power injected (Real_PV * Pset)
        pv_output = result['Real_PV'] #* result['Pset_UDP'] 
        
        # --- Update Text Values ---
        canvas.itemconfig(self.diagram_elements['grid_val_text'], text=f"{result['Grid_Power']:.2f}kW")
        canvas.itemconfig(self.diagram_elements['pv_val_text'], text=f"{pv_output:.2f}kW\nPset: {result['Pset_UDP']:.2f}")
        canvas.itemconfig(self.diagram_elements['batt_val_text'], text=f"{result['B_set_UDP']:.2f}kW")
        canvas.itemconfig(self.diagram_elements['load_val_text'], text=f"{result['load']:.2f}kW")

        # --- Arrow Direction and Color Logic ---
        
        # 1. Grid Power Arrow (Dynamic Direction and Color) - KEEPING LOGIC SAME
        grid_val = result['Grid_Power']
        if grid_val > 0.01: # Exporting (Grid Positive): Arrow leaves Grid box (points Right/Last), Color RED
            grid_arrow = tk.LAST
            grid_color = 'red'
        elif grid_val < -0.01: # Importing (Grid Negative): Arrow enters Grid box (points Left/First), Color GREEN
            grid_arrow = tk.FIRST
            grid_color = 'green'
        else: # Near Zero
            grid_arrow = tk.NONE
            grid_color = 'gray'
        canvas.itemconfig(self.diagram_elements['grid_arrow'], arrow=grid_arrow, fill=grid_color)

        # 2. PV Arrow (Fixed Direction: Right, into the box) - KEEPING LOGIC SAME
        pv_val = result['Real_PV'] #* result['Pset_UDP']
        if pv_val > 0.01:
            pv_color = 'red'
        else:
            pv_color = 'gray' 
        # Arrow direction is fixed as tk.LAST (Right) in setup_diagram_canvas
        canvas.itemconfig(self.diagram_elements['pv_arrow'], fill=pv_color)

        # 3. Battery Arrow (Dynamic Direction) - KEEPING LOGIC SAME
        batt_val = result['B_set_UDP']
        if batt_val > 0.01: 
            # DISCHARGING (Positive): Arrow ENTERS Battery box (points RIGHT/LAST)
            batt_arrow = tk.LAST
            batt_color = 'red'
        elif batt_val < -0.01: 
            # CHARGING (Negative): Arrow LEAVES Battery box (points LEFT/FIRST)
            batt_arrow = tk.FIRST
            batt_color = 'red'
        else:
            batt_arrow = tk.NONE
            batt_color = 'gray'
        canvas.itemconfig(self.diagram_elements['batt_arrow'], arrow=batt_arrow, fill=batt_color)
        
        # 4. Load Arrow (Fixed Direction: Left, away from the box) - REVERSED
        load_val = result['load']
        if load_val > 0.01:
            load_color = 'red'
        else:
            load_color = 'gray'
        # Arrow direction is fixed as tk.FIRST (Left) in setup_diagram_canvas
        canvas.itemconfig(self.diagram_elements['load_arrow'], fill=load_color)
        
        # Ensure arrows are on top
        for element_id in self.diagram_elements.values():
            if isinstance(element_id, int):
                canvas.tag_raise(element_id)


    # 4. Method to load the CSV data
    def load_csv_file(self):
        """Ask the user for a CSV file and load it as the simulation input.

        The file must contain the columns 'time', 'forecast' and 'load', and at
        least one data row. 'forecast' and 'load' must be numeric because
        calculate_grid_power uses them in arithmetic; numeric strings are
        converted, anything else is rejected here rather than failing on every
        UDP packet later.

        On success the row counter and the log are reset, the UDP button is
        enabled and the status label turns green. On cancel only the status
        label changes. On any failure an error dialog is shown and self.df is
        set to None.
        """
        filepath = filedialog.askopenfilename(defaultextension=".csv", filetypes=[("CSV files", "*.csv")])
        if not filepath:
            self.csv_status_label.config(text="File selection cancelled.", fg="red")
            return

        try:
            df_temp = pd.read_csv(filepath)
            required_columns = ['time', 'forecast', 'load']

            if not all(col in df_temp.columns for col in required_columns):
                raise ValueError(f"CSV missing required columns: {required_columns}.")

            df_new = df_temp[required_columns].copy()

            # A header-only file would make `current_minute % len(self.df)` in
            # calculate_grid_power divide by zero on every packet.
            if df_new.empty:
                raise ValueError("CSV contains no data rows.")

            # Raises ValueError (handled below) if a value cannot be parsed as a number.
            for col in ('forecast', 'load'):
                df_new[col] = pd.to_numeric(df_new[col])

            # Publish the fully validated frame in a single assignment.
            self.df = df_new
            self.current_minute = 0
            self.log_data = []
            self.udp_button.config(state=tk.NORMAL)

            self.update_display("System Initialized. Use the diagram above for real-time power flow. Log below shows detailed outputs.")

            self.csv_status_label.config(text=f"Loaded {len(self.df)} rows. CSV ready.", fg="green")

        except Exception as e:
            messagebox.showerror("Error", f"Failed to process CSV: {str(e)}")
            self.csv_status_label.config(text="Error loading file.", fg="red")
            self.df = None

    # 5. Core Calculation Method
    def calculate_grid_power(self, Pset, B_set):
        """
        Calculates Grid Power using the formula and logs the result.
        Equation: Grid = Load - (Real_PV * Pset) - B_set
        """
        if self.df is None:
            return None

        sim_index = self.current_minute % len(self.df)
        data_row = self.df.iloc[sim_index]
        
        load_csv = data_row['load']
        forecast_csv = data_row['forecast']
        
        # 2. Calculate Real_PV (Actual PV output with a small, random deviation)
        deviation_factor = random.uniform(-MAX_DEVIATION, MAX_DEVIATION)
        real_pv = forecast_csv * (1 + deviation_factor)
        
        # 3. Calculate Grid Power (The core simulation output)
#         grid_power = load_csv - (real_pv * Pset) - B_set
        grid_power = load_csv - real_pv - B_set

        # --- LOGGING STEP ---
        result = {
            'time_minute': data_row['time'],
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'load': load_csv,
            'forecast': forecast_csv,
            'Pset_UDP': Pset,
            'B_set_UDP': B_set,
            'Real_PV': real_pv,
            'Deviation_Factor': deviation_factor,
            'Grid_Power': grid_power
        }
        self.log_data.append(result) 
        # --------------------
        
        self.current_minute += 1 
        
        return result

    # 6. Method to Save the Simulation Log
    def save_log_to_csv(self):
        """Export the logged simulation results to a CSV file chosen by the user.

        Shows a warning and returns when the log is empty, and returns silently
        when the save dialog is cancelled. Floats are written with six decimal
        places and without the DataFrame index. Any error raised while saving
        is reported in an error dialog.
        """
        if len(self.log_data) == 0:
            messagebox.showwarning("Warning", "No simulation data has been generated yet.")
            return

        save_dialog_options = {
            'defaultextension': ".csv",
            'initialfile': "grid_simulation_log.csv",
            'filetypes': [("CSV files", "*.csv"), ("All files", "*.*")],
        }
        target_path = filedialog.asksaveasfilename(**save_dialog_options)

        # Empty result means the user cancelled the dialog
        if not target_path:
            return

        try:
            pd.DataFrame(self.log_data).to_csv(target_path, index=False, float_format='%.6f')

            messagebox.showinfo("Success", f"Successfully saved {len(self.log_data)} logged data points.")
            # Reflect the number of saved points on the button itself
            self.log_button.config(text=f"Save Log ({len(self.log_data)} points)")

        except Exception as e:
            messagebox.showerror("Error", f"Failed to save log file: {str(e)}")


    # 7. Method to Control the Listener (Start/Stop)
    def toggle_udp_listener(self):
        """Start the UDP listener if it is stopped; stop it if it is running.

        Stopping clears the running flag, sends a b'STOP' datagram to the
        listener's own port so it leaves recvfrom() at once, resets the
        controls, enables the "Save Log" button and waits up to one second for
        the thread to finish.

        Starting requires a loaded CSV. It clears the previous log, launches
        the listener on a daemon thread, updates the controls, disables the
        "Save Log" button and tells the user that the log was cleared.
        """
        if self.running.is_set():
            # --- STOP ---
            self.running.clear()
            try:
                # The context manager closes the socket even if sendto() fails.
                with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as stop_sock:
                    stop_sock.sendto(b'STOP', (UDP_IP, UDP_PORT))
            except OSError:
                # Best effort only: without the wake-up datagram the listener
                # still sees the cleared flag after its 0.5 s receive timeout.
                pass

            self.udp_button.config(text="Start UDP Listener")
            self.udp_status_label.config(text="UDP Listener stopped.", fg="red")
            self.log_button.config(state=tk.NORMAL, text=f"Save Log ({len(self.log_data)} points)")
            if self.udp_thread and self.udp_thread.is_alive():
                self.udp_thread.join(timeout=1)
            return

        # --- START ---
        if self.df is None:
            messagebox.showerror("Error", "Please load the CSV file first.")
            return

        # Clear the log BEFORE the thread starts. Clearing it after the (modal)
        # info dialog below would throw away every entry the listener logged
        # while that dialog was still open.
        self.log_data = []

        self.running.set()
        self.udp_thread = threading.Thread(target=self.udp_listener, daemon=True)
        self.udp_thread.start()
        self.udp_button.config(text="Stop UDP Listener")
        self.udp_status_label.config(text=f"Listening on {UDP_PORT}...", fg="green")
        self.log_button.config(state=tk.DISABLED)
        messagebox.showinfo("Log Cleared", "The log has been cleared for a new run.")


    # 8. Background Thread Function
    def udp_listener(self):
        """Receive control packets over UDP and run one simulation step per packet.

        Runs on the background thread started by toggle_udp_listener. The
        socket is bound to (UDP_IP, UDP_PORT) with a 0.5 s receive timeout so
        the running flag is re-checked regularly. Packet handling:

        * b'STOP' ends the loop.
        * A packet of EXPECTED_DATA_LEN bytes is unpacked as two big-endian
          doubles (Pset, B_set), passed to calculate_grid_power, and the result
          is pushed to the diagram and the log view.
        * Any other packet is ignored, except that an error line is shown when
          no CSV is loaded.

        All widget updates are scheduled with master.after() rather than being
        performed on this thread. When the loop ends, the socket is closed and
        the controls are reset to the "stopped" state.
        """
        def publish(entry_text):
            """Show one log entry; runs on the Tk thread via master.after()."""
            # Setting last_grid_data here (not on the listener thread) makes
            # every scheduled display call print its own entry, even when the
            # next packet arrives before Tk has processed this one.
            self.last_grid_data = entry_text
            self.update_display("New Detailed Log Entry:\n")

        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sock.bind((UDP_IP, UDP_PORT))
            sock.settimeout(0.5)

            while self.running.is_set():
                try:
                    data, _ = sock.recvfrom(1024)

                    if data == b'STOP':
                        break

                    if self.df is None:
                        self.master.after(0, self.update_display, "Error: Cannot calculate. CSV data not loaded.")
                        continue

                    if len(data) != EXPECTED_DATA_LEN:
                        continue  # not a control packet

                    Pset, B_set = struct.unpack('>dd', data)

                    result = self.calculate_grid_power(Pset, B_set)
                    if result is None:
                        # The CSV was unloaded between the check above and the calculation.
                        self.master.after(0, self.update_display, "Error: Cannot calculate. CSV data not loaded.")
                        continue

                    self.master.after(0, self.update_diagram_values, result)

                    deviation_percent = result['Deviation_Factor'] * 100
                    sign = "+" if deviation_percent >= 0 else ""

                    # Joined explicitly: with implicit literal concatenation,
                    # `f"...\n" f"=" * 40` repeats the whole header line 40
                    # times because adjacent literals are merged before `*`.
                    entry_text = "".join([
                        f"Simulation Time: Minute {result['time_minute']:.0f} (Total Runs: {len(self.log_data)})\n",
                        "=" * 40 + "\n",
                        f"  Load: {result['load']:.4f}\n",
                        f"  Forecast: {result['forecast']:.4f}\n",
                        f"  Pset: {result['Pset_UDP']:.4f}\n",
                        f"  B_set: {result['B_set_UDP']:.4f}\n",
                        f"  Real_PV: {result['Real_PV']:.4f} ({sign}{deviation_percent:.2f}% Deviation)\n",
                        f"  Grid_Power: {result['Grid_Power']:.4f}\n",
                    ])

                    self.master.after(0, publish, entry_text)

                except socket.timeout:
                    continue
                except Exception as e:
                    self.master.after(0, messagebox.showerror, "Processing Error", f"Failed data step: {e}")

        except Exception as e:
            self.master.after(0, messagebox.showerror, "UDP Error", f"Listener failed: {e}")
        finally:
            if sock is not None:
                sock.close()

            # If this thread ended on its own (bind failure, STOP datagram from
            # another sender) the flag would stay set while the button reads
            # "Start", so the next click would run the stop branch. Only the
            # current listener thread clears it, so a late-exiting old thread
            # cannot stop a listener that was started after it.
            if self.udp_thread is threading.current_thread():
                self.running.clear()

            try:
                self.master.after(0, self.udp_button.config, {'text': "Start UDP Listener"})
                self.master.after(0, self.udp_status_label.config, {'text': "UDP Listener stopped.", 'fg': "red"})
                self.master.after(0, self.log_button.config, {'state': tk.NORMAL, 'text': f"Save Log ({len(self.log_data)} points)"})
            except (RuntimeError, tk.TclError):
                # The window may already be gone (application closing); there
                # is nothing left to update in that case.
                pass


    # 9. Method to Update the Log Display
    def update_display(self, udp_header):
        """Safely updates the ScrolledText widget with detailed log entries."""
        self.data_display.config(state=tk.NORMAL) 
        self.data_display.insert(tk.END, udp_header) 
        self.data_display.insert(tk.END, self.last_grid_data) 
        self.data_display.insert(tk.END, "\n" + "="*40 + "\n\n") 
        self.data_display.config(state=tk.DISABLED) 
        self.data_display.see(tk.END) 


    # 10. Clean-up Method
    def on_closing(self):
        """Stops the UDP listener thread cleanly before closing the window."""
        if self.running.is_set():
            self.toggle_udp_listener() 
        self.master.destroy() 

# --- Main application execution ---
# Creates the window and starts the GUI only when this code is run as the main
# program; `root` and `app` are left as module-level names.
if __name__ == "__main__":
    root = tk.Tk()                # root Tk window that hosts the whole application
    app = GridSimulatorApp(root)  # builds all widgets inside the root window
    root.mainloop()               # hands control to the Tk event loop