Permalink
Browse files

ENH: rview and report system introduced

  • Loading branch information...
spirali committed Oct 13, 2016
1 parent 72ff170 commit a18f93e296bcf2df96e8b20faaa08d1c6d7b8523
@@ -12,6 +12,7 @@ _build
*.so.*
*_pch.h.cpp
*_resource.rc
.*

# qtcreator generated files
*.pro.user*
@@ -1,4 +1,5 @@
from loomcomm_pb2 import Register, Data, ClientMessage
from loomcomm_pb2 import Register, Data, ClientMessage, ClientSubmit
from loomreport_pb2 import Report

import socket
from connection import Connection
@@ -23,7 +24,7 @@ def __init__(self, id, worker, error_msg):

class Client(object):

def __init__(self, address, port, info=False):
def __init__(self, address, port):
self.server_address = address
self.server_port = port

@@ -36,32 +37,27 @@ def __init__(self, address, port, info=False):
s.connect((address, port))
self.connection = Connection(s)

if info:
self.info = []
else:
self.info = None

msg = Register()
msg.type = Register.REGISTER_CLIENT
msg.protocol_version = LOOM_PROTOCOL_VERSION
msg.info = info
self._send_message(msg)

while self.symbols is None:
self._read_symbols()

def submit(self, plan, results):

msg = plan.create_message(self.symbols)
def submit(self, plan, results, report=None):
msg = ClientSubmit()
msg.report = bool(report)
plan.set_message(msg.plan, self.symbols)

if isinstance(results, Task):
single_result = True
msg.result_ids.extend((results.id,))
msg.plan.result_ids.extend((results.id,))
expected = 1
else:
single_result = False
r = set(results)
msg.result_ids.extend(r.id for r in r)
msg.plan.result_ids.extend(r.id for r in r)
expected = len(r)

self._send_message(msg)
@@ -74,18 +70,35 @@ def submit(self, plan, results):
if cmsg.type == ClientMessage.DATA:
prologue = cmsg.data
data[prologue.id] = self._receive_data()
elif cmsg.type == ClientMessage.INFO:
self.add_info(cmsg.info)
elif cmsg.type == ClientMessage.EVENT:
self.process_event(cmsg.event)
elif cmsg.type == ClientMessage.ERROR:
self.process_error(cmsg)
else:
assert 0

if report:
self._write_report(report, plan)

if single_result:
return data[results.id]
else:
return [data[task.id] for task in results]

def _symbol_list(self):
symbols = [None] * len(self.symbols)
for name, index in self.symbols.items():
symbols[index] = name
return symbols

def _write_report(self, report_filename, plan):
report_msg = Report()
report_msg.symbols.extend(self._symbol_list())
plan.set_message(report_msg.plan, self.symbols)

with open(report_filename + ".report", "w") as f:
f.write(report_msg.SerializeToString())

def _read_symbols(self):
msg = self.connection.receive_message()
cmsg = ClientMessage()
@@ -102,8 +115,8 @@ def process_error(self, cmsg):
error = cmsg.error
raise TaskFailed(error.id, error.worker, error.error_msg)

def add_info(self, info):
self.info.append((info.id, info.worker))
def process_event(self, event):
assert 0

def _receive_data(self):
msg_data = Data()
Oops, something went wrong.

0 comments on commit a18f93e

Please sign in to comment.