Skip to content

Commit

Permalink
fix: ensure closing Executor at shutdown in HTTP Deployment (jina-ai#…
Browse files Browse the repository at this point in the history
  • Loading branch information
JoanFM committed Jun 7, 2023
1 parent 91bfa76 commit 30c0a94
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 5 deletions.
2 changes: 1 addition & 1 deletion jina/orchestrate/deployments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ def __init__(
):
raise RuntimeError(
f'It is not supported to have {ProtocolType.WEBSOCKET.to_string()} deployment for '
f'Deployments with more than one shard'
f'Deployments'
)
is_mac_os = platform.system() == 'Darwin'
is_windows_os = platform.system() == 'Windows'
Expand Down
14 changes: 11 additions & 3 deletions jina/serve/runtimes/worker/request_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,18 @@ def _http_fastapi_default_app(self,
def call_handle(request):
return self.process_single_data(request, None)

return get_fastapi_app(
app = get_fastapi_app(
request_models_map=request_models_map,
caller=call_handle,
**kwargs
)

@app.on_event('shutdown')
async def _shutdown():
await self.close()

return app

async def _hot_reload(self):
import inspect

Expand Down Expand Up @@ -793,12 +799,14 @@ async def endpoint_discovery(self, empty, context) -> jina_pb2.EndpointsProto:
if inner_dict['input']['model'].schema() == legacy_doc_schema:
inner_dict['input']['model'] = legacy_doc_schema
else:
inner_dict['input']['model'] = _create_aux_model_doc_list_to_list(inner_dict['input']['model']).schema()
inner_dict['input']['model'] = _create_aux_model_doc_list_to_list(
inner_dict['input']['model']).schema()

if inner_dict['output']['model'].schema() == legacy_doc_schema:
inner_dict['output']['model'] = legacy_doc_schema
else:
inner_dict['output']['model'] = _create_aux_model_doc_list_to_list(inner_dict['output']['model']).schema()
inner_dict['output']['model'] = _create_aux_model_doc_list_to_list(
inner_dict['output']['model']).schema()
else:
for endpoint_name, inner_dict in schemas.items():
inner_dict['input']['model'] = inner_dict['input']['model'].schema()
Expand Down
23 changes: 22 additions & 1 deletion tests/integration/docarray_v2/test_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,6 @@ def search(self, docs: DocList[TextDocWithId], **kwargs) -> DocList[ResultTestDo


def test_issue_shards_missmatch_endpoint():

class MyDoc(BaseDoc):
text: str
embedding: NdArray[128]
Expand All @@ -1014,3 +1013,25 @@ def foo(self, docs: DocList[MyDoc], **kwargs) -> DocList[MyDocWithMatchesAndScor
with d:
res = d.post(on='/', inputs=DocList[MyDoc]([MyDoc(text='hey ha', embedding=np.random.rand(128))]))
assert len(res) == 1


@pytest.mark.parametrize('protocol', ['grpc', 'http'])
def test_closing_executor(tmpdir, protocol):
class ClosingExec(Executor):

def __init__(self, file_path, *args, **kwargs):
super().__init__(*args, **kwargs)
self._file_path = file_path

def close(self) -> None:
with open(self._file_path, 'w') as f:
f.write('I closed')

file_path = f'{str(tmpdir)}/file.txt'
d = Deployment(uses=ClosingExec, uses_with={'file_path': file_path}, protocol=protocol)
with d:
pass

with open(file_path, 'r') as f:
r = f.read()
assert r == 'I closed'

0 comments on commit 30c0a94

Please sign in to comment.