Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
Merge pull request #226 from gnes-ai/refactor-grpc-client
Browse files Browse the repository at this point in the history
feat(grpc-client): add multi-threaded sync grpc client
  • Loading branch information
mergify[bot] committed Sep 6, 2019
2 parents 8021d18 + ad65d29 commit 22efe72
Show file tree
Hide file tree
Showing 11 changed files with 260 additions and 107 deletions.
14 changes: 8 additions & 6 deletions Dockerfiles/alpine.Dockerfile
Expand Up @@ -9,16 +9,18 @@ LABEL maintainer="team@gnes.ai" \
org.label-schema.build-date=$BUILD_DATE \
org.label-schema.name="GNES is Generic Nerual Elastic Search"

WORKDIR /gnes/

ADD . ./

RUN apk add --no-cache \
--virtual=.build-dependencies \
build-base g++ gfortran file binutils zeromq-dev \
musl-dev python3-dev py-pgen cython openblas-dev && \
apk add --no-cache libstdc++ openblas libzmq && \
ln -s locale.h /usr/include/xlocale.h && \
apk add --no-cache libstdc++ openblas libzmq


WORKDIR /gnes/

ADD . ./

RUN ln -s locale.h /usr/include/xlocale.h && \
pip install . --no-cache-dir --compile && \
find /usr/lib/python3.7/ -name 'tests' -exec rm -r '{}' + && \
find /usr/lib/python3.7/site-packages/ -name '*.so' -print -exec sh -c 'file "{}" | grep -q "not stripped" && strip -s "{}"' \; && \
Expand Down
11 changes: 6 additions & 5 deletions Dockerfiles/buster.Dockerfile
Expand Up @@ -9,17 +9,18 @@ LABEL maintainer="team@gnes.ai" \
org.label-schema.build-date=$BUILD_DATE \
org.label-schema.name="GNES is Generic Nerual Elastic Search"

RUN apt-get update && apt-get install --no-install-recommends -y \
build-essential \
python3-dev libopenblas-dev && \
apt-get autoremove && apt-get clean && rm -rf /var/lib/apt/lists/*

WORKDIR /gnes/

ADD . ./

RUN apt-get update && apt-get install --no-install-recommends -y \
build-essential \
python3-dev libopenblas-dev && \
ln -s locale.h /usr/include/xlocale.h && \
RUN ln -s locale.h /usr/include/xlocale.h && \
pip install . --no-cache-dir --compile && \
rm -rf /tmp/* && rm -rf /gnes && \
apt-get autoremove && apt-get clean && rm -rf /var/lib/apt/lists/* && \
rm /usr/include/xlocale.h

WORKDIR /
Expand Down
9 changes: 5 additions & 4 deletions Dockerfiles/ubuntu18.Dockerfile
Expand Up @@ -18,16 +18,17 @@ ENV LANG=en_US.UTF-8 \
LANGUAGE=en_US:en \
LC_ALL=en_US.UTF-8

ADD . ./

RUN apt-get update && apt-get install --no-install-recommends -y \
build-essential python3-dev python3-pip python3-setuptools libopenblas-dev && \
export LC_ALL=en_US.UTF-8 && export LANG=en_US.UTF-8 && export LC_CTYPE=en_US.UTF-8 && \
ln -s /usr/bin/python3 python && \
pip3 install . --no-cache-dir --compile && \
rm -rf /tmp/* && rm -rf /gnes && \
apt-get autoremove && apt-get clean && rm -rf /var/lib/apt/lists/*

ADD . ./

RUN pip3 install . --no-cache-dir --compile && \
rm -rf /tmp/* && rm -rf /gnes

WORKDIR /

ENTRYPOINT ["gnes"]
1 change: 1 addition & 0 deletions gnes/__init__.py
Expand Up @@ -17,3 +17,4 @@
# do not change this line
# this is managed by git tag and replaced on every release
__version__ = '0.0.36'
__proto_version__ = '20190905'
33 changes: 12 additions & 21 deletions gnes/client/cli.py
Expand Up @@ -20,44 +20,35 @@
from math import ceil
from typing import List

import grpc
from termcolor import colored

from ..proto import gnes_pb2_grpc, RequestGenerator
from ..proto import RequestGenerator
from .grpc import StreamingSyncClient


class CLIClient:
class CLIClient(StreamingSyncClient):
def __init__(self, args):
self.args = args
self._use_channel()

def _use_channel(self):
all_bytes = self.read_all()
with grpc.insecure_channel(
'%s:%d' % (self.args.grpc_host, self.args.grpc_port),
options=[('grpc.max_send_message_length', self.args.max_message_size * 1024 * 1024),
('grpc.max_receive_message_length', self.args.max_message_size * 1024 * 1024)]) as channel:
stub = gnes_pb2_grpc.GnesRPCStub(channel)
getattr(self, self.args.mode)(all_bytes, stub)
super().__init__(args)
self.start()
getattr(self, self.args.mode)(all_bytes)
self.stop()

def train(self, all_bytes: List[bytes], stub):
with ProgressBar(all_bytes, self.args.batch_size, task_name=self.args.mode) as p_bar:
for _ in stub.StreamCall(RequestGenerator.train(all_bytes,
doc_id_start=self.args.start_doc_id,
batch_size=self.args.batch_size)):
for req in RequestGenerator.train(all_bytes, doc_id_start=self.args.start_doc_id, batch_size=self.args.batch_size):
self.send_request(req)
p_bar.update()

def index(self, all_bytes: List[bytes], stub):
with ProgressBar(all_bytes, self.args.batch_size, task_name=self.args.mode) as p_bar:
for _ in stub.StreamCall(RequestGenerator.index(all_bytes,
doc_id_start=self.args.start_doc_id,
batch_size=self.args.batch_size)):
for req in RequestGenerator.index(all_bytes, doc_id_start=self.args.start_doc_id, batch_size=self.args.batch_size):
self.send_request(req)
p_bar.update()

def query(self, all_bytes: List[bytes], stub):
for idx, q in enumerate(all_bytes):
for req in RequestGenerator.query(q, request_id_start=idx, top_k=self.args.top_k):
resp = stub.Call(req)
resp = self._stub.Call(req)
print(resp)
print('query %d result: %s' % (idx, resp))
input('press any key to continue...')
Expand Down
151 changes: 151 additions & 0 deletions gnes/client/grpc.py
@@ -0,0 +1,151 @@
# Tencent is pleased to support the open source community by making GNES available.
#
# Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc
import queue
from concurrent import futures

from gnes.proto import gnes_pb2_grpc


class BaseClient:

def __init__(self, args):
self.args = args

self._channel = grpc.insecure_channel(
'%s:%d' % (self.args.grpc_host, self.args.grpc_port),
options={
"grpc.max_send_message_length": -1,
"grpc.max_receive_message_length": -1,
}.items(),
)

def call(self, request):
resp = self._stub.Call(request)
return resp

def async_call(self, request, callback_fn=None):
response_future = self._stub.Call.future(self._request)
if callback_fn:
response_future.add_done_callback(callback_fn)
else:
return response_future

def send_request(self, request):
"""Non-blocking wrapper for a client's request operation."""
raise NotImplementedError

def start(self):
# waits for the channel to be ready before we start sending messages
grpc.channel_ready_future(self._channel).result()
self._stub = gnes_pb2_grpc.GnesRPCStub(self._channel)

def stop(self):
self._channel.close()
self._stub = None

def __enter__(self):
self.start()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()


class UnarySyncClient(BaseClient):

def __init__(self, args):
super().__init__(args)
self._pool = futures.ThreadPoolExecutor(
max_workers=self.args.max_concurrency)
self.request_cnt = 0
self._response_callbacks = []

def send_request(self, request):
# Send requests in seperate threads to support multiple outstanding rpcs
self._pool.submit(self._dispatch_request, request)

def stop(self):
self._pool.shutdown(wait=True)
super().stop()

def _dispatch_request(self, request):
resp = self._stub.Call(request)
self._handle_response(self, resp)

def _handle_response(self, client, response):
for callback in self._response_callbacks:
callback(client, response)

def add_response_callback(self, callback):
"""callback will be invoked as callback(client, response)"""
self._response_callbacks.append(callback)


class _SyncStream(object):

def __init__(self, stub, handle_response):
self._stub = stub
self._handle_response = handle_response
self._is_streaming = False
self._request_queue = queue.Queue()

def send_request(self, request):
self._request_queue.put(request)

def start(self):
self._is_streaming = True
response_stream = self._stub.StreamCall(self._request_generator())
for resp in response_stream:
self._handle_response(self, resp)

def stop(self):
self._is_streaming = False

def _request_generator(self):
while self._is_streaming:
try:
request = self._request_queue.get(block=True, timeout=1.0)
yield request
except queue.Empty:
pass


class StreamingClient(UnarySyncClient):

def __init__(self, args):
super().__init__(args)

self._streams = [
_SyncStream(self._stub, self._handle_response)
for _ in range(self.args.max_concurrency)
]
self._curr_stream = 0

def send_request(self, request):
# Use a round_robin scheduler to determine what stream to send on
self._streams[self._curr_stream].send_request(request)
self._curr_stream = (self._curr_stream + 1) % len(self._streams)

def start(self):
super().start()
for stream in self._streams:
self._pool.submit(stream.start)

def stop(self):
for stream in self._streams:
stream.stop()
super().stop()
6 changes: 3 additions & 3 deletions gnes/preprocessor/io_utils/gif.py
Expand Up @@ -60,15 +60,15 @@ def capture_frames(input_fn: str = 'pipe:',
return frames


def encode_gif(images: 'np.ndarray', fps: int, pix_fmt: str = 'rgb24'):
def encode_video(images: 'np.ndarray', frame_rate: int, pix_fmt: str = 'rgb24'):

cmd = [
'ffmpeg', '-y', '-f', 'rawvideo', '-vcodec', 'rawvideo', '-r',
'%.02f' % fps, '-s',
'%.02f' % frame_rate, '-s',
'%dx%d' % (images[0].shape[1], images[0].shape[0]), '-pix_fmt',
'rgb24', '-i', '-', '-filter_complex',
'[0:v]split[x][z];[z]palettegen[y];[x]fifo[x];[x][y]paletteuse', '-r',
'%.02f' % fps, '-f', 'gif', '-'
'%.02f' % frame_rate, '-f', 'gif', '-'
]
proc = sp.Popen(cmd, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE)
for image in images:
Expand Down

0 comments on commit 22efe72

Please sign in to comment.