Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions measurements/A.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# A.py (Client and Primary Measurement Node)
import concore
import time
import os
import psutil
import sys

# --- ZMQ Initialization ---
# This REQ socket connects to Node B
concore.init_zmq_port(
port_name=PORT_NAME_F1_F2,
port_type="connect",
address="tcp://localhost:" + PORT_F1_F2,
socket_type_str="REQ"
)

print("Node A client started.")

# --- Measurement Initialization ---
min_latency = float('inf')
max_latency = 0.0
total_latency = 0.0
message_count = 0
total_bytes = 0
process = psutil.Process(os.getpid())
overall_start_time = time.monotonic()
loop_start_time = 0

current_value = 0
max_value = 100

while current_value < max_value:
loop_start_time = time.monotonic() # Start timer for round-trip latency
print(f"Node A: Sending value {current_value:.2f} to Node B.")

# 1. Send the current value as a request to the pipeline
concore.write(PORT_NAME_F1_F2, "value", [current_value])
total_bytes += sys.getsizeof([current_value])

# 2. Wait for the final, processed value in reply
received_data = concore.read(PORT_NAME_F1_F2, "value", [0.0])

loop_end_time = time.monotonic()
latency_ms = (loop_end_time - loop_start_time) * 1000

# Update metrics
message_count += 1
min_latency = min(min_latency, latency_ms)
max_latency = max(max_latency, latency_ms)
total_latency += latency_ms

current_value = received_data[0]
print(f"Node A: Received final value {current_value:.2f} from the pipeline. | Latency: {latency_ms:.2f} ms")
print("-" * 20)

# --- Finalize and Report Measurements ---
overall_end_time = time.monotonic()
total_duration = overall_end_time - overall_start_time
cpu_usage = process.cpu_percent() / total_duration if total_duration > 0 else 0
avg_latency = total_latency / message_count if message_count > 0 else 0

print("\n" + "="*35)
print("--- NODE A: END-TO-END RESULTS ---")
print(f"Total pipeline iterations: {message_count}")
print(f"Total data sent: {total_bytes / 1024:.4f} KB")
print(f"Total End-to-End Time: {total_duration:.4f} seconds")
print("-" * 35)
print(f"Min round-trip latency: {min_latency:.2f} ms")
print(f"Avg round-trip latency: {avg_latency:.2f} ms")
print(f"Max round-trip latency: {max_latency:.2f} ms")
print("-" * 35)
print(f"Approximate CPU usage: {cpu_usage:.2f}%")
print("="*35)

print(f"\nNode A: Final value {current_value:.2f} reached the target. Terminating.")
concore.terminate_zmq()
60 changes: 60 additions & 0 deletions measurements/B.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# B.py (Broker with Measurements)
import concore
import time

# --- ZMQ Initialization ---
# This REP socket binds and waits for requests from Node A
concore.init_zmq_port(
port_name=PORT_NAME_F1_F2,
port_type="bind",
address="tcp://*:" + PORT_F1_F2,
socket_type_str="REP"
)
# This REQ socket connects to Node C
concore.init_zmq_port(
port_name=PORT_NAME_F2_F3,
port_type="connect",
address="tcp://localhost:" + PORT_F2_F3,
socket_type_str="REQ"
)

print("Node B broker started. Waiting for requests...")

# --- Measurement Initialization ---
start_time = time.monotonic()
messages_routed = 0

while True:
# 1. Wait for a request from Node A
value_from_a = concore.read(PORT_NAME_F1_F2, "value", [0.0])
received_value = value_from_a[0]
print(f"Node B: Received {received_value:.2f} from Node A. Forwarding to C...")

# 2. Send the received value as a new request to Node C
concore.write(PORT_NAME_F2_F3, "value", [received_value])

# 3. Wait for the reply from Node C
value_from_c = concore.read(PORT_NAME_F2_F3, "value", [0.0])
processed_value = value_from_c[0]
print(f"Node B: Received {processed_value:.2f} from Node C. Replying to A...")

# 4. Send the processed value back as a reply to Node A
concore.write(PORT_NAME_F1_F2, "value", [processed_value])
messages_routed += 1

# 5. Check termination condition
if processed_value >= 100:
break

# --- Finalize and Report Measurements ---
end_time = time.monotonic()
duration = end_time - start_time

print("\n" + "="*30)
print("--- NODE B: RESULTS ---")
print(f"Total messages routed: {messages_routed}")
print(f"Total execution time: {duration:.4f} seconds")
print("="*30)

print("\nNode B: Terminating.")
concore.terminate_zmq()
62 changes: 62 additions & 0 deletions measurements/C.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# C.py (Processing Server and Measurement Endpoint)
import concore
import time
import psutil
import os
import sys

# --- ZMQ Initialization ---
# This REP socket binds and waits for requests from Node B
concore.init_zmq_port(
port_name=PORT_NAME_F2_F3,
port_type="bind",
address="tcp://*:" + PORT_F2_F3,
socket_type_str="REP"
)

print("Node C server started. Waiting for requests...")

# --- Measurement Initialization ---
process = psutil.Process(os.getpid())
start_time = time.monotonic()
message_count = 0
total_bytes = 0

while True:
# 1. Wait to receive a request from Node B
received_data = concore.read(PORT_NAME_F2_F3, "value", [0.0])
received_value = received_data[0]

# Track received data for metrics
message_count += 1
total_bytes += sys.getsizeof(received_data)

print(f"Node C: Received {received_value:.2f} from Node B.")

# 2. Process the value (increment by 10)
new_value = received_value + 10
print(f"Node C: Sending back processed value {new_value:.2f}.")

# 3. Send the reply back to Node B
concore.write(PORT_NAME_F2_F3, "value", [new_value])

# 4. Check the value to know when to shut down gracefully.
if new_value >= 100:
break

# --- Finalize and Report Measurements ---
end_time = time.monotonic()
duration = end_time - start_time
# This captures the CPU usage over the process's lifetime relative to the test duration
cpu_usage = process.cpu_percent() / duration if duration > 0 else 0

print("\n" + "="*30)
print("--- NODE C: RESULTS ---")
print(f"Total messages processed: {message_count}")
print(f"Total data processed: {total_bytes / 1024:.4f} KB")
print(f"Total execution time: {duration:.4f} seconds")
print(f"Approximate CPU usage: {cpu_usage:.2f}%")
print("="*30)

print("\nNode C: Terminating.")
concore.terminate_zmq()
Loading