Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
feat(service): add ServiceManager and enable parallel services in one…
Browse files Browse the repository at this point in the history
… container
  • Loading branch information
hanhxiao committed Aug 8, 2019
1 parent c2a9a80 commit ccfd474
Show file tree
Hide file tree
Showing 15 changed files with 239 additions and 45 deletions.
16 changes: 12 additions & 4 deletions gnes/cli/parser.py
Expand Up @@ -106,7 +106,7 @@ def set_composer_flask_parser(parser=None):


def set_service_parser(parser=None):
from ..service.base import SocketType, BaseService
from ..service.base import SocketType, BaseService, ParallelType
import random
if not parser:
parser = set_base_parser()
Expand Down Expand Up @@ -134,8 +134,15 @@ def set_service_parser(parser=None):
parser.add_argument('--read_only', action='store_true', default=False,
help='do not allow the service to modify the model, '
'dump_interval will be ignored')
parser.add_argument('--concurrency_backend', type=str, choices=['thread', 'process'], default='thread',
help='concurrency backend of the service')
parser.add_argument('--parallel_backend', type=str, choices=['thread', 'process'], default='thread',
help='parallel backend of the service')
parser.add_argument('--num_parallel', type=int, default=1,
help='number of parallel services running at the same time, '
'`port_in` and `port_out` will be set to random, '
'and routers will be added automatically when necessary')
parser.add_argument('--parallel_type', type=ParallelType.from_string, choices=list(ParallelType),
default=ParallelType.PUSH_NONBLOCK,
help='parallel type of the concurrent services')
return parser


Expand Down Expand Up @@ -172,7 +179,6 @@ def set_preprocessor_service_parser(parser=None):
if not parser:
parser = set_base_parser()
set_loadable_service_parser(parser)

parser.set_defaults(read_only=True)
return parser

Expand All @@ -181,6 +187,8 @@ def set_router_service_parser(parser=None):
if not parser:
parser = set_base_parser()
set_loadable_service_parser(parser)
parser.add_argument('--num_part', type=int, default=1,
help='explicitly set the number of parts of message')
parser.set_defaults(read_only=True)
return parser

Expand Down
9 changes: 5 additions & 4 deletions gnes/encoder/audio/mfcc.py
Expand Up @@ -15,10 +15,12 @@

# pylint: disable=low-comment-ratio

from typing import List

import numpy as np

from ..base import BaseAudioEncoder
from ...helper import batching
import numpy as np
from typing import List


class MfccEncoder(BaseAudioEncoder):
Expand All @@ -38,9 +40,8 @@ def encode(self, data: List['np.array'], *args, **kwargs) -> np.ndarray:

max_lenth = max([len(mf) for mf in mfccs])


mfccs = [np.concatenate((mf, np.zeros((max_lenth - mf.shape[0], self.n_mfcc), dtype=np.float32)), axis=0)
if mf.shape[0] < max_lenth else mf for mf in mfccs]
mfccs = [mfcc.reshape((1, -1)) for mfcc in mfccs]
mfccs = np.squeeze(np.array(mfccs), axis=1)
return mfccs
return mfccs
4 changes: 3 additions & 1 deletion gnes/encoder/numeric/vlad.py
Expand Up @@ -16,8 +16,10 @@
# pylint: disable=low-comment-ratio


import numpy as np
import copy

import numpy as np

from ..base import BaseNumericEncoder
from ...helper import batching, train_required

Expand Down
22 changes: 11 additions & 11 deletions gnes/encoder/video/mixture_core/model.py
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

import math

import tensorflow as tf
import tensorflow.contrib.slim as slim

Expand Down Expand Up @@ -52,11 +53,11 @@ def __init__(self, feature_size,

@staticmethod
def rand_init(feature_size):
return tf.random_normal_initializer(stddev=1/math.sqrt(feature_size))
return tf.random_normal_initializer(stddev=1 / math.sqrt(feature_size))

def build_model(self):
self.feeds = tf.placeholder(tf.float32, [None, None, self.input_size])
#self.inputs = self.feeds
# self.inputs = self.feeds
self.inputs = tf.layers.dense(self.feeds, self.feature_size)
self.weights = tf.placeholder(tf.float32, [None, self.vocab_size])
self.max_frames = tf.shape(self.inputs)[1]
Expand All @@ -83,7 +84,7 @@ def build_fvnet(self):

covar_weights = tf.square(covar_weights)
eps = tf.constant([1e-6])
covar_weights = tf.add(covar_weights,eps)
covar_weights = tf.add(covar_weights, eps)

tf.summary.histogram("cluster_weights", cluster_weights)
activation = tf.matmul(reshaped_input, cluster_weights)
Expand Down Expand Up @@ -111,7 +112,7 @@ def build_fvnet(self):

a = tf.multiply(a_sum, cluster_weights2)

activation = tf.transpose(activation,perm=[0, 2, 1])
activation = tf.transpose(activation, perm=[0, 2, 1])

reshaped_input = tf.reshape(reshaped_input,
[-1, self.max_frames, self.feature_size])
Expand All @@ -131,15 +132,15 @@ def build_fvnet(self):
fv2 = tf.divide(fv2, tf.square(covar_weights))
fv2 = tf.subtract(fv2, a_sum)

fv2 = tf.reshape(fv2, [-1, self.cluster_size*self.feature_size])
fv2 = tf.reshape(fv2, [-1, self.cluster_size * self.feature_size])
fv2 = tf.nn.l2_normalize(fv2, 1)
fv2 = tf.reshape(fv2, [-1, self.cluster_size*self.feature_size])
fv2 = tf.reshape(fv2, [-1, self.cluster_size * self.feature_size])
fv2 = tf.nn.l2_normalize(fv2, 1)

fv1 = tf.subtract(fv1, a)
fv1 = tf.divide(fv1, covar_weights)
fv1 = tf.nn.l2_normalize(fv1, 1)
fv1 = tf.reshape(fv1, [-1, self.cluster_size*self.feature_size])
fv1 = tf.reshape(fv1, [-1, self.cluster_size * self.feature_size])
fv1 = tf.nn.l2_normalize(fv1, 1)

self.repre = tf.concat([fv1, fv2], 1)
Expand Down Expand Up @@ -197,9 +198,9 @@ def build_loss(self):
logits = tf.cast(self.label, tf.float32)
if self.use_weights:
logits = logits * self.weights
self.loss = - tf.log(tf.reduce_sum(logits * self.probabilities, axis=1)+1e-9)
self.loss = - tf.log(tf.reduce_sum(logits * self.probabilities, axis=1) + 1e-9)
self.loss = tf.reduce_mean(self.loss)
self.pred =tf.argmax(self.probabilities, 1)
self.pred = tf.argmax(self.probabilities, 1)
self.avg_diff = tf.cast(tf.equal(tf.argmax(self.label, 1), self.pred), tf.float32)
self.avg_diff = tf.reduce_mean(self.avg_diff)

Expand Down Expand Up @@ -230,7 +231,7 @@ def build_loss(self):
self.probabilities2 = tf.nn.softmax(self.probabilities2)

self.loss += tf.reduce_mean(-tf.log(
tf.reduce_sum(logits2*self.probabilities2, axis=1)+1e-9))
tf.reduce_sum(logits2 * self.probabilities2, axis=1) + 1e-9))
self.pred2 = tf.argmax(self.probabilities2, 1)
self.avg_diff2 = tf.cast(tf.equal(tf.argmax(self.label_2, 1), self.pred2), tf.float32)
self.avg_diff2 = tf.reduce_mean(self.avg_diff2)
Expand All @@ -242,4 +243,3 @@ def build_loss(self):
self.eval_res = {'loss': self.loss, 'avg_diff': self.avg_diff}
if self.use_2nd_label:
self.eval_res['avg_diff2'] = self.avg_diff2

5 changes: 3 additions & 2 deletions gnes/preprocessor/audio/audio_vanilla.py
Expand Up @@ -13,8 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from .base import BaseAudioPreprocessor
import numpy as np

from .base import BaseAudioPreprocessor
from ..helper import get_video_length_from_raw, get_audio
from ...proto import array2blob

Expand Down Expand Up @@ -43,4 +44,4 @@ def apply(self, doc: 'gnes_pb2.Document') -> None:
else:
self.logger.info('bad document: no audio extracted')
else:
self.logger.error('bad document: "raw_bytes" is empty!')
self.logger.error('bad document: "raw_bytes" is empty!')
19 changes: 12 additions & 7 deletions gnes/preprocessor/helper.py
Expand Up @@ -15,14 +15,15 @@

# pylint: disable=low-comment-ratio

import datetime
import io
import os
import subprocess as sp
from datetime import timedelta
from typing import List, Callable
import os

import cv2
import numpy as np
import datetime
from datetime import timedelta
from PIL import Image

from ..helper import set_logger
Expand All @@ -32,12 +33,13 @@

def get_video_length(video_path):
import re
process = sp.Popen(['ffmpeg', '-i', video_path],
process = sp.Popen(['ffmpeg', '-i', video_path],
stdout=sp.PIPE,
stderr=sp.STDOUT)
stdout, _ = process.communicate()
stdout = str(stdout)
matches = re.search(r"Duration:\s{1}(?P<hours>\d+?):(?P<minutes>\d+?):(?P<seconds>\d+\.\d+?),", stdout, re.DOTALL).groupdict()
matches = re.search(r"Duration:\s{1}(?P<hours>\d+?):(?P<minutes>\d+?):(?P<seconds>\d+\.\d+?),", stdout,
re.DOTALL).groupdict()
h = float(matches['hours'])
m = float(matches['minutes'])
s = float(matches['seconds'])
Expand Down Expand Up @@ -120,7 +122,9 @@ def split_mp4_random(video_path, avg_length, max_clip_second=10):
prefix = os.path.basename(video_path).replace('.mp4', '')
for i in range(num_part):
i_len = len(ts_group[i])
cmd = 'ffmpeg' + ''.join(ts_group[i]) + '-filter_complex "{}concat=n={}:v=1:a=1" -strict -2 {}_{}.mp4 -y'.format(''.join(['[{}]'.format(k) for k in range(i_len)]), i_len, prefix, i)
cmd = 'ffmpeg' + ''.join(
ts_group[i]) + '-filter_complex "{}concat=n={}:v=1:a=1" -strict -2 {}_{}.mp4 -y'.format(
''.join(['[{}]'.format(k) for k in range(i_len)]), i_len, prefix, i)
os.system(cmd)


Expand Down Expand Up @@ -174,7 +178,8 @@ def get_video_frames(buffer_data: bytes, image_format: str = 'cv2',
cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
frames.append(image)
except Exception as e:
logger.warning("The decoded cv2 image from keyframe buffer can not be converted to RGB: %s" % str(e))
logger.warning(
"The decoded cv2 image from keyframe buffer can not be converted to RGB: %s" % str(e))
else:
logger.error("The image format [%s] is not supported so far!" % image_format)
raise NotImplementedError
Expand Down
2 changes: 1 addition & 1 deletion gnes/preprocessor/image/segmentation.py
Expand Up @@ -15,10 +15,10 @@

import io
import os
from typing import List

import numpy as np
from PIL import Image
from typing import List

from .base import BaseImagePreprocessor
from ...proto import array2blob
Expand Down
9 changes: 6 additions & 3 deletions gnes/preprocessor/image/sliding_window.py
Expand Up @@ -90,7 +90,8 @@ def _get_all_sliding_window(self, img: 'np.ndarray'):
return [np.array(Image.fromarray(img).resize((self.target_img_size, self.target_img_size))) for img in
expanded_input], center_point_list

def _get_slid_offset_nd(self, all_subareas: List[List[int]], index: List[List[int]], center_point: List[float]) -> List[int]:
def _get_slid_offset_nd(self, all_subareas: List[List[int]], index: List[List[int]], center_point: List[float]) -> \
List[int]:
location_list = self._get_location(all_subareas, center_point)
location = [i for i in range(len(location_list)) if location_list[i] is True][0]
return index[location][:2]
Expand All @@ -104,9 +105,11 @@ def _get_location(all_subareas: List[List[int]], center_point: List[float]) -> L
if center_point[0] in range(int(area[0]), int(area[2])) and center_point[1] in range(int(area[1]),
int(area[3])):
location_list.append(True)
elif center_point[0] in range(int(area[0]), int(area[2])) and y_boundary == area[3] and center_point[1] > y_boundary:
elif center_point[0] in range(int(area[0]), int(area[2])) and y_boundary == area[3] and center_point[
1] > y_boundary:
location_list.append(True)
elif center_point[1] in range(int(area[1]), int(area[3])) and x_boundary == area[2] and center_point[0] > x_boundary:
elif center_point[1] in range(int(area[1]), int(area[3])) and x_boundary == area[2] and center_point[
0] > x_boundary:
location_list.append(True)
else:
location_list.append(False)
Expand Down
6 changes: 3 additions & 3 deletions gnes/preprocessor/video/ffmpeg.py
Expand Up @@ -131,13 +131,13 @@ def apply(self, doc: 'gnes_pb2.Document') -> None:
if self.segment_interval == -1:
sub_videos = [frames]
else:
sub_videos = [frames[_: _+self.segment_interval]
sub_videos = [frames[_: _ + self.segment_interval]
for _ in range(0, len(frames), self.segment_interval)]
# cut by num: should specify how many chunks for each doc
elif self.segment_method == 'cut_by_num':
if self.segment_num >= 2:
_interval = int(len(frames)/self.segment_num)
sub_videos = [frames[_: _+_interval]
_interval = int(len(frames) / self.segment_num)
sub_videos = [frames[_: _ + _interval]
for _ in range(0, len(frames), _interval)]
else:
sub_videos = [frames]
Expand Down
4 changes: 3 additions & 1 deletion gnes/router/base.py
Expand Up @@ -36,4 +36,6 @@ def apply(self, msg: 'gnes_pb2.Message', accum_msgs: List['gnes_pb2.Message'], *
if len(msg.envelope.num_part) > 1:
msg.envelope.num_part.pop()
else:
self.logger.error('can not reduce the message further, as num_part="%s"' % msg.envelope.num_part)
self.logger.warning(
'message envelope says num_part=%s, means no further message reducing. '
'ignore this if you explicitly set "num_part" in RouterService' % msg.envelope.num_part)

0 comments on commit ccfd474

Please sign in to comment.