In [None]:
import subprocess
import time
import signal
from kafka import KafkaConsumer, TopicPartition
import re

# Benchmark matrix: dataset file -> algorithm jar -> list of modes to run.
# Every dataset is exercised with the same jar/mode table, so the table is
# written once and instantiated per dataset (each dataset gets its own fresh
# dict and lists, so mutating one entry never affects another).
# NOTE(review): the meaning of the mode numbers (0/1/2) is defined by the
# shell scripts / jars that receive them, not by this file — confirm there.
commands = {
    dataset: {
        "dynamic-connectivity-ett-1.3.0.jar": [0, 1, 2],
        "dynamic-connectivity-lct-1.3.0.jar": [0, 1, 2],
        "dynamic-connectivity-incremental-1.3.0.jar": [0],
        "dynamic-mst-ett-decremental-1.3.0.jar": [1],
        "dynamic-sssp-decremental-1.3.0.jar": [1],
    }
    for dataset in (
        "web-NotreDame.txt",
        "Email-EuAll.txt",
        "facebook_combined.txt",
    )
}

# Per-dataset size value; generate_commands() stringifies it into position 3
# of each command tuple, from where it is handed to the shell scripts.
# NOTE(review): presumably the number of edges/lines in each dataset file —
# confirm against the data files.
data_len = {
    "web-NotreDame.txt": 1497134,
    "Email-EuAll.txt": 420045,
    "facebook_combined.txt": 88234
}

# Per-dataset limit; generate_commands() stringifies it into position 4 of
# each command tuple. It is passed to the test scripts and is also the
# threshold main() compares the 'metrics' topic message count against.
# NOTE(review): the name suggests a duration (seconds?) — confirm the unit.
time_limits = {
    "web-NotreDame.txt": 480,
    "Email-EuAll.txt": 240,
    "facebook_combined.txt": 240
}

def get_flink_jobid(log_path="../logs/runner.log"):
    """Return the most recent Flink job id recorded in the runner log.

    The log is scanned for occurrences of ``JobID <hex>``. Because the runner
    log is opened in append mode by the driver, it can contain job ids from
    earlier runs; the *last* occurrence is the one belonging to the latest
    submission, so that is the one returned.

    Args:
        log_path: Path of the log file to scan.

    Returns:
        The hexadecimal job id as a string, or ``None`` when the log contains
        no ``JobID`` entry.

    Raises:
        OSError: If the log file cannot be opened or read.
    """
    with open(log_path, "r") as f:
        content = f.read()
    # findall with a single capture group yields just the captured ids,
    # in the order they appear in the file.
    job_ids = re.findall(r"JobID\s+([a-fA-F0-9]+)", content)
    return job_ids[-1] if job_ids else None

def generate_commands(nr=-1):
    """Expand the benchmark matrix into flat command tuples.

    Each tuple is ``(jar, mode, dataset, data_len, time_limit)`` with every
    element except the jar and dataset names converted to ``str``. Tuples are
    produced in the insertion order of ``commands``: dataset, then jar, then
    mode.

    Args:
        nr: Index of the single tuple to return, or ``-1`` (the default) to
            return the complete list.

    Returns:
        The full list of tuples when ``nr == -1``, otherwise the tuple at
        index ``nr``.

    Raises:
        IndexError: If ``nr`` is outside the range of generated tuples.
    """
    all_runs = [
        (jar, str(mode), dataset, str(data_len[dataset]), str(time_limits[dataset]))
        for dataset, jars in commands.items()
        for jar, modes in jars.items()
        for mode in modes
    ]
    if nr == -1:
        return all_runs
    return all_runs[nr]

def get_message_count(broker='localhost:19092', topic='metrics'):
    """Return the number of offsets currently held by a Kafka topic.

    For every partition of ``topic`` the consumer is positioned at the
    beginning and at the end, and the difference between the two positions is
    added up across partitions.

    Args:
        broker: Bootstrap server address of the Kafka cluster.
        topic: Name of the topic to measure.

    Returns:
        The summed ``end - beginning`` offset span over all partitions, or
        ``0`` when the topic has no known partitions (e.g. it does not exist
        yet).
    """
    consumer = KafkaConsumer(bootstrap_servers=broker)
    # try/finally so the consumer (and its broker connections) is released on
    # the early-return path and when any Kafka call raises.
    try:
        partitions = consumer.partitions_for_topic(topic)
        if not partitions:
            return 0

        total = 0
        for p in partitions:
            tp = TopicPartition(topic, p)
            consumer.assign([tp])
            consumer.seek_to_beginning(tp)
            start = consumer.position(tp)
            consumer.seek_to_end(tp)
            end = consumer.position(tp)
            total += end - start
        return total
    finally:
        consumer.close()

# Benchmark configuration selected for this run; the tuple layout is
# (jar, mode, dataset, data_len, time_limit) as built by generate_commands().
cmd = generate_commands(15)

# Command lines for the helper scripts. Note the argument order expected by
# the test scripts is jar, mode, data_len, dataset, time_limit — i.e. the
# dataset and data_len positions are swapped relative to the tuple.
stop_flink_job = ["bash", "stop_job.sh"]
flink_cmd = ["bash", "run_job.sh", cmd[0], cmd[1], cmd[3]]
background_cmd = ["python3", "trackMemInDocker.py", *cmd[:3]]
foreground_cmd = ["bash", "test_algorithm.sh", *(cmd[i] for i in (0, 1, 3, 2, 4))]
foreground_cmd2 = ["bash", "end_test_algorithm.sh", *foreground_cmd[2:]]

def main():
    """Run one benchmark configuration end to end.

    Steps, in order:
      1. Start ``background_cmd`` (the memory tracker) as a background process.
      2. Run ``foreground_cmd`` (test set-up script) and wait for it.
      3. Launch ``flink_cmd`` with its output appended to
         ``../logs/runner.log``.
      4. Poll the Kafka ``metrics`` topic every 10 seconds until its message
         count reaches ``int(cmd[4])``.
      5. Read the Flink job id from the runner log, stop the job via
         ``stop_flink_job``, and terminate the launcher process.
      6. Run ``foreground_cmd2`` (test tear-down script).

    The background process is always interrupted and reaped on exit, and the
    Flink launcher process is terminated if a failure happens before step 5
    gets to do it.

    Raises:
        subprocess.CalledProcessError: If any helper script exits non-zero.
        RuntimeError: If no Flink job id can be found in the runner log.
    """
    # Start the background process
    print("Starting background command...")
    bg_process = subprocess.Popen(background_cmd)
    flink_process = None

    try:
        # Run the foreground command and wait for it to finish
        print("Running foreground command...")
        subprocess.run(foreground_cmd, check=True)
        # The child inherits the log file descriptor, so the parent's handle
        # can be closed as soon as the process has been spawned.
        with open("../logs/runner.log", "a") as log_file:
            flink_process = subprocess.Popen(flink_cmd, stdout=log_file, stderr=subprocess.STDOUT)

        # NOTE(review): cmd[4] is the time_limits entry, yet it is used here
        # as a message-count threshold — presumably one metrics message is
        # expected per time unit; confirm this is intended.
        amt_of_entries = 0
        while amt_of_entries < int(cmd[4]):
            time.sleep(10)
            amt_of_entries = get_message_count()
            print(f"Current amount of entries in Kafka topic 'metrics': {amt_of_entries}")

        job_id = get_flink_jobid()
        if job_id is None:
            # Fail with a clear message instead of passing None to
            # subprocess.run, which would raise an opaque TypeError.
            raise RuntimeError("Could not find a Flink JobID in ../logs/runner.log")

        print("Stopping Flink job...")
        # Build a fresh argument list so the module-level stop_flink_job is
        # not mutated (repeated calls would otherwise accumulate job ids).
        subprocess.run(stop_flink_job + [job_id], check=True)
        print(job_id)
        flink_process.terminate()
        flink_process.wait()
        print("Flink job completed.")

        subprocess.run(foreground_cmd2, check=True)

    finally:
        # If anything failed after the Flink launcher was started, make sure
        # it does not outlive this script.
        if flink_process is not None and flink_process.poll() is None:
            flink_process.terminate()
            flink_process.wait()

        # Terminate the background process
        print("Terminating background command...")
        # SIGINT rather than terminate(): presumably so the tracker can handle
        # the interrupt and shut down cleanly — confirm in trackMemInDocker.py.
        bg_process.send_signal(signal.SIGINT)
        bg_process.wait()
        print("Background command terminated.")

    print("Foreground command completed.")

# Script entry point: echo the selected benchmark configuration tuple, then
# run the benchmark.
if __name__ == "__main__":
    print(cmd)
    main()

('dynamic-connectivity-incremental-1.3.0.jar', '0', 'Email-EuAll.txt', '420045', '240')
Starting background command...
Running foreground command...
Clean-up...[32mDONE[0m
Cleaning topics...Error while executing topic command : Topic 'input' does not exist as expected


[2025-08-31 12:25:58,316] ERROR java.lang.IllegalArgumentException: Topic 'input' does not exist as expected
	at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:401)
	at kafka.admin.TopicCommand$TopicService.deleteTopic(TopicCommand.scala:361)
	at kafka.admin.TopicCommand$.main(TopicCommand.scala:63)
	at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)


Error while executing topic command : Topic 'output' does not exist as expected


[2025-08-31 12:26:00,127] ERROR java.lang.IllegalArgumentException: Topic 'output' does not exist as expected
	at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:401)
	at kafka.admin.TopicCommand$TopicService.deleteTopic(TopicCommand.scala:361)
	at kafka.admin.TopicCommand$.main(TopicCommand.scala:63)
	at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)


Error while executing topic command : Topic 'metrics' does not exist as expected


[2025-08-31 12:26:01,893] ERROR java.lang.IllegalArgumentException: Topic 'metrics' does not exist as expected
	at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:401)
	at kafka.admin.TopicCommand$TopicService.deleteTopic(TopicCommand.scala:361)
	at kafka.admin.TopicCommand$.main(TopicCommand.scala:63)
	at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)


[32mDONE[0m
Sleeping...