# PeerDAS simulation

## Imports

In [None]:
import subprocess
from pathos.multiprocessing import ProcessPool
import os
import random
import requests

# Start every run from an empty "raw_traces" directory: clear out whatever a
# previous run left behind, or create the directory if it does not exist yet.
if os.path.exists("raw_traces"):
    for stale_name in os.listdir("raw_traces"):
        os.remove(os.path.join("raw_traces", stale_name))
else:
    os.mkdir("raw_traces")


## Experiment 1: Setup

## Setup topics and participants

In [None]:
# Number of data-column sidecar topics to create; one topic name is generated
# per index in range(num_topics).
num_topics = 8

In [None]:
# One gossip topic name per data-column sidecar index.
# NOTE(review): "fulu_fork_digest" looks like a literal placeholder rather than
# a real fork digest value -- confirm this is what the nodes subscribe to.
topics = [
    f'/eth2/fulu_fork_digest/data_column_sidecar_{column_index}/ssz_snappy'
    for column_index in range(num_topics)
]

## Subscriptions

### Check node status

In [None]:
# Read the host part of every "host:port" entry in ips.txt. Blank lines are
# skipped: they would otherwise produce an empty host and an invalid URL.
ips = []
with open("ips.txt", "r") as f:
    for line in f:
        host = line.strip().split(":")[0]
        if host:
            ips.append(host)

nodes_down = []

# Probe each node's health endpoint on port 9090. A node counts as "up" only
# when it answers with a JSON object whose 'status' field equals 'ok'.
for ip in ips:
    try:
        status = requests.get(f"http://{ip}:9090/api/v1/health", timeout=10)
        healthy = status.json()['status'] == 'ok'
    except (requests.exceptions.RequestException, ValueError, KeyError, TypeError):
        # Connection refused, timeout, non-JSON body or unexpected payload
        # shape: the node is unusable, so record it as down instead of
        # aborting the whole check on the first failure.
        healthy = False
    if not healthy:
        nodes_down.append(ip)

print("Nodes up: ", len(ips) - len(nodes_down))
print("Nodes down: ", len(nodes_down))

# List the unreachable/unhealthy hosts so they can be investigated.
if len(nodes_down) > 0:
    print("Nodes down: ", nodes_down)



In [None]:
trace_files = []
subscribe_processes = []  # Handles of every subscriber started below

# Launch one ./p2p-multi-subscribe process per topic; each one is told to
# write its trace to its own TSV file under raw_traces/.
for column_index, topic_name in enumerate(topics):
    trace_path = f"raw_traces/trace-data-col-{column_index}.tsv"
    trace_files.append(trace_path)
    subscribe_cmd = [
        "./p2p-multi-subscribe",
        "-topic", topic_name,
        "-ipfile", "ips.txt",
        "-output-trace", trace_path,
    ]
    subscribe_processes.append(subprocess.Popen(subscribe_cmd))

print(f"Started {len(subscribe_processes)} subscriber processes")



In [None]:
def publish_data(topic, node_id):
    """Start a ./p2p-multi-publish process for *topic* and return its Popen handle.

    The process is started but not waited on; the caller owns the handle.

    NOTE(review): node_id is accepted but never forwarded to the command --
    "-start-index"/"-end-index" are hard-coded to "0"/"1". Presumably these
    flags select the publishing node(s) from ips.txt and were meant to be
    derived from node_id; confirm against the p2p-multi-publish CLI before
    changing them.
    """
    return subprocess.Popen([
        "./p2p-multi-publish",
        "-topic", topic,
        "-ipfile", "ips.txt",
        "-datasize", "16384",
        "-start-index", "0",
        "-end-index", "1",
        "-count", "1",
        "-sleep", "12s"
    ])


def worker_task(topics_and_node_ids):
    """Publish on one (topic, node_id) pair and return the publisher's exit code.

    Waiting here keeps the pool worker alive until the publisher has actually
    finished, so pool.map() only returns once all publishing is complete.
    """
    topic, node_id = topics_and_node_ids
    return publish_data(topic, node_id).wait()


# Pair every topic with a distinct, randomly chosen id from 0..len(topics)-1.
ids = random.sample(range(len(topics)), len(topics))
topics_and_node_ids = list(zip(topics, ids))

pool = ProcessPool(nodes=len(topics))
try:
    results = pool.map(worker_task, topics_and_node_ids)
finally:
    # Always release the pool's workers, even if a publisher failed to start.
    pool.close()
    pool.join()
    pool.clear()

# Surface publishers that exited with an error instead of silently ignoring them.
failed_topics = [
    topic for (topic, _), exit_code in zip(topics_and_node_ids, results)
    if exit_code != 0
]
if failed_topics:
    print(f"Warning: {len(failed_topics)} publisher(s) exited with a non-zero status: {failed_topics}")

print("Done")

## Merge raw traces

In [None]:
# Concatenate every per-topic trace file, in topic order, into one TSV file.
merged_traces_file = "merged_traces.tsv"
with open(merged_traces_file, "w") as merged_out:
    for trace_path in trace_files:
        # A subscriber that never wrote its trace is reported and skipped.
        if not os.path.exists(trace_path):
            print(f"Warning: trace file {trace_path} not found. Skipping.")
            continue
        with open(trace_path, "r") as trace_in:
            merged_out.write(trace_in.read())

In [None]:
# Ask every subscriber to stop first, so they all shut down in parallel.
for proc in subscribe_processes:
    proc.terminate()

# Then reap each one so no zombie processes are left behind; a subscriber that
# ignores the termination request is killed after a grace period.
for proc in subscribe_processes:
    try:
        proc.wait(timeout=10)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()