Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions 0mq/funbody_zmq.dir/concore2.py → 0mq/comm_node.dir/concore2.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ def init_zmq_port(port_name, port_type, address, socket_type_str):
except Exception as e:
print(f"An unexpected error occurred during ZMQ port initialization for {port_name}: {e}")

def terminate_zmq():
for port in zmq_ports.values():
try:
port.socket.close()
port.context.term()
except Exception as e:
print(f"Error while terminating ZMQ port {port.address}: {e}")
# --- ZeroMQ Integration End ---

def safe_literal_eval(filename, defaultValue):
Expand Down Expand Up @@ -207,16 +214,17 @@ def write(port_identifier, name, val, delta=0):
print(f"ZMQ write error on port {port_identifier} (name: {name}): {e}")
except Exception as e:
print(f"Unexpected error during ZMQ write on port {port_identifier} (name: {name}): {e}")
return

return
try:
file_port_num = int(port_identifier)
if isinstance(port_identifier, str) and port_identifier in zmq_ports:
file_path = os.path.join("../"+port_identifier, name)
else:
file_port_num = int(port_identifier)
file_path = os.path.join(outpath+str(file_port_num), name)
except ValueError:
print(f"Error: Invalid port identifier '{port_identifier}' for file operation. Must be integer or ZMQ name.")
return

file_path = os.path.join(outpath+str(file_port_num), name)

if isinstance(val, str):
time.sleep(2 * delay)
elif not isinstance(val, list):
Expand Down
25 changes: 25 additions & 0 deletions 0mq/comm_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import concore
import concore2

concore.delay = 0.07
concore2.delay = 0.07
concore2.inpath = concore.inpath
concore2.outpath = concore.outpath
concore2.simtime = 0
concore.default_maxtime(100)
init_simtime_u = "[0.0, 0.0, 0.0]"
init_simtime_ym = "[0.0, 0.0, 0.0]"

u = concore.initval(init_simtime_u)
ym = concore2.initval(init_simtime_ym)
while(concore2.simtime<concore.maxtime):
while concore.unchanged():
u = concore.read(concore.iport['U'],"u",init_simtime_u)
concore.write(concore.oport['U1'],"u",u)
print(u)
old2 = concore2.simtime
while concore2.unchanged() or concore2.simtime <= old2:
ym = concore2.read(concore.iport['Y1'],"ym",init_simtime_ym)
concore2.write(concore.oport['Y'],"ym",ym)
print("funbody u="+str(u)+" ym="+str(ym)+" time="+str(concore2.simtime))
print("retry="+str(concore.retrycount))
592 changes: 592 additions & 0 deletions 0mq/distributed_client.graphml

Large diffs are not rendered by default.

520 changes: 520 additions & 0 deletions 0mq/distributed_server.graphml

Large diffs are not rendered by default.

460 changes: 460 additions & 0 deletions 0mq/fileOnlyCommunication.graphml

Large diffs are not rendered by default.

45 changes: 45 additions & 0 deletions 0mq/firstNode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# firstNode.py (Client/Orchestrator)
import concore
import time

# --- ZMQ Initialization ---
# This REQ socket connects to Node B (F2)
concore.init_zmq_port(
port_name=f"0x{PORT_F1_F2}_{PORT_NAME_F1_F2}",
port_type="connect",
address="tcp://localhost:" + PORT_F1_F2,
socket_type_str="REQ"
)
# This REQ socket connects to Node C (F3)
concore.init_zmq_port(
port_name=f"0x{PORT_F1_F3}_{PORT_NAME_F1_F3}",
port_type="connect",
address="tcp://localhost:" + PORT_F1_F3,
socket_type_str="REQ"
)

current_value = 0.0

while current_value <= 100:
# --- Step 1: Communicate with Node B ---
print(f"Node A: Sending value {current_value:.2f} to Node B.")
concore.write(f"0x{PORT_F1_F2}_{PORT_NAME_F1_F2}", "value", [current_value])

# Wait for the reply from Node B
value_from_b = concore.read(f"0x{PORT_F1_F2}_{PORT_NAME_F1_F2}", "value", [current_value])
processed_by_b = value_from_b[0]
print(f"Node A: Received processed value {processed_by_b:.2f} from Node B.")

# --- Step 2: Communicate with Node C ---
print(f"Node A: Sending value {processed_by_b:.2f} to Node C.")
concore.write(f"0x{PORT_F1_F3}_{PORT_NAME_F1_F3}", "value", [processed_by_b])

# Wait for the reply from Node C
value_from_c = concore.read(f"0x{PORT_F1_F3}_{PORT_NAME_F1_F3}", "value", [processed_by_b])
current_value = value_from_c[0]
print(f"Node A: Received final value {current_value:.2f} from Node C.")
print("-" * 20)
time.sleep(1) # Slow down the loop for readability

print("\nNode A: Value exceeded 100. Terminating.")
concore.terminate_zmq()
Loading