Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 60 additions & 11 deletions concore.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,35 @@
import re
import zmq # Added for ZeroMQ

#if windows, create script to kill this process
# if windows, create script to kill this process
# because batch files don't provide easy way to know pid of last command
# ignored for posix!=windows, because "concorepid" is handled by script
# ignored for docker (linux!=windows), because handled by docker stop
# ignored for posix != windows, because "concorepid" is handled by script
# ignored for docker (linux != windows), because handled by docker stop
if hasattr(sys, 'getwindowsversion'):
with open("concorekill.bat","w") as fpid:
fpid.write("taskkill /F /PID "+str(os.getpid())+"\n")

# --- ZeroMQ Integration Start ---
# ===================================================================
# ZeroMQ Communication Wrapper
# ===================================================================
class ZeroMQPort:
def __init__(self, port_type, address, zmq_socket_type):
"""
port_type: "bind" or "connect"
address: ZeroMQ address (e.g., "tcp://*:5555")
zmq_socket_type: zmq.REQ, zmq.REP, zmq.PUB, zmq.SUB etc.
"""
self.context = zmq.Context()
self.socket = self.context.socket(zmq_socket_type)
self.port_type = port_type # "bind" or "connect"
self.address = address

self.socket.setsockopt(zmq.RCVTIMEO, 2000)
self.socket.setsockopt(zmq.SNDTIMEO, 2000)
self.socket.setsockopt(zmq.LINGER, 0)
# Configure timeouts & immediate close on failure
self.socket.setsockopt(zmq.RCVTIMEO, 2000) # 2 sec receive timeout
self.socket.setsockopt(zmq.SNDTIMEO, 2000) # 2 sec send timeout
self.socket.setsockopt(zmq.LINGER, 0) # Drop pending messages on close

# Bind or connect
if self.port_type == "bind":
self.socket.bind(address)
print(f"ZMQ Port bound to {address}")
Expand All @@ -33,6 +42,7 @@ def __init__(self, port_type, address, zmq_socket_type):
print(f"ZMQ Port connected to {address}")

def send_json_with_retry(self, message):
"""Send JSON message with retries if timeout occurs."""
for attempt in range(5):
try:
self.socket.send_json(message)
Expand All @@ -44,6 +54,7 @@ def send_json_with_retry(self, message):
return

def recv_json_with_retry(self):
"""Receive JSON message with retries if timeout occurs."""
for attempt in range(5):
try:
return self.socket.recv_json()
Expand Down Expand Up @@ -89,6 +100,9 @@ def terminate_zmq():
print(f"Error while terminating ZMQ port {port.address}: {e}")
# --- ZeroMQ Integration End ---

# ===================================================================
# File & Parameter Handling
# ===================================================================
def safe_literal_eval(filename, defaultValue):
try:
with open(filename, "r") as file:
Expand All @@ -97,10 +111,13 @@ def safe_literal_eval(filename, defaultValue):
# Keep print for debugging, but can be made quieter
# print(f"Info: Error reading {filename} or file not found, using default: {e}")
return defaultValue



# Load input/output ports if present
iport = safe_literal_eval("concore.iport", {})
oport = safe_literal_eval("concore.oport", {})

# Global variables
s = ''
olds = ''
delay = 1
Expand All @@ -110,14 +127,20 @@ def safe_literal_eval(filename, defaultValue):
simtime = 0

#9/21/22
# ===================================================================
# Parameter Parsing
# ===================================================================
try:
sparams_path = os.path.join(inpath + "1", "concore.params")
if os.path.exists(sparams_path):
with open(sparams_path, "r") as f:
sparams = f.read()
if sparams: # Ensure sparams is not empty
# Windows sometimes keeps quotes
if sparams[0] == '"' and sparams[-1] == '"': #windows keeps "" need to remove
sparams = sparams[1:-1]

# Convert key=value;key2=value2 to Python dict format
if sparams != '{' and not (sparams.startswith('{') and sparams.endswith('}')): # Check if it needs conversion
print("converting sparams: "+sparams)
sparams = "{'"+re.sub(';',",'",re.sub('=',"':",re.sub(' ','',sparams)))+"}"
Expand All @@ -137,35 +160,46 @@ def safe_literal_eval(filename, defaultValue):

#9/30/22
def tryparam(n, i):
"""Return parameter `n` from params dict, else default `i`."""
return params.get(n, i)


#9/12/21
# ===================================================================
# Simulation Time Handling
# ===================================================================
def default_maxtime(default):
"""Read maximum simulation time from file or use default."""
global maxtime
maxtime_path = os.path.join(inpath + "1", "concore.maxtime")
maxtime = safe_literal_eval(maxtime_path, default)

default_maxtime(100)

def unchanged():
"""Check if global string `s` is unchanged since last call."""
global olds, s
if olds == s:
s = ''
return True
olds = s
return False

# ===================================================================
# I/O Handling (File + ZMQ)
# ===================================================================
def read(port_identifier, name, initstr_val):
global s, simtime, retrycount

# Default return
default_return_val = initstr_val
if isinstance(initstr_val, str):
try:
default_return_val = literal_eval(initstr_val)
except (SyntaxError, ValueError):
pass


# Case 1: ZMQ port
if isinstance(port_identifier, str) and port_identifier in zmq_ports:
zmq_p = zmq_ports[port_identifier]
try:
Expand All @@ -178,6 +212,7 @@ def read(port_identifier, name, initstr_val):
print(f"Unexpected error during ZMQ read on port {port_identifier} (name: {name}): {e}. Returning default.")
return default_return_val

# Case 2: File-based port
try:
file_port_num = int(port_identifier)
except ValueError:
Expand All @@ -197,6 +232,7 @@ def read(port_identifier, name, initstr_val):
print(f"Error reading {file_path}: {e}. Using default value.")
return default_return_val

# Retry logic if file is empty
attempts = 0
max_retries = 5
while len(ins) == 0 and attempts < max_retries:
Expand All @@ -214,6 +250,8 @@ def read(port_identifier, name, initstr_val):
return default_return_val

s += ins

# Try parsing
try:
inval = literal_eval(ins)
if isinstance(inval, list) and len(inval) > 0:
Expand All @@ -230,8 +268,13 @@ def read(port_identifier, name, initstr_val):


def write(port_identifier, name, val, delta=0):
"""
Write data either to ZMQ port or file.
`val` must be list (with simtime prefix) or string.
"""
global simtime

# Case 1: ZMQ port
if isinstance(port_identifier, str) and port_identifier in zmq_ports:
zmq_p = zmq_ports[port_identifier]
try:
Expand All @@ -240,7 +283,8 @@ def write(port_identifier, name, val, delta=0):
print(f"ZMQ write error on port {port_identifier} (name: {name}): {e}")
except Exception as e:
print(f"Unexpected error during ZMQ write on port {port_identifier} (name: {name}): {e}")


# Case 2: File-based port
try:
if isinstance(port_identifier, str) and port_identifier in zmq_ports:
file_path = os.path.join("../"+port_identifier, name)
Expand All @@ -251,8 +295,9 @@ def write(port_identifier, name, val, delta=0):
print(f"Error: Invalid port identifier '{port_identifier}' for file operation. Must be integer or ZMQ name.")
return

# File writing rules
if isinstance(val, str):
time.sleep(2 * delay)
time.sleep(2 * delay) # string writes wait longer
elif not isinstance(val, list):
print(f"File write to {file_path} must have list or str value, got {type(val)}")
return
Expand All @@ -269,6 +314,10 @@ def write(port_identifier, name, val, delta=0):
print(f"Error writing to {file_path}: {e}")

def initval(simtime_val_str):
"""
Initialize simtime from string containing a list.
Example: "[10, 'foo', 'bar']" → simtime=10, returns ['foo','bar']
"""
global simtime
try:
val = literal_eval(simtime_val_str)
Expand Down
1 change: 0 additions & 1 deletion measurements/Throughput/maximumThroughputComaprison.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

# Add plot details
plt.ylabel('Throughput (Messages/Second)', fontsize=14)
plt.title('Figure 6: Maximum Throughput Comparison', fontsize=18, pad=25)
plt.xticks(fontsize=12)
plt.yscale('log') # log scale for large differences
plt.grid(axis='y', linestyle='--', alpha=0.7)
Expand Down
Binary file modified measurements/Throughput/throughput_comparison.pdf
Binary file not shown.
74 changes: 69 additions & 5 deletions mkconcore.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,67 @@
# The script handles different environments: Docker, POSIX (macOS/Ubuntu), and Windows.
# It reads the graph nodes (representing computational tasks) and edges (representing data flow).
# Based on this information, it generates a directory structure and a set of helper scripts
# (build, run, stop, clear, maxtime, params, unlock) to manage the workflow.
# It also includes logic to handle "script specialization" for ZMQ-based communication,
# where it modifies source files to include specific port and port-name information.

# The script does the following:
# 1. Initial Setup and Argument Parsing:
# - Defines global constants for tool names (g++, iverilog, python3, matlab, etc.) and paths.
# - Parses command-line arguments for the GraphML file, source directory, output directory, and execution type (posix, windows, docker).
# - Checks for the existence of input/output directories and creates the output structure.
# - Logs the configuration details.

# 2. Graph Parsing and Adjacency Matrix Creation:
# - Uses BeautifulSoup to parse the input GraphML file.
# - Identifies nodes and edges, storing them in dictionaries.
# - Creates a simple adjacency matrix (m) and a reachability matrix (ms) from the graph,
# detecting any unreachable nodes and logging a warning.

# 3. Script Specialization (Aggregation and Execution):
# - This is a key part of the logic that handles ZMQ connections.
# - It iterates through the edges, specifically looking for ones with labels in the format "0x<hex_port>_<port_name>".
# - It aggregates these port parameters for each node.
# - It then uses an external script `copy_with_port_portname.py` to "specialize" the original source files. This means it creates
# new versions of the scripts, injecting the ZMQ port information directly into the code.
# - The `nodes_dict` is then updated to point to these newly created, specialized scripts.

# 4. Port Mapping and File Generation:
# - Generates `.iport` (input port) and `.oport` (output port) mapping files for each node.
# - These files are simple dictionaries that map volume names (for file-based communication) or port names (for ZMQ)
# to their corresponding port numbers or indices. This allows the individual scripts to know how to connect to their
# peers in the graph.

# 5. File Copying and Script Generation:
# - Copies all necessary source files (`.py`, `.cpp`, `.m`, etc.) from the source directory to the `outdir/src` directory.
# - Handles cases where specialized scripts were created, ensuring the new files are copied instead of the originals.
# - Copies a set of standard `concore` files (`.py`, `.hpp`, `.v`, `.m`, `mkcompile`) into the `src` directory.

# 6. Environment-Specific Scripting (Main Logic Branches):
# - This is the largest and most complex part, where the script's behavior diverges based on the `concoretype`.

# a. Docker:
# - Generates `Dockerfile`s for each node's container. If a custom Dockerfile exists in the source directory, it's used.
# Otherwise, it generates a default one based on the file extension (`.py`, `.cpp`, etc.).
# - Creates `build.bat` (Windows) or `build` (POSIX) scripts to build the Docker images for each node.
# - Creates `run.bat`/`run` scripts to launch the containers, setting up the necessary shared volumes (`-v`) for data transfer.
# - Creates `stop.bat`/`stop` and `clear.bat`/`clear` scripts to manage the containers and clean up the volumes.
# - Creates helper scripts like `maxtime.bat`/`maxtime`, `params.bat`/`params`, and `unlock.bat`/`unlock` to
# pass runtime parameters or API keys to the containers.

# b. POSIX (Linux/macOS) and Windows:
# - These branches handle direct execution on the host machine without containers.
# - Creates a separate directory for each node inside the output directory.
# - Uses the `build` script to copy source files and create symbolic links (`ln -s` on POSIX, `mklink` on Windows)
# between the node directories and the shared data directories (representing graph edges).
# - Generates `run` and `debug` scripts to execute the programs. It uses platform-specific commands
# like `start /B` for Windows and `xterm -e` or `osascript` for macOS to run the processes.
# - The `stop` and `clear` scripts use `kill` or `del` commands to manage the running processes and files.
# - Generates `maxtime`, `params`, and `unlock` scripts that directly write files to the shared directories.

# 7. Permissions:
# - Sets the executable permission (`stat.S_IRWXU`) for the generated scripts on POSIX systems.

from bs4 import BeautifulSoup
import logging
import re
Expand All @@ -17,13 +81,13 @@
CPPEXE = "g++" #Ubuntu/macOS C++ 6/22/21
VWIN = "iverilog" #Windows verilog 6/25/21
VEXE = "iverilog" #Ubuntu/macOS verilog 6/25/21
PYTHONEXE = "python3" #Ubuntu/macOS python 3
PYTHONWIN = "python" #Windows python 3
PYTHONEXE = "python3" #Ubuntu/macOS python3
PYTHONWIN = "python" #Windows python3
MATLABEXE = "matlab" #Ubuntu/macOS matlab
MATLABWIN = "matlab" #Windows matlab
OCTAVEEXE = "octave" #Ubuntu/macOS octave
OCTAVEWIN = "octave" #Windows octave
M_IS_OCTAVE = False #treat .m as octave
OCTAVEEXE = "octave" #Ubuntu/macOS octave
OCTAVEWIN = "octave" #Windows octave
M_IS_OCTAVE = False #treat .m as octave
MCRPATH = "~/MATLAB/R2021a" #path to local Ubunta Matlab Compiler Runtime
DOCKEREXE = "sudo docker"#assume simple docker install
DOCKEREPO = "markgarnold"#where pulls come from 3/28/21
Expand Down