From 594e73e9ea427fcb27a5f375cb7738737149ac48 Mon Sep 17 00:00:00 2001 From: Vojtech Cima Date: Fri, 28 Jul 2017 14:01:14 +0200 Subject: [PATCH] ENH: Added peer id for network traffic --- python/loom/lore/report.py | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/python/loom/lore/report.py b/python/loom/lore/report.py index 4d4c616..47dbb3f 100644 --- a/python/loom/lore/report.py +++ b/python/loom/lore/report.py @@ -73,7 +73,8 @@ class Report: def __init__(self, trace_path): self.trace_path = trace_path - self.workers = {} + self.workers = {} # by ID + self.workers_by_addr = {} # by address self.symbols = [] self.id_bases = [] self.scheduler_times = None @@ -145,7 +146,9 @@ def parse_server_file(self, tasks): end_time.append(time) elif command == "WORKER": worker_id = to_int(line[1]) - self.workers[worker_id] = Worker(worker_id, line[2]) + worker = Worker(worker_id, line[2]) + self.workers[worker_id] = worker + self.workers_by_addr[line[2]] = worker elif command == "SYMBOL": self.symbols.append(line[1]) elif command == "SUBMIT": @@ -175,7 +178,6 @@ def parse_worker_file(self, worker_id, tasks): monitoring_times = [] monitoring_cpu = [] monitoring_mem = [] - time = 0 for line in it: command = line[0] @@ -193,13 +195,24 @@ def parse_worker_file(self, worker_id, tasks): monitoring_cpu.append(to_int(line[1])) monitoring_mem.append(to_int(line[2])) elif command == "D": - sends.append((time, to_int(line[1]), to_int(line[2]))) + w = self.workers_by_addr.get(line[3]) + if w is not None: + w = w.worker_id + else: + w = -1 + sends.append((time, to_int(line[1]), to_int(line[2]), w)) elif command == "R": - recvs.append((time, to_int(line[1]), to_int(line[2]))) + w = self.workers_by_addr.get(line[3]) + if w is not None: + w = w.worker_id + else: + w = -1 + print("WARNING! Received data from dummy worker.") + recvs.append((time, to_int(line[1]), to_int(line[2]), w)) else: raise Exception("Unknown line: {}".format(line)) - columns = ("time", "id", "data_size") + columns = ("time", "id", "data_size", "peer_id") worker.sends = pd.DataFrame(sends, columns=columns) worker.recvs = pd.DataFrame(recvs, columns=columns) worker.monitoring = pd.DataFrame({"time": monitoring_times,