Skip to content

Commit

Permalink
Merge pull request #21 from faustomilletari/delayedresponse
Browse files Browse the repository at this point in the history
Added support for DelayedResponses
  • Loading branch information
faustomilletari committed Jun 10, 2018
2 parents d8e22ac + e9df92a commit a107b33
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 11 deletions.
5 changes: 2 additions & 3 deletions tomaat/extras/niftynet.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
import nibabel as nib
import SimpleITK as sitk
import numpy as np
import time

from tomaat.server import TomaatService
from tomaat.server import TomaatServiceDelayedResponse


# this extra works with model zoo from NiftyNet and in particular with highresnet3d_brain_parcellation model.
Expand Down Expand Up @@ -171,7 +170,7 @@ def start_service(config_file_path, ini_file_path):

application = NiftyNetZooApp(ini_file_path)

service = TomaatService(
service = TomaatServiceDelayedResponse(
config=config,
app=application,
input_interface=input_interface,
Expand Down
140 changes: 132 additions & 8 deletions tomaat/server/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
# Fall back to Python 2's urllib2
from urllib2 import urlopen

from multiprocessing import Process, Manager, Lock
from urllib2 import urlopen

from klein import Klein
from twisted.internet.defer import inlineCallbacks, returnValue, DeferredLock
from twisted.internet import threads
Expand Down Expand Up @@ -164,8 +167,8 @@ def make_error_response(self, message):
:type message: str error message to be returned to the client
:return: response to be returned to client
"""
response = {'type': 'PlainText', 'content': message, 'label': 'Error!'}
return json.dumps(response)
response = [{'type': 'PlainText', 'content': message, 'label': 'Error!'}]
return response

def parse_request(self, request, savepath):
"""
Expand Down Expand Up @@ -273,7 +276,7 @@ def make_response(self, data, savepath):
fiducial_str = ';'.join([','.join(map(str,fid_point)) for fid_point in fiducial_array])
message.append({'type': 'Fiducials', 'content': fiducial_str, 'label': ''})

return json.dumps(message)
return message

def received_data_handler(self, request):
savepath = os.path.join(tempfile.gettempdir(), str(uuid.uuid4()).replace('-', ''))
Expand All @@ -285,28 +288,149 @@ def received_data_handler(self, request):
except:
traceback.print_exc()
logger.error('Server-side ERROR during request parsing')
return self.make_error_response('Server-side ERROR during request parsing')
response = self.make_error_response('Server-side ERROR during request parsing')
return json.dumps(response)

try:
transformed_result = self.app(data, gpu_lock=self.gpu_lock)
except:
traceback.print_exc()
logger.error('Server-side ERROR during processing')
return self.make_error_response('Server-side ERROR during processing')
response = self.make_error_response('Server-side ERROR during processing')
return json.dumps(response)

try:
response = self.make_response(transformed_result, savepath)
except:
traceback.print_exc()
logger.error('Server-side ERROR during response message creation')
return self.make_error_response('Server-side ERROR during response message creation')
response = self.make_error_response('Server-side ERROR during response message creation')
return json.dumps(response)

shutil.rmtree(savepath)

return response
return json.dumps(response)

def run(self):
self.klein_app.run(port=self.config['port'], host='0.0.0.0')
reactor.run()


class TomaatServiceDelayedResponse(TomaatService):
announcement_task = None

gpu_lock = DeferredLock()

multiprocess_manager = Manager()

result_dict = multiprocess_manager.dict()
reqest_list = multiprocess_manager.list()

multiprocess_lock = Lock()

klein_app = Klein()

def __init__(self, no_concurrent_thread_execution=True, **kwargs):
super(TomaatServiceDelayedResponse, self).__init__(**kwargs)
self.no_concurrent_thread_execution = no_concurrent_thread_execution

def received_data_handler(self, request):
req_id = str(uuid.uuid4()).replace('-', '')

savepath = os.path.join(tempfile.gettempdir(), req_id)

os.mkdir(savepath)

def processing_thread():
if self.no_concurrent_thread_execution:
self.multiprocess_lock.acquire()

response = self.make_error_response('Server-side ERROR during processing')

try:
data = self.parse_request(request, savepath)
except:
traceback.print_exc()
logger.error('Server-side ERROR during request parsing')

try:
transformed_result = self.app(data, gpu_lock=self.gpu_lock)
except:
traceback.print_exc()
logger.error('Server-side ERROR during processing')

try:
response = self.make_response(transformed_result, savepath)
except:
traceback.print_exc()
logger.error('Server-side ERROR during response message creation')

response = [{
'type': 'PlainText',
'content': 'The results of your earlier request {} have been received'.format(req_id),
'label': ''
}] + response

self.result_dict[req_id] = response

shutil.rmtree(savepath)

if self.no_concurrent_thread_execution:
self.multiprocess_lock.release()

delegated_process = Process(target=processing_thread, args=())
delegated_process.start()

self.reqest_list.append(req_id)

response = [{'type': 'DelayedResponse', 'request_id': req_id}]

return json.dumps(response)

@klein_app.route('/interface', methods=['GET'])
def interface(self, _):
return json.dumps(self.input_interface)

@klein_app.route('/predict', methods=['POST'])
@inlineCallbacks
def predict(self, request):
logger.info('predicting...')

result = yield threads.deferToThread(self.received_data_handler, request)

returnValue(result)

@klein_app.route('/responses', methods=['POST'])
@inlineCallbacks
def responses(self, request):
logger.info('getting responses...')

result = yield threads.deferToThread(self.responses_data_handler, request)

returnValue(result)

def responses_data_handler(self, request):
req_id = request.args['request_id'][0]

print(req_id)
print(self.reqest_list)

if req_id not in self.reqest_list:
response = [{
'type': 'PlainText',
'content': 'The results of request {} cannot be retrieved'.format(req_id),
'label': ''
}]

return json.dumps(response)

try:
response = self.result_dict[req_id]
#removing content of list and dict
del self.result_dict[req_id]
self.reqest_list.remove(req_id)
except KeyError:
response = [{'type': 'DelayedResponse', 'request_id': req_id}]


return json.dumps(response)

0 comments on commit a107b33

Please sign in to comment.