diff --git a/concore.py b/concore.py index ce3ab0a..6d71f0f 100644 --- a/concore.py +++ b/concore.py @@ -5,26 +5,35 @@ import re import zmq # Added for ZeroMQ -#if windows, create script to kill this process +# if windows, create script to kill this process # because batch files don't provide easy way to know pid of last command -# ignored for posix!=windows, because "concorepid" is handled by script -# ignored for docker (linux!=windows), because handled by docker stop +# ignored for posix != windows, because "concorepid" is handled by script +# ignored for docker (linux != windows), because handled by docker stop if hasattr(sys, 'getwindowsversion'): with open("concorekill.bat","w") as fpid: fpid.write("taskkill /F /PID "+str(os.getpid())+"\n") -# --- ZeroMQ Integration Start --- +# =================================================================== +# ZeroMQ Communication Wrapper +# =================================================================== class ZeroMQPort: def __init__(self, port_type, address, zmq_socket_type): + """ + port_type: "bind" or "connect" + address: ZeroMQ address (e.g., "tcp://*:5555") + zmq_socket_type: zmq.REQ, zmq.REP, zmq.PUB, zmq.SUB etc. + """ self.context = zmq.Context() self.socket = self.context.socket(zmq_socket_type) self.port_type = port_type # "bind" or "connect" self.address = address - self.socket.setsockopt(zmq.RCVTIMEO, 2000) - self.socket.setsockopt(zmq.SNDTIMEO, 2000) - self.socket.setsockopt(zmq.LINGER, 0) + # Configure timeouts & immediate close on failure + self.socket.setsockopt(zmq.RCVTIMEO, 2000) # 2 sec receive timeout + self.socket.setsockopt(zmq.SNDTIMEO, 2000) # 2 sec send timeout + self.socket.setsockopt(zmq.LINGER, 0) # Drop pending messages on close + # Bind or connect if self.port_type == "bind": self.socket.bind(address) print(f"ZMQ Port bound to {address}") @@ -33,6 +42,7 @@ def __init__(self, port_type, address, zmq_socket_type): print(f"ZMQ Port connected to {address}") def send_json_with_retry(self, message): + """Send JSON message with retries if timeout occurs.""" for attempt in range(5): try: self.socket.send_json(message) @@ -44,6 +54,7 @@ def send_json_with_retry(self, message): return def recv_json_with_retry(self): + """Receive JSON message with retries if timeout occurs.""" for attempt in range(5): try: return self.socket.recv_json() @@ -89,6 +100,9 @@ def terminate_zmq(): print(f"Error while terminating ZMQ port {port.address}: {e}") # --- ZeroMQ Integration End --- +# =================================================================== +# File & Parameter Handling +# =================================================================== def safe_literal_eval(filename, defaultValue): try: with open(filename, "r") as file: @@ -97,10 +111,13 @@ def safe_literal_eval(filename, defaultValue): # Keep print for debugging, but can be made quieter # print(f"Info: Error reading {filename} or file not found, using default: {e}") return defaultValue - + + +# Load input/output ports if present iport = safe_literal_eval("concore.iport", {}) oport = safe_literal_eval("concore.oport", {}) +# Global variables s = '' olds = '' delay = 1 @@ -110,14 +127,20 @@ def safe_literal_eval(filename, defaultValue): simtime = 0 #9/21/22 +# =================================================================== +# Parameter Parsing +# =================================================================== try: sparams_path = os.path.join(inpath + "1", "concore.params") if os.path.exists(sparams_path): with open(sparams_path, "r") as f: sparams = f.read() if sparams: # Ensure sparams is not empty + # Windows sometimes keeps quotes if sparams[0] == '"' and sparams[-1] == '"': #windows keeps "" need to remove sparams = sparams[1:-1] + + # Convert key=value;key2=value2 to Python dict format if sparams != '{' and not (sparams.startswith('{') and sparams.endswith('}')): # Check if it needs conversion print("converting sparams: "+sparams) sparams = "{'"+re.sub(';',",'",re.sub('=',"':",re.sub(' ','',sparams)))+"}" @@ -137,11 +160,16 @@ def safe_literal_eval(filename, defaultValue): #9/30/22 def tryparam(n, i): + """Return parameter `n` from params dict, else default `i`.""" return params.get(n, i) #9/12/21 +# =================================================================== +# Simulation Time Handling +# =================================================================== def default_maxtime(default): + """Read maximum simulation time from file or use default.""" global maxtime maxtime_path = os.path.join(inpath + "1", "concore.maxtime") maxtime = safe_literal_eval(maxtime_path, default) @@ -149,6 +177,7 @@ def default_maxtime(default): default_maxtime(100) def unchanged(): + """Check if global string `s` is unchanged since last call.""" global olds, s if olds == s: s = '' @@ -156,16 +185,21 @@ def unchanged(): olds = s return False +# =================================================================== +# I/O Handling (File + ZMQ) +# =================================================================== def read(port_identifier, name, initstr_val): global s, simtime, retrycount + # Default return default_return_val = initstr_val if isinstance(initstr_val, str): try: default_return_val = literal_eval(initstr_val) except (SyntaxError, ValueError): pass - + + # Case 1: ZMQ port if isinstance(port_identifier, str) and port_identifier in zmq_ports: zmq_p = zmq_ports[port_identifier] try: @@ -178,6 +212,7 @@ def read(port_identifier, name, initstr_val): print(f"Unexpected error during ZMQ read on port {port_identifier} (name: {name}): {e}. Returning default.") return default_return_val + # Case 2: File-based port try: file_port_num = int(port_identifier) except ValueError: @@ -197,6 +232,7 @@ def read(port_identifier, name, initstr_val): print(f"Error reading {file_path}: {e}. Using default value.") return default_return_val + # Retry logic if file is empty attempts = 0 max_retries = 5 while len(ins) == 0 and attempts < max_retries: @@ -214,6 +250,8 @@ def read(port_identifier, name, initstr_val): return default_return_val s += ins + + # Try parsing try: inval = literal_eval(ins) if isinstance(inval, list) and len(inval) > 0: @@ -230,8 +268,13 @@ def read(port_identifier, name, initstr_val): def write(port_identifier, name, val, delta=0): + """ + Write data either to ZMQ port or file. + `val` must be list (with simtime prefix) or string. + """ global simtime + # Case 1: ZMQ port if isinstance(port_identifier, str) and port_identifier in zmq_ports: zmq_p = zmq_ports[port_identifier] try: @@ -240,7 +283,8 @@ def write(port_identifier, name, val, delta=0): print(f"ZMQ write error on port {port_identifier} (name: {name}): {e}") except Exception as e: print(f"Unexpected error during ZMQ write on port {port_identifier} (name: {name}): {e}") - + + # Case 2: File-based port try: if isinstance(port_identifier, str) and port_identifier in zmq_ports: file_path = os.path.join("../"+port_identifier, name) @@ -251,8 +295,9 @@ def write(port_identifier, name, val, delta=0): print(f"Error: Invalid port identifier '{port_identifier}' for file operation. Must be integer or ZMQ name.") return + # File writing rules if isinstance(val, str): - time.sleep(2 * delay) + time.sleep(2 * delay) # string writes wait longer elif not isinstance(val, list): print(f"File write to {file_path} must have list or str value, got {type(val)}") return @@ -269,6 +314,10 @@ def write(port_identifier, name, val, delta=0): print(f"Error writing to {file_path}: {e}") def initval(simtime_val_str): + """ + Initialize simtime from string containing a list. + Example: "[10, 'foo', 'bar']" → simtime=10, returns ['foo','bar'] + """ global simtime try: val = literal_eval(simtime_val_str) diff --git a/measurements/Throughput/maximumThroughputComaprison.py b/measurements/Throughput/maximumThroughputComaprison.py index 1dd0342..607d259 100644 --- a/measurements/Throughput/maximumThroughputComaprison.py +++ b/measurements/Throughput/maximumThroughputComaprison.py @@ -15,7 +15,6 @@ # Add plot details plt.ylabel('Throughput (Messages/Second)', fontsize=14) -plt.title('Figure 6: Maximum Throughput Comparison', fontsize=18, pad=25) plt.xticks(fontsize=12) plt.yscale('log') # log scale for large differences plt.grid(axis='y', linestyle='--', alpha=0.7) diff --git a/measurements/Throughput/throughput_comparison.pdf b/measurements/Throughput/throughput_comparison.pdf index 7793b13..5854b05 100644 Binary files a/measurements/Throughput/throughput_comparison.pdf and b/measurements/Throughput/throughput_comparison.pdf differ diff --git a/mkconcore.py b/mkconcore.py index c05e8aa..b243c23 100644 --- a/mkconcore.py +++ b/mkconcore.py @@ -1,3 +1,67 @@ +# The script handles different environments: Docker, POSIX (macOS/Ubuntu), and Windows. +# It reads the graph nodes (representing computational tasks) and edges (representing data flow). +# Based on this information, it generates a directory structure and a set of helper scripts +# (build, run, stop, clear, maxtime, params, unlock) to manage the workflow. +# It also includes logic to handle "script specialization" for ZMQ-based communication, +# where it modifies source files to include specific port and port-name information. + +# The script does the following: +# 1. Initial Setup and Argument Parsing: +# - Defines global constants for tool names (g++, iverilog, python3, matlab, etc.) and paths. +# - Parses command-line arguments for the GraphML file, source directory, output directory, and execution type (posix, windows, docker). +# - Checks for the existence of input/output directories and creates the output structure. +# - Logs the configuration details. + +# 2. Graph Parsing and Adjacency Matrix Creation: +# - Uses BeautifulSoup to parse the input GraphML file. +# - Identifies nodes and edges, storing them in dictionaries. +# - Creates a simple adjacency matrix (m) and a reachability matrix (ms) from the graph, +# detecting any unreachable nodes and logging a warning. + +# 3. Script Specialization (Aggregation and Execution): +# - This is a key part of the logic that handles ZMQ connections. +# - It iterates through the edges, specifically looking for ones with labels in the format "0x_". +# - It aggregates these port parameters for each node. +# - It then uses an external script `copy_with_port_portname.py` to "specialize" the original source files. This means it creates +# new versions of the scripts, injecting the ZMQ port information directly into the code. +# - The `nodes_dict` is then updated to point to these newly created, specialized scripts. + +# 4. Port Mapping and File Generation: +# - Generates `.iport` (input port) and `.oport` (output port) mapping files for each node. +# - These files are simple dictionaries that map volume names (for file-based communication) or port names (for ZMQ) +# to their corresponding port numbers or indices. This allows the individual scripts to know how to connect to their +# peers in the graph. + +# 5. File Copying and Script Generation: +# - Copies all necessary source files (`.py`, `.cpp`, `.m`, etc.) from the source directory to the `outdir/src` directory. +# - Handles cases where specialized scripts were created, ensuring the new files are copied instead of the originals. +# - Copies a set of standard `concore` files (`.py`, `.hpp`, `.v`, `.m`, `mkcompile`) into the `src` directory. + +# 6. Environment-Specific Scripting (Main Logic Branches): +# - This is the largest and most complex part, where the script's behavior diverges based on the `concoretype`. + +# a. Docker: +# - Generates `Dockerfile`s for each node's container. If a custom Dockerfile exists in the source directory, it's used. +# Otherwise, it generates a default one based on the file extension (`.py`, `.cpp`, etc.). +# - Creates `build.bat` (Windows) or `build` (POSIX) scripts to build the Docker images for each node. +# - Creates `run.bat`/`run` scripts to launch the containers, setting up the necessary shared volumes (`-v`) for data transfer. +# - Creates `stop.bat`/`stop` and `clear.bat`/`clear` scripts to manage the containers and clean up the volumes. +# - Creates helper scripts like `maxtime.bat`/`maxtime`, `params.bat`/`params`, and `unlock.bat`/`unlock` to +# pass runtime parameters or API keys to the containers. + +# b. POSIX (Linux/macOS) and Windows: +# - These branches handle direct execution on the host machine without containers. +# - Creates a separate directory for each node inside the output directory. +# - Uses the `build` script to copy source files and create symbolic links (`ln -s` on POSIX, `mklink` on Windows) +# between the node directories and the shared data directories (representing graph edges). +# - Generates `run` and `debug` scripts to execute the programs. It uses platform-specific commands +# like `start /B` for Windows and `xterm -e` or `osascript` for macOS to run the processes. +# - The `stop` and `clear` scripts use `kill` or `del` commands to manage the running processes and files. +# - Generates `maxtime`, `params`, and `unlock` scripts that directly write files to the shared directories. + +# 7. Permissions: +# - Sets the executable permission (`stat.S_IRWXU`) for the generated scripts on POSIX systems. + from bs4 import BeautifulSoup import logging import re @@ -17,13 +81,13 @@ CPPEXE = "g++" #Ubuntu/macOS C++ 6/22/21 VWIN = "iverilog" #Windows verilog 6/25/21 VEXE = "iverilog" #Ubuntu/macOS verilog 6/25/21 -PYTHONEXE = "python3" #Ubuntu/macOS python 3 -PYTHONWIN = "python" #Windows python 3 +PYTHONEXE = "python3" #Ubuntu/macOS python3 +PYTHONWIN = "python" #Windows python3 MATLABEXE = "matlab" #Ubuntu/macOS matlab MATLABWIN = "matlab" #Windows matlab -OCTAVEEXE = "octave" #Ubuntu/macOS octave -OCTAVEWIN = "octave" #Windows octave -M_IS_OCTAVE = False #treat .m as octave +OCTAVEEXE = "octave" #Ubuntu/macOS octave +OCTAVEWIN = "octave" #Windows octave +M_IS_OCTAVE = False #treat .m as octave MCRPATH = "~/MATLAB/R2021a" #path to local Ubunta Matlab Compiler Runtime DOCKEREXE = "sudo docker"#assume simple docker install DOCKEREPO = "markgarnold"#where pulls come from 3/28/21