diff --git a/Dockerfiles/alpine.Dockerfile b/Dockerfiles/alpine.Dockerfile index b4998d4d..b55f9c3f 100644 --- a/Dockerfiles/alpine.Dockerfile +++ b/Dockerfiles/alpine.Dockerfile @@ -9,16 +9,18 @@ LABEL maintainer="team@gnes.ai" \ org.label-schema.build-date=$BUILD_DATE \ org.label-schema.name="GNES is Generic Nerual Elastic Search" -WORKDIR /gnes/ - -ADD . ./ - RUN apk add --no-cache \ --virtual=.build-dependencies \ build-base g++ gfortran file binutils zeromq-dev \ musl-dev python3-dev py-pgen cython openblas-dev && \ - apk add --no-cache libstdc++ openblas libzmq && \ - ln -s locale.h /usr/include/xlocale.h && \ + apk add --no-cache libstdc++ openblas libzmq + + +WORKDIR /gnes/ + +ADD . ./ + +RUN ln -s locale.h /usr/include/xlocale.h && \ pip install . --no-cache-dir --compile && \ find /usr/lib/python3.7/ -name 'tests' -exec rm -r '{}' + && \ find /usr/lib/python3.7/site-packages/ -name '*.so' -print -exec sh -c 'file "{}" | grep -q "not stripped" && strip -s "{}"' \; && \ diff --git a/Dockerfiles/buster.Dockerfile b/Dockerfiles/buster.Dockerfile index d66fab7c..42333aa0 100644 --- a/Dockerfiles/buster.Dockerfile +++ b/Dockerfiles/buster.Dockerfile @@ -9,17 +9,18 @@ LABEL maintainer="team@gnes.ai" \ org.label-schema.build-date=$BUILD_DATE \ org.label-schema.name="GNES is Generic Nerual Elastic Search" +RUN apt-get update && apt-get install --no-install-recommends -y \ + build-essential \ + python3-dev libopenblas-dev && \ + apt-get autoremove && apt-get clean && rm -rf /var/lib/apt/lists/* + WORKDIR /gnes/ ADD . ./ -RUN apt-get update && apt-get install --no-install-recommends -y \ - build-essential \ - python3-dev libopenblas-dev && \ - ln -s locale.h /usr/include/xlocale.h && \ +RUN ln -s locale.h /usr/include/xlocale.h && \ pip install . --no-cache-dir --compile && \ rm -rf /tmp/* && rm -rf /gnes && \ - apt-get autoremove && apt-get clean && rm -rf /var/lib/apt/lists/* && \ rm /usr/include/xlocale.h WORKDIR / diff --git a/Dockerfiles/ubuntu18.Dockerfile b/Dockerfiles/ubuntu18.Dockerfile index ab58a1dc..1fb17ef0 100644 --- a/Dockerfiles/ubuntu18.Dockerfile +++ b/Dockerfiles/ubuntu18.Dockerfile @@ -18,16 +18,17 @@ ENV LANG=en_US.UTF-8 \ LANGUAGE=en_US:en \ LC_ALL=en_US.UTF-8 -ADD . ./ - RUN apt-get update && apt-get install --no-install-recommends -y \ build-essential python3-dev python3-pip python3-setuptools libopenblas-dev && \ export LC_ALL=en_US.UTF-8 && export LANG=en_US.UTF-8 && export LC_CTYPE=en_US.UTF-8 && \ ln -s /usr/bin/python3 python && \ - pip3 install . --no-cache-dir --compile && \ - rm -rf /tmp/* && rm -rf /gnes && \ apt-get autoremove && apt-get clean && rm -rf /var/lib/apt/lists/* +ADD . ./ + +RUN pip3 install . --no-cache-dir --compile && \ + rm -rf /tmp/* && rm -rf /gnes + WORKDIR / ENTRYPOINT ["gnes"] \ No newline at end of file diff --git a/gnes/__init__.py b/gnes/__init__.py index c334e2b7..9b6b9636 100644 --- a/gnes/__init__.py +++ b/gnes/__init__.py @@ -17,3 +17,4 @@ # do not change this line # this is managed by git tag and replaced on every release __version__ = '0.0.36' +__proto_version__ = '20190905' diff --git a/gnes/client/cli.py b/gnes/client/cli.py index 7cf67bf5..d91f9fe7 100644 --- a/gnes/client/cli.py +++ b/gnes/client/cli.py @@ -20,44 +20,35 @@ from math import ceil from typing import List -import grpc from termcolor import colored -from ..proto import gnes_pb2_grpc, RequestGenerator +from ..proto import RequestGenerator +from .grpc import StreamingSyncClient -class CLIClient: +class CLIClient(StreamingSyncClient): def __init__(self, args): - self.args = args - self._use_channel() - - def _use_channel(self): - all_bytes = self.read_all() - with grpc.insecure_channel( - '%s:%d' % (self.args.grpc_host, self.args.grpc_port), - options=[('grpc.max_send_message_length', self.args.max_message_size * 1024 * 1024), - ('grpc.max_receive_message_length', self.args.max_message_size * 1024 * 1024)]) as channel: - stub = gnes_pb2_grpc.GnesRPCStub(channel) - getattr(self, self.args.mode)(all_bytes, stub) + super().__init__(args) + self.start() + getattr(self, self.args.mode)(all_bytes) + self.stop() def train(self, all_bytes: List[bytes], stub): with ProgressBar(all_bytes, self.args.batch_size, task_name=self.args.mode) as p_bar: - for _ in stub.StreamCall(RequestGenerator.train(all_bytes, - doc_id_start=self.args.start_doc_id, - batch_size=self.args.batch_size)): + for req in RequestGenerator.train(all_bytes, doc_id_start=self.args.start_doc_id, batch_size=self.args.batch_size): + self.send_request(req) p_bar.update() def index(self, all_bytes: List[bytes], stub): with ProgressBar(all_bytes, self.args.batch_size, task_name=self.args.mode) as p_bar: - for _ in stub.StreamCall(RequestGenerator.index(all_bytes, - doc_id_start=self.args.start_doc_id, - batch_size=self.args.batch_size)): + for req in RequestGenerator.index(all_bytes, doc_id_start=self.args.start_doc_id, batch_size=self.args.batch_size): + self.send_request(req) p_bar.update() def query(self, all_bytes: List[bytes], stub): for idx, q in enumerate(all_bytes): for req in RequestGenerator.query(q, request_id_start=idx, top_k=self.args.top_k): - resp = stub.Call(req) + resp = self._stub.Call(req) print(resp) print('query %d result: %s' % (idx, resp)) input('press any key to continue...') diff --git a/gnes/client/grpc.py b/gnes/client/grpc.py new file mode 100644 index 00000000..7d19a8fd --- /dev/null +++ b/gnes/client/grpc.py @@ -0,0 +1,151 @@ +# Tencent is pleased to support the open source community by making GNES available. +# +# Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc +import queue +from concurrent import futures + +from gnes.proto import gnes_pb2_grpc + + +class BaseClient: + + def __init__(self, args): + self.args = args + + self._channel = grpc.insecure_channel( + '%s:%d' % (self.args.grpc_host, self.args.grpc_port), + options={ + "grpc.max_send_message_length": -1, + "grpc.max_receive_message_length": -1, + }.items(), + ) + + def call(self, request): + resp = self._stub.Call(request) + return resp + + def async_call(self, request, callback_fn=None): + response_future = self._stub.Call.future(self._request) + if callback_fn: + response_future.add_done_callback(callback_fn) + else: + return response_future + + def send_request(self, request): + """Non-blocking wrapper for a client's request operation.""" + raise NotImplementedError + + def start(self): + # waits for the channel to be ready before we start sending messages + grpc.channel_ready_future(self._channel).result() + self._stub = gnes_pb2_grpc.GnesRPCStub(self._channel) + + def stop(self): + self._channel.close() + self._stub = None + + def __enter__(self): + self.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + +class UnarySyncClient(BaseClient): + + def __init__(self, args): + super().__init__(args) + self._pool = futures.ThreadPoolExecutor( + max_workers=self.args.max_concurrency) + self.request_cnt = 0 + self._response_callbacks = [] + + def send_request(self, request): + # Send requests in seperate threads to support multiple outstanding rpcs + self._pool.submit(self._dispatch_request, request) + + def stop(self): + self._pool.shutdown(wait=True) + super().stop() + + def _dispatch_request(self, request): + resp = self._stub.Call(request) + self._handle_response(self, resp) + + def _handle_response(self, client, response): + for callback in self._response_callbacks: + callback(client, response) + + def add_response_callback(self, callback): + """callback will be invoked as callback(client, response)""" + self._response_callbacks.append(callback) + + +class _SyncStream(object): + + def __init__(self, stub, handle_response): + self._stub = stub + self._handle_response = handle_response + self._is_streaming = False + self._request_queue = queue.Queue() + + def send_request(self, request): + self._request_queue.put(request) + + def start(self): + self._is_streaming = True + response_stream = self._stub.StreamCall(self._request_generator()) + for resp in response_stream: + self._handle_response(self, resp) + + def stop(self): + self._is_streaming = False + + def _request_generator(self): + while self._is_streaming: + try: + request = self._request_queue.get(block=True, timeout=1.0) + yield request + except queue.Empty: + pass + + +class StreamingClient(UnarySyncClient): + + def __init__(self, args): + super().__init__(args) + + self._streams = [ + _SyncStream(self._stub, self._handle_response) + for _ in range(self.args.max_concurrency) + ] + self._curr_stream = 0 + + def send_request(self, request): + # Use a round_robin scheduler to determine what stream to send on + self._streams[self._curr_stream].send_request(request) + self._curr_stream = (self._curr_stream + 1) % len(self._streams) + + def start(self): + super().start() + for stream in self._streams: + self._pool.submit(stream.start) + + def stop(self): + for stream in self._streams: + stream.stop() + super().stop() \ No newline at end of file diff --git a/gnes/preprocessor/io_utils/gif.py b/gnes/preprocessor/io_utils/gif.py index 52f61429..92e76bb2 100644 --- a/gnes/preprocessor/io_utils/gif.py +++ b/gnes/preprocessor/io_utils/gif.py @@ -60,15 +60,15 @@ def capture_frames(input_fn: str = 'pipe:', return frames -def encode_gif(images: 'np.ndarray', fps: int, pix_fmt: str = 'rgb24'): +def encode_video(images: 'np.ndarray', frame_rate: int, pix_fmt: str = 'rgb24'): cmd = [ 'ffmpeg', '-y', '-f', 'rawvideo', '-vcodec', 'rawvideo', '-r', - '%.02f' % fps, '-s', + '%.02f' % frame_rate, '-s', '%dx%d' % (images[0].shape[1], images[0].shape[0]), '-pix_fmt', 'rgb24', '-i', '-', '-filter_complex', '[0:v]split[x][z];[z]palettegen[y];[x]fifo[x];[x][y]paletteuse', '-r', - '%.02f' % fps, '-f', 'gif', '-' + '%.02f' % frame_rate, '-f', 'gif', '-' ] proc = sp.Popen(cmd, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE) for image in images: diff --git a/gnes/preprocessor/io_utils/video.py b/gnes/preprocessor/io_utils/video.py index 5365d9e8..d83264db 100644 --- a/gnes/preprocessor/io_utils/video.py +++ b/gnes/preprocessor/io_utils/video.py @@ -18,7 +18,7 @@ from typing import List -from .ffmpeg import get_media_meta, compile_args +from .ffmpeg import compile_args, probe from .helper import _check_input, run_command, run_command_async @@ -53,7 +53,7 @@ def scale_video(input_fn: str = 'pipe:', 'crf': crf, 'framerate': frame_rate, 'acodec': 'aac', - 'strict': 'experimental', # AAC audio encoder is experimental + 'strict': 'experimental', # AAC audio encoder is experimental } if scale: @@ -142,70 +142,78 @@ def capture_frames(input_fn: str = 'pipe:', **kwargs) -> List['np.ndarray']: _check_input(input_fn, input_data) - video_meta = get_media_meta(input_fn=input_fn, input_data=input_data) - width = video_meta['frame_width'] - height = video_meta['frame_height'] - - if scale is not None: - _width, _height = map(int, scale.split(':')) - if _width * _height < 0: - if _width > 0: - ratio = _width / width - height = int(ratio * height) - if _height == -2: - height += height % 2 - width = _width + import tempfile + + with tempfile.NamedTemporaryFile() as f: + if input_data: + f.write(input_data) + f.flush() + input_fn = f.name + + video_meta = probe(input_fn) + width = video_meta['width'] + height = video_meta['height'] + + if scale is not None: + _width, _height = map(int, scale.split(':')) + if _width * _height < 0: + if _width > 0: + ratio = _width / width + height = int(ratio * height) + if _height == -2: + height += height % 2 + width = _width + else: + ratio = _height / height + width = int(ratio * width) + if _width == -2: + width += width % 2 + + height = _height + + scale = '%d:%d' % (width, height) else: - ratio = _height / height - width = int(ratio * width) - if _width == -2: - width += width % 2 - + width = _width height = _height - scale = '%d:%d' % (width, height) + input_kwargs = { + 'err_detect': 'aggressive', + 'fflags': 'discardcorrupt' # discard corrupted frames + } + if start_time is not None: + input_kwargs['ss'] = str(start_time) else: - width = _width - height = _height - - input_kwargs = { - 'err_detect': 'aggressive', - 'fflags': 'discardcorrupt' # discard corrupted frames - } - if start_time is not None: - input_kwargs['ss'] = str(start_time) - else: - start_time = 0. - if end_time is not None: - input_kwargs['t'] = str(end_time - start_time) - - video_filters = [] - if fps: - video_filters += ['fps=%d' % fps] - if scale: - video_filters += ['scale=%s' % scale] - - output_kwargs = { - 'format': 'image2pipe', - 'pix_fmt': pix_fmt, - 'vcodec': 'rawvideo' - } - - cmd_args = compile_args( - input_fn=input_fn, - input_options=input_kwargs, - video_filters=video_filters, - output_options=output_kwargs) - - out, _ = run_command( - cmd_args, input=input_data, pipe_stdout=True, pipe_stderr=True) - - depth = 3 - if pix_fmt == 'rgba': - depth = 4 - - frames = np.frombuffer(out, np.uint8).reshape([-1, height, width, depth]) - return frames + start_time = 0. + if end_time is not None: + input_kwargs['t'] = str(end_time - start_time) + + video_filters = [] + if fps: + video_filters += ['fps=%d' % fps] + if scale: + video_filters += ['scale=%s' % scale] + + output_kwargs = { + 'format': 'image2pipe', + 'pix_fmt': pix_fmt, + 'vcodec': 'rawvideo', + 'movflags': 'faststart', + } + + cmd_args = compile_args( + input_fn=input_fn, + input_options=input_kwargs, + video_filters=video_filters, + output_options=output_kwargs) + out, _ = run_command(cmd_args, pipe_stdout=True, pipe_stderr=True) + + depth = 3 + if pix_fmt == 'rgba': + depth = 4 + + frames = np.frombuffer(out, + np.uint8).reshape([-1, height, width, depth]) + return frames # def read_frame_as_jpg(in_filename, frame_num): diff --git a/gnes/preprocessor/video/ffmpeg.py b/gnes/preprocessor/video/ffmpeg.py index b067c411..67b5d664 100644 --- a/gnes/preprocessor/video/ffmpeg.py +++ b/gnes/preprocessor/video/ffmpeg.py @@ -188,4 +188,4 @@ class GifChunkPreprocessor(RawChunkPreprocessor, BaseVideoPreprocessor): def _parse_chunk(chunk: 'gnes_pb2.Chunk', *args, **kwargs): from ..io_utils import gif as gif_util - return gif_util.encode_gif(blob2array(chunk.blob), fps=10) + return gif_util.encode_video(blob2array(chunk.blob), frame_rate=10) diff --git a/gnes/service/frontend.py b/gnes/service/frontend.py index 60edd8d4..93a30058 100644 --- a/gnes/service/frontend.py +++ b/gnes/service/frontend.py @@ -93,13 +93,11 @@ def Search(self, request, context): return self.Call(request, context) def StreamCall(self, request_iterator, context): - num_result = 0 with self.zmq_context as zmq_client: for request in request_iterator: zmq_client.send_message(self.add_envelope(request, zmq_client), self.args.timeout) - num_result += 1 - for _ in range(num_result): - yield self.remove_envelope(zmq_client.recv_message(self.args.timeout)) + msg = zmq_client.recv_message(self.args.timeout) + yield self.remove_envelope(msg) class ZmqContext: """The zmq context class.""" diff --git a/tests/test_ffmpeg_tools.py b/tests/test_ffmpeg_tools.py index 1d7e55f3..2845119d 100644 --- a/tests/test_ffmpeg_tools.py +++ b/tests/test_ffmpeg_tools.py @@ -52,7 +52,7 @@ def test_encode_video(self): self.assertEqual(meta['frame_height'], 360) def test_gif_encode(self): - gif_data = gif.encode_gif(images=self.frames, fps=10) + gif_data = gif.encode_video(images=self.frames, frame_rate=10) frames = gif.capture_frames(input_data=gif_data) self.assertEqual(self.frames.shape, frames.shape)