Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
b98040a
Merge pull request #154 from ControlCore-Project/main
pradeeban May 3, 2025
df0d70c
Added A zeromq based study and also corrected some existing studies
saksham-gera Jun 2, 2025
54221b7
Merge pull request #155 from saksham-gera/dev
pradeeban Jun 2, 2025
5f64e20
Integrated the ZeroMQ with concore.py, I've tested the previous studi…
saksham-gera Jun 3, 2025
b8d764f
Merge pull request #156 from saksham-gera/dev
pradeeban Jun 3, 2025
3c2db12
For Port Assignment Based On Edge Label, This script will append ZMQ_…
saksham-gera Jun 4, 2025
5c9fb68
Merge pull request #157 from saksham-gera/dev
pradeeban Jun 5, 2025
7ca304a
added a function that will run the copy_with_port_portname script wit…
saksham-gera Jun 5, 2025
88ed4bf
Merge pull request #158 from saksham-gera/dev
pradeeban Jun 5, 2025
8c2f90e
Added A function to close all ports of zeromq running for a particula…
saksham-gera Jun 6, 2025
eb0b7f0
Merge pull request #159 from saksham-gera/dev
pradeeban Jun 7, 2025
86b2f17
Added the functionality to view results of zmq edges also, and along …
saksham-gera Jun 7, 2025
69bf02b
Merge pull request #160 from saksham-gera/dev
pradeeban Jun 7, 2025
3df6885
Added Some Studies that i created while testing ZeroMQ
saksham-gera Jun 8, 2025
834cdf1
Added a file only communication based study for testing
saksham-gera Jun 8, 2025
b7c21e8
Successfully Integrated zmq node detection and extraction of hexadeci…
saksham-gera Jun 8, 2025
6ec969f
Merge pull request #161 from saksham-gera/dev
pradeeban Jun 9, 2025
c87cf2f
Added A Functionality to see the ZMQ edge results also even if it is …
saksham-gera Jun 11, 2025
efd7896
Implemented ZeroMQ socket timeouts & retry logic fordropped connections.
saksham-gera Jun 13, 2025
a52cc74
Merge branch 'ControlCore-Project:dev' into dev
saksham-gera Jun 13, 2025
a6d06d2
Merge pull request #162 from saksham-gera/dev
pradeeban Jun 14, 2025
15dfcf7
Added A ZeroMQ Study for Quick Measurements based on adding 10 in sum…
saksham-gera Jun 17, 2025
e1a0bf3
Added A file only communication Study for Quick Measurements based on…
saksham-gera Jun 18, 2025
fb502ce
Merge pull request #163 from saksham-gera/dev
pradeeban Jun 21, 2025
f831a2e
Measurements Folder Added
saksham-gera Aug 7, 2025
80dc841
Merge pull request #164 from saksham-gera/dev
pradeeban Aug 9, 2025
8974f7a
Corrected Figure Names And Corrected readme.md
saksham-gera Aug 17, 2025
ec4ed34
Merge pull request #165 from saksham-gera/dev
pradeeban Aug 19, 2025
4988348
replaced the png format with pdf, now all plots will be saved as pdf
saksham-gera Aug 22, 2025
aa16139
Merge pull request #166 from saksham-gera/dev
pradeeban Aug 24, 2025
e76c3c7
Added The Comments in Both concore.py and mkconcore.py
saksham-gera Aug 26, 2025
9c1c4ca
Merge remote-tracking branch 'origin/dev' into dev
saksham-gera Aug 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
263 changes: 263 additions & 0 deletions 0mq/comm_node.dir/concore2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
import time
import os
from ast import literal_eval
import sys
import re
import zmq # Added for ZeroMQ

#if windows, create script to kill this process
# because batch files don't provide easy way to know pid of last command
# ignored for posix!=windows, because "concorepid" is handled by script
# ignored for docker (linux!=windows), because handled by docker stop
if hasattr(sys, 'getwindowsversion'):
with open("concorekill.bat","w") as fpid:
fpid.write("taskkill /F /PID "+str(os.getpid())+"\n")

# --- ZeroMQ Integration Start ---
class ZeroMQPort:
def __init__(self, port_type, address, zmq_socket_type):
self.context = zmq.Context()
self.socket = self.context.socket(zmq_socket_type)
self.port_type = port_type # "bind" or "connect"
self.address = address
if self.port_type == "bind":
self.socket.bind(address)
print(f"ZMQ Port bound to {address}")
else:
self.socket.connect(address)
print(f"ZMQ Port connected to {address}")

# Global ZeroMQ ports registry
zmq_ports = {}

def init_zmq_port(port_name, port_type, address, socket_type_str):
"""
Initializes and registers a ZeroMQ port.
port_name (str): A unique name for this ZMQ port.
port_type (str): "bind" or "connect".
address (str): The ZMQ address (e.g., "tcp://*:5555", "tcp://localhost:5555").
socket_type_str (str): String representation of ZMQ socket type (e.g., "REQ", "REP", "PUB", "SUB").
"""
if port_name in zmq_ports:
print(f"ZMQ Port {port_name} already initialized.")
return # Avoid reinitialization

try:
# Map socket type string to actual ZMQ constant (e.g., zmq.REQ, zmq.REP)
zmq_socket_type = getattr(zmq, socket_type_str.upper())
zmq_ports[port_name] = ZeroMQPort(port_type, address, zmq_socket_type)
print(f"Initialized ZMQ port: {port_name} ({socket_type_str}) on {address}")
except AttributeError:
print(f"Error: Invalid ZMQ socket type string '{socket_type_str}'.")
except zmq.error.ZMQError as e:
print(f"Error initializing ZMQ port {port_name} on {address}: {e}")
except Exception as e:
print(f"An unexpected error occurred during ZMQ port initialization for {port_name}: {e}")

def terminate_zmq():
for port in zmq_ports.values():
try:
port.socket.close()
port.context.term()
except Exception as e:
print(f"Error while terminating ZMQ port {port.address}: {e}")
# --- ZeroMQ Integration End ---

def safe_literal_eval(filename, defaultValue):
try:
with open(filename, "r") as file:
return literal_eval(file.read())
except (FileNotFoundError, SyntaxError, ValueError, Exception) as e:
# Keep print for debugging, but can be made quieter
# print(f"Info: Error reading {filename} or file not found, using default: {e}")
return defaultValue

iport = safe_literal_eval("concore.iport", {})
oport = safe_literal_eval("concore.oport", {})

s = ''
olds = ''
delay = 1
retrycount = 0
inpath = "./in" #must be rel path for local
outpath = "./out"
simtime = 0

#9/21/22
try:
sparams_path = os.path.join(inpath + "1", "concore.params")
if os.path.exists(sparams_path):
with open(sparams_path, "r") as f:
sparams = f.read()
if sparams: # Ensure sparams is not empty
if sparams[0] == '"' and sparams[-1] == '"': #windows keeps "" need to remove
sparams = sparams[1:-1]
if sparams != '{' and not (sparams.startswith('{') and sparams.endswith('}')): # Check if it needs conversion
print("converting sparams: "+sparams)
sparams = "{'"+re.sub(';',",'",re.sub('=',"':",re.sub(' ','',sparams)))+"}"
print("converted sparams: " + sparams)
try:
params = literal_eval(sparams)
except Exception as e:
print(f"bad params content: {sparams}, error: {e}")
params = dict()
else:
params = dict()
else:
params = dict()
except Exception as e:
# print(f"Info: concore.params not found or error reading, using empty dict: {e}")
params = dict()

#9/30/22
def tryparam(n, i):
return params.get(n, i)


#9/12/21
def default_maxtime(default):
global maxtime
maxtime_path = os.path.join(inpath + "1", "concore.maxtime")
maxtime = safe_literal_eval(maxtime_path, default)

default_maxtime(100)

def unchanged():
global olds, s
if olds == s:
s = ''
return True
olds = s
return False

def read(port_identifier, name, initstr_val):
global s, simtime, retrycount

default_return_val = initstr_val
if isinstance(initstr_val, str):
try:
default_return_val = literal_eval(initstr_val)
except (SyntaxError, ValueError):
pass

if isinstance(port_identifier, str) and port_identifier in zmq_ports:
zmq_p = zmq_ports[port_identifier]
try:
message = zmq_p.socket.recv_json()
return message
except zmq.error.ZMQError as e:
print(f"ZMQ read error on port {port_identifier} (name: {name}): {e}. Returning default.")
return default_return_val
except Exception as e:
print(f"Unexpected error during ZMQ read on port {port_identifier} (name: {name}): {e}. Returning default.")
return default_return_val

try:
file_port_num = int(port_identifier)
except ValueError:
print(f"Error: Invalid port identifier '{port_identifier}' for file operation. Must be integer or ZMQ name.")
return default_return_val

time.sleep(delay)
file_path = os.path.join(inpath+str(file_port_num), name)
ins = ""

try:
with open(file_path, "r") as infile:
ins = infile.read()
except FileNotFoundError:
ins = str(initstr_val)
except Exception as e:
print(f"Error reading {file_path}: {e}. Using default value.")
return default_return_val

attempts = 0
max_retries = 5
while len(ins) == 0 and attempts < max_retries:
time.sleep(delay)
try:
with open(file_path, "r") as infile:
ins = infile.read()
except Exception as e:
print(f"Retry {attempts + 1}: Error reading {file_path} - {e}")
attempts += 1
retrycount += 1

if len(ins) == 0:
print(f"Max retries reached for {file_path}, using default value.")
return default_return_val

s += ins
try:
inval = literal_eval(ins)
if isinstance(inval, list) and len(inval) > 0:
current_simtime_from_file = inval[0]
if isinstance(current_simtime_from_file, (int, float)):
simtime = max(simtime, current_simtime_from_file)
return inval[1:]
else:
print(f"Warning: Unexpected data format in {file_path}: {ins}. Returning raw content or default.")
return inval
except Exception as e:
print(f"Error parsing content from {file_path} ('{ins}'): {e}. Returning default.")
return default_return_val


def write(port_identifier, name, val, delta=0):
global simtime

if isinstance(port_identifier, str) and port_identifier in zmq_ports:
zmq_p = zmq_ports[port_identifier]
try:
zmq_p.socket.send_json(val)
except zmq.error.ZMQError as e:
print(f"ZMQ write error on port {port_identifier} (name: {name}): {e}")
except Exception as e:
print(f"Unexpected error during ZMQ write on port {port_identifier} (name: {name}): {e}")
return
try:
if isinstance(port_identifier, str) and port_identifier in zmq_ports:
file_path = os.path.join("../"+port_identifier, name)
else:
file_port_num = int(port_identifier)
file_path = os.path.join(outpath+str(file_port_num), name)
except ValueError:
print(f"Error: Invalid port identifier '{port_identifier}' for file operation. Must be integer or ZMQ name.")
return

if isinstance(val, str):
time.sleep(2 * delay)
elif not isinstance(val, list):
print(f"File write to {file_path} must have list or str value, got {type(val)}")
return

try:
with open(file_path, "w") as outfile:
if isinstance(val, list):
data_to_write = [simtime + delta] + val
outfile.write(str(data_to_write))
simtime += delta
else:
outfile.write(val)
except Exception as e:
print(f"Error writing to {file_path}: {e}")

def initval(simtime_val_str):
global simtime
try:
val = literal_eval(simtime_val_str)
if isinstance(val, list) and len(val) > 0:
first_element = val[0]
if isinstance(first_element, (int, float)):
simtime = first_element
return val[1:]
else:
print(f"Error: First element in initval string '{simtime_val_str}' is not a number. Using data part as is or empty.")
return val[1:] if len(val) > 1 else []
else:
print(f"Error: initval string '{simtime_val_str}' is not a list or is empty. Returning empty list.")
return []

except Exception as e:
print(f"Error parsing simtime_val_str '{simtime_val_str}': {e}. Returning empty list.")
return []
25 changes: 25 additions & 0 deletions 0mq/comm_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import concore
import concore2

concore.delay = 0.07
concore2.delay = 0.07
concore2.inpath = concore.inpath
concore2.outpath = concore.outpath
concore2.simtime = 0
concore.default_maxtime(100)
init_simtime_u = "[0.0, 0.0, 0.0]"
init_simtime_ym = "[0.0, 0.0, 0.0]"

u = concore.initval(init_simtime_u)
ym = concore2.initval(init_simtime_ym)
while(concore2.simtime<concore.maxtime):
while concore.unchanged():
u = concore.read(concore.iport['U'],"u",init_simtime_u)
concore.write(concore.oport['U1'],"u",u)
print(u)
old2 = concore2.simtime
while concore2.unchanged() or concore2.simtime <= old2:
ym = concore2.read(concore.iport['Y1'],"ym",init_simtime_ym)
concore2.write(concore.oport['Y'],"ym",ym)
print("funbody u="+str(u)+" ym="+str(ym)+" time="+str(concore2.simtime))
print("retry="+str(concore.retrycount))
Loading