From 3989a20fc78784ed336f457edd872fad71c754f1 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Tue, 9 Oct 2018 12:24:03 -0700 Subject: [PATCH 1/4] remove codes that are incompatible between python 2 and 3 --- stream/clients/python/bookkeeper/admin/client.py | 2 -- stream/clients/python/bookkeeper/kv/client.py | 2 -- stream/clients/python/setup.py | 2 +- 3 files changed, 1 insertion(+), 5 deletions(-) diff --git a/stream/clients/python/bookkeeper/admin/client.py b/stream/clients/python/bookkeeper/admin/client.py index fdb82d2699a..80de76d7b67 100644 --- a/stream/clients/python/bookkeeper/admin/client.py +++ b/stream/clients/python/bookkeeper/admin/client.py @@ -16,7 +16,6 @@ import grpc import logging -import pkg_resources from bookkeeper import types from bookkeeper.admin.namespace import Namespace @@ -24,7 +23,6 @@ from bookkeeper.common.service_uri import ServiceURI from bookkeeper.proto.storage_pb2_grpc import RootRangeServiceStub -__version__ = pkg_resources.get_distribution('bookkeeper').version __logger__ = logging.getLogger("bookkeeper.admin.Client") diff --git a/stream/clients/python/bookkeeper/kv/client.py b/stream/clients/python/bookkeeper/kv/client.py index 3de12919905..e942fa634fa 100644 --- a/stream/clients/python/bookkeeper/kv/client.py +++ b/stream/clients/python/bookkeeper/kv/client.py @@ -16,7 +16,6 @@ import grpc import logging -import pkg_resources from bookkeeper import types from bookkeeper.admin.namespace import Namespace @@ -24,7 +23,6 @@ from bookkeeper.kv.table import Table from bookkeeper.proto.storage_pb2_grpc import RootRangeServiceStub -__version__ = pkg_resources.get_distribution('bookkeeper').version __logger__ = logging.getLogger("bookkeeper.kv.Client") diff --git a/stream/clients/python/setup.py b/stream/clients/python/setup.py index 74a8b0ca07f..1dfbbe47c3f 100644 --- a/stream/clients/python/setup.py +++ b/stream/clients/python/setup.py @@ -19,7 +19,7 @@ name = 'apache-bookkeeper-client' description = 'Apache BookKeeper client library' -version = '4.9.0-alpha-0' +version = '4.9.0-alpha-2' # Should be one of: # 'Development Status :: 3 - Alpha' # 'Development Status :: 4 - Beta' From 05da908c4cd2fb27e0b7d6540b89883234ef594f Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Thu, 11 Oct 2018 15:50:57 -0700 Subject: [PATCH 2/4] Install python client in test image --- pom.xml | 1 + .../python/bookkeeper/common/exceptions.py | 3 +- stream/clients/python/bookkeeper/kv/table.py | 23 +++++++++++ stream/clients/python/scripts/docker_build.sh | 34 ++++++++++++++++ .../current-version-image/Dockerfile | 5 +++ .../current-version-image/pom.xml | 40 +++++++++++++++++++ .../scripts/install-python-client.sh | 24 +++++++++++ 7 files changed, 129 insertions(+), 1 deletion(-) create mode 100755 stream/clients/python/scripts/docker_build.sh create mode 100644 tests/docker-images/current-version-image/scripts/install-python-client.sh diff --git a/pom.xml b/pom.xml index af3cefa88a6..55afe3fbf12 100644 --- a/pom.xml +++ b/pom.xml @@ -168,6 +168,7 @@ 2.7 4.3.0 1.3.7 + 1.6.0 1.6 0.8.0 1.8 diff --git a/stream/clients/python/bookkeeper/common/exceptions.py b/stream/clients/python/bookkeeper/common/exceptions.py index 782a579d5b7..531b79a88d9 100644 --- a/stream/clients/python/bookkeeper/common/exceptions.py +++ b/stream/clients/python/bookkeeper/common/exceptions.py @@ -98,7 +98,8 @@ def __init__(self, message, errors=(), response=None): self._response = response def __str__(self): - return '{} {}'.format(self.code, self.message) + return 'grpc_status_code = {}, bk_status_code = {} : {}'\ + .format(self.grpc_status_code, self.bk_status_code, self.message) @property def errors(self): diff --git a/stream/clients/python/bookkeeper/kv/table.py b/stream/clients/python/bookkeeper/kv/table.py index 605ab5d525a..c5be5cad5ef 100644 --- a/stream/clients/python/bookkeeper/kv/table.py +++ b/stream/clients/python/bookkeeper/kv/table.py @@ -40,6 +40,8 @@ def __init__(self, channel, stream_props): wrap_method(self.__do_get__, self.__default_retry__) self.__del_with_retries__ =\ wrap_method(self.__do_del__, self.__default_retry__) + self.__incr_with_retries__ =\ + wrap_method(self.__do_incr__, self.__default_retry__) __logger__.info("initialized table instance with properties : %s", stream_props) @@ -79,6 +81,27 @@ def __do_put__(self, key, value, routing_header, grpc_metadata): ) from_table_rpc_response(put_resp) + def incr_str(self, key_str, amount): + key = key_str.encode('utf-8') + return self.incr(key, amount) + + def incr(self, key, amount): + metadata = self.__make_routing_metadata__(key) + header = self.__make_routing_header__(key) + return self.__incr_with_retries__(key, amount, header, metadata) + + def __do_incr__(self, key, amount, routing_header, grpc_metadata): + incr_req = kv_rpc_pb2.IncrementRequest( + key=key, + amount=amount, + header=routing_header + ) + incr_resp = self.__table_service__.Increment( + request=incr_req, + metadata=grpc_metadata + ) + from_table_rpc_response(incr_resp) + def get_str(self, key_str): key = key_str.encode('utf-8') return self.get(key) diff --git a/stream/clients/python/scripts/docker_build.sh b/stream/clients/python/scripts/docker_build.sh new file mode 100755 index 00000000000..f3bd918f805 --- /dev/null +++ b/stream/clients/python/scripts/docker_build.sh @@ -0,0 +1,34 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e -x -u + +SCRIPT_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) +PY_VERSION=${PY_VERSION:-"3.7"} + +COMMANDS=`cat < + + + org.codehaus.mojo + exec-maven-plugin + ${exec-maven-plugin.version} + + + build-python-client + compile + + exec + + + ${project.basedir}/target + ${project.basedir}/../../../stream/clients/python/scripts/docker_build.sh + + + + + + org.apache.maven.plugins + maven-antrun-plugin + ${maven-antrun-plugin.version} + + + compile + + run + + + + copy python wheel file + + + + + + + com.spotify dockerfile-maven-plugin diff --git a/tests/docker-images/current-version-image/scripts/install-python-client.sh b/tests/docker-images/current-version-image/scripts/install-python-client.sh new file mode 100644 index 00000000000..0b0f2c490a0 --- /dev/null +++ b/tests/docker-images/current-version-image/scripts/install-python-client.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +set -x + +WHEEL_FILE=`ls /opt/bookkeeper/bookkeeper-client/*.whl` +pip install /opt/bookkeeper/bookkeeper-client/${WHEEL_FILE} From 8400ba499707077f32a35e16aee905df14127970 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Thu, 18 Oct 2018 09:02:32 +0800 Subject: [PATCH 3/4] build the docker image --- .../current-version-image/Dockerfile | 3 +- .../current-version-image/pom.xml | 30 +++++-------------- .../scripts/install-python-client.sh | 2 +- 3 files changed, 10 insertions(+), 25 deletions(-) diff --git a/tests/docker-images/current-version-image/Dockerfile b/tests/docker-images/current-version-image/Dockerfile index 13b92eb6505..7d998a46def 100644 --- a/tests/docker-images/current-version-image/Dockerfile +++ b/tests/docker-images/current-version-image/Dockerfile @@ -35,7 +35,8 @@ ENV JAVA_HOME=/usr/lib/jvm/jre-1.8.0 # prepare utils RUN set -x \ && adduser "${BK_USER}" \ - && yum install -y java-1.8.0-openjdk-headless wget bash python sudo netcat \ + && yum install -y epel-release \ + && yum install -y java-1.8.0-openjdk-headless wget bash python-pip python-devel sudo netcat gcc gcc-c++ \ && mkdir -pv /opt \ && cd /opt \ && yum clean all diff --git a/tests/docker-images/current-version-image/pom.xml b/tests/docker-images/current-version-image/pom.xml index 64992f2549c..edffc648143 100644 --- a/tests/docker-images/current-version-image/pom.xml +++ b/tests/docker-images/current-version-image/pom.xml @@ -54,7 +54,7 @@ build-python-client - compile + generate-resources exec @@ -65,13 +65,14 @@ + org.apache.maven.plugins maven-antrun-plugin ${maven-antrun-plugin.version} - compile + generate-resources run @@ -81,6 +82,10 @@ + copying docker scripts + + @@ -137,27 +142,6 @@ - - - org.apache.maven.plugins - maven-antrun-plugin - ${maven-antrun-plugin.version} - - - generate-resources - - run - - - - copying docker scripts - - - - - - - diff --git a/tests/docker-images/current-version-image/scripts/install-python-client.sh b/tests/docker-images/current-version-image/scripts/install-python-client.sh index 0b0f2c490a0..2719b335ac1 100644 --- a/tests/docker-images/current-version-image/scripts/install-python-client.sh +++ b/tests/docker-images/current-version-image/scripts/install-python-client.sh @@ -21,4 +21,4 @@ set -x WHEEL_FILE=`ls /opt/bookkeeper/bookkeeper-client/*.whl` -pip install /opt/bookkeeper/bookkeeper-client/${WHEEL_FILE} +pip install ${WHEEL_FILE} From fc178a319d1719a0ca487ba8eac4d08bd0267007 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Mon, 29 Oct 2018 18:45:45 -0700 Subject: [PATCH 4/4] [table service] Fix python client can't read keys written by java clients *Motivation* Python client is a thin table service client. So it relies on storage container intercepting the requests to attach routing header. It attaches the routing header if the requests are needed to route to remote containers; however for the local containers it doesn't attach the right routing header. *Changes* Fix `StorageContainerImpl` to use proxy routing interceptor to intercept requests to attach routing header. *Tests* Add integration tests in python client & add test for python clients reading key/value pairs written by java clients. --- .../python/bookkeeper/admin/namespace.py | 21 +++- .../python/bookkeeper/admin/namespaces.py | 16 ++- .../python/bookkeeper/common/constants.py | 22 ++++ stream/clients/python/nox.py | 27 +++++ stream/clients/python/noxfile.py | 27 +++++ .../scripts/docker_integration_tests.sh | 29 +++++ .../python/scripts/run_integration_tests.sh | 41 +++++++ .../tests/integration/bookkeeper/__init__.py | 11 ++ .../bookkeeper/admin/test_admin_client.py | 81 +++++++++++++ .../integration/bookkeeper/kv/test_client.py | 111 ++++++++++++++++++ .../storage/impl/sc/StorageContainerImpl.java | 3 + .../impl/service/RangeStoreServiceImpl.java | 47 ++++++++ 12 files changed, 430 insertions(+), 6 deletions(-) create mode 100755 stream/clients/python/scripts/docker_integration_tests.sh create mode 100755 stream/clients/python/scripts/run_integration_tests.sh create mode 100644 stream/clients/python/tests/integration/bookkeeper/__init__.py create mode 100644 stream/clients/python/tests/integration/bookkeeper/admin/test_admin_client.py create mode 100644 stream/clients/python/tests/integration/bookkeeper/kv/test_client.py diff --git a/stream/clients/python/bookkeeper/admin/namespace.py b/stream/clients/python/bookkeeper/admin/namespace.py index 5c3910b1ce6..631673a5e9b 100644 --- a/stream/clients/python/bookkeeper/admin/namespace.py +++ b/stream/clients/python/bookkeeper/admin/namespace.py @@ -14,9 +14,12 @@ from __future__ import absolute_import -from bookkeeper.common.constants import __DEFAULT_STREAM_CONF__ +from bookkeeper.common.constants import __DEFAULT_TABLE_CONF__ from bookkeeper.common.constants import __ROOT_RANGE_METADATA__ from bookkeeper.common.exceptions import from_root_range_rpc_response +from bookkeeper.common.exceptions import InternalServerError +from bookkeeper.common.exceptions import StreamExistsError +from bookkeeper.common.exceptions import StreamNotFoundError from bookkeeper.common.method import wrap_method from bookkeeper.common.retry import Retry from bookkeeper.common.timeout import ExponentialTimeout @@ -38,7 +41,7 @@ def __init__(self, root_range_service, namespace): self.__delete_with_retries__ =\ wrap_method(self.__delete_stream__, self.__default_retry__) - def create(self, stream_name, stream_config=__DEFAULT_STREAM_CONF__): + def create(self, stream_name, stream_config=__DEFAULT_TABLE_CONF__): return self.__create_with_retries__(stream_name, stream_config) def __create_stream__(self, stream_name, stream_config): @@ -51,8 +54,18 @@ def __create_stream__(self, stream_name, stream_config): request=create_stream_req, metadata=__ROOT_RANGE_METADATA__ ) - create_stream_resp = from_root_range_rpc_response(create_stream_resp) - return create_stream_resp.stream_props + try: + create_stream_resp = from_root_range_rpc_response(create_stream_resp) + return create_stream_resp.stream_props + except InternalServerError as ise: + # currently if a stream exists, it also throws + # internal server error + try: + self.get(stream_name=stream_name) + raise StreamExistsError("stream '%s' already exists at namespace '%s'" + % (stream_name, self.__namespace__)) + except StreamNotFoundError: + raise ise def get(self, stream_name): return self.__get_with_retries__(stream_name) diff --git a/stream/clients/python/bookkeeper/admin/namespaces.py b/stream/clients/python/bookkeeper/admin/namespaces.py index 3df8cbd57d3..d0e66352ffa 100644 --- a/stream/clients/python/bookkeeper/admin/namespaces.py +++ b/stream/clients/python/bookkeeper/admin/namespaces.py @@ -17,6 +17,9 @@ from bookkeeper.common.constants import __DEFAULT_NS_CONF__ from bookkeeper.common.constants import __ROOT_RANGE_METADATA__ from bookkeeper.common.exceptions import from_root_range_rpc_response +from bookkeeper.common.exceptions import InternalServerError +from bookkeeper.common.exceptions import NamespaceExistsError +from bookkeeper.common.exceptions import NamespaceNotFoundError from bookkeeper.common.method import wrap_method from bookkeeper.common.retry import Retry from bookkeeper.common.timeout import ExponentialTimeout @@ -48,8 +51,17 @@ def __create_ns__(self, namespace, namespace_config): request=create_ns_req, metadata=__ROOT_RANGE_METADATA__ ) - create_ns_resp = from_root_range_rpc_response(create_ns_resp) - return create_ns_resp.ns_props + try: + create_ns_resp = from_root_range_rpc_response(create_ns_resp) + return create_ns_resp.ns_props + except InternalServerError as ise: + # currently if a namespace exists, it also throws + # internal server error. + try: + self.get(namespace=namespace) + raise NamespaceExistsError("namespace '%s' already exists" % namespace) + except NamespaceNotFoundError: + raise ise def get(self, namespace): return self.__get_with_retries__(namespace) diff --git a/stream/clients/python/bookkeeper/common/constants.py b/stream/clients/python/bookkeeper/common/constants.py index b21f4bcceb0..8c72791ff0d 100644 --- a/stream/clients/python/bookkeeper/common/constants.py +++ b/stream/clients/python/bookkeeper/common/constants.py @@ -20,6 +20,28 @@ ('bk-rt-sc-id-bin', util.to_bytes(__ROOT_RANGE_ID__, 8, "big")) ] __DEFAULT_STREAM_CONF__ = stream_pb2.StreamConfiguration( + key_type=stream_pb2.RangeKeyType.values()[0], + min_num_ranges=24, + initial_num_ranges=4, + split_policy=stream_pb2.SplitPolicy( + type=stream_pb2.SplitPolicyType.values()[0], + fixed_range_policy=stream_pb2.FixedRangeSplitPolicy( + num_ranges=2 + ) + ), + rolling_policy=stream_pb2.SegmentRollingPolicy( + size_policy=stream_pb2.SizeBasedSegmentRollingPolicy( + max_segment_size=128*1024*1024 + ) + ), + retention_policy=stream_pb2.RetentionPolicy( + time_policy=stream_pb2.TimeBasedRetentionPolicy( + retention_minutes=-1 + ) + ), + storage_type=stream_pb2.StorageType.values()[0] +) +__DEFAULT_TABLE_CONF__ = stream_pb2.StreamConfiguration( key_type=stream_pb2.RangeKeyType.values()[0], min_num_ranges=24, initial_num_ranges=4, diff --git a/stream/clients/python/nox.py b/stream/clients/python/nox.py index 5966201c4a0..6bcb2d9f77d 100644 --- a/stream/clients/python/nox.py +++ b/stream/clients/python/nox.py @@ -45,6 +45,33 @@ def default(session): *session.posargs ) +@nox.session +def integration(session): + """Default integration test session. + This is intended to be run **without** an interpreter set, so + that the current ``python`` (on the ``PATH``) or the version of + Python corresponding to the ``nox`` binary the ``PATH`` can + run the tests. + """ + # Install all test dependencies, then install local packages in-place. + session.install('pytest', 'pytest-cov') + for local_dep in LOCAL_DEPS: + session.install('-e', local_dep) + session.install('-e', '.') + + # Run py.test against the unit tests. + session.run( + 'py.test', + '--quiet', + '--cov-append', + '--cov-report=', + '--cov=bookkeeper', + '--cov-config=.coveragerc', + os.path.join('tests', 'integration'), + *session.posargs + ) + + @nox.session def lint(session): diff --git a/stream/clients/python/noxfile.py b/stream/clients/python/noxfile.py index 3027c9212ca..612afc1742d 100644 --- a/stream/clients/python/noxfile.py +++ b/stream/clients/python/noxfile.py @@ -52,6 +52,33 @@ def unit(session): default(session) +@nox.session(python=[os.environ['PY_VERSION']]) +def integration(session): + """Default integration test session. + This is intended to be run **without** an interpreter set, so + that the current ``python`` (on the ``PATH``) or the version of + Python corresponding to the ``nox`` binary the ``PATH`` can + run the tests. + """ + # Install all test dependencies, then install local packages in-place. + session.install('pytest', 'pytest-cov') + for local_dep in LOCAL_DEPS: + session.install('-e', local_dep) + session.install('-e', '.') + + # Run py.test against the unit tests. + session.run( + 'py.test', + '--quiet', + '--cov-append', + '--cov-report=', + '--cov=bookkeeper', + '--cov-config=.coveragerc', + os.path.join('tests', 'integration'), + *session.posargs + ) + + @nox.session def lint(session): """Run linters. diff --git a/stream/clients/python/scripts/docker_integration_tests.sh b/stream/clients/python/scripts/docker_integration_tests.sh new file mode 100755 index 00000000000..2480924e8dd --- /dev/null +++ b/stream/clients/python/scripts/docker_integration_tests.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e -x -u + +SCRIPT_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) +NOXSESSION=${NOXSESSION:-"integration"} + +docker run \ + -v "${SCRIPT_DIR}/..":/opt/bookkeeper_python_client \ + -w /opt/bookkeeper_python_client \ + -e NOXSESSION="${NOXSESSION}" \ + --entrypoint=/bin/bash \ + apachebookkeeper/bookkeeper-current \ + /opt/bookkeeper_python_client/scripts/run_integration_tests.sh diff --git a/stream/clients/python/scripts/run_integration_tests.sh b/stream/clients/python/scripts/run_integration_tests.sh new file mode 100755 index 00000000000..637b98a77db --- /dev/null +++ b/stream/clients/python/scripts/run_integration_tests.sh @@ -0,0 +1,41 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e -x -u + +BK_HOME=/opt/bookkeeper + +echo "starting bookkeeper standalone ..." +${BK_HOME}/bin/standalone process up + +echo "installing nox ..." +find . | grep -E "(__pycache__|\.pyc|\.pyo$)" | xargs rm -rf +pip install nox-automation +echo "installed nox." + +TABLE="test-java-updates" +echo "creating test table ..." +${BK_HOME}/bin/bkctl tables create -r 1 ${TABLE} +for x in {0..20}; do + echo "write kv pair '${x}'" + ${BK_HOME}/bin/bkctl table put ${TABLE} java-key-$x java-value-$x; +done +echo "ingested kv pairs for testing." + +echo "run integration tests" +nox --session integration +echo "done integration tests" diff --git a/stream/clients/python/tests/integration/bookkeeper/__init__.py b/stream/clients/python/tests/integration/bookkeeper/__init__.py new file mode 100644 index 00000000000..4d9a92490ba --- /dev/null +++ b/stream/clients/python/tests/integration/bookkeeper/__init__.py @@ -0,0 +1,11 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/stream/clients/python/tests/integration/bookkeeper/admin/test_admin_client.py b/stream/clients/python/tests/integration/bookkeeper/admin/test_admin_client.py new file mode 100644 index 00000000000..fd403ef611c --- /dev/null +++ b/stream/clients/python/tests/integration/bookkeeper/admin/test_admin_client.py @@ -0,0 +1,81 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from bookkeeper import admin +from bookkeeper.common.exceptions import NamespaceExistsError +from bookkeeper.common.exceptions import NamespaceNotFoundError +from bookkeeper.common.exceptions import StreamExistsError +from bookkeeper.common.exceptions import StreamNotFoundError +from bookkeeper.types import StorageClientSettings +import logging +import pytest +import uuid + +__logger__ = logging.getLogger("bookkeeper.admin.test_admin_client") + + +def test_create_delete_namespaces(): + settings = StorageClientSettings(service_uri="bk://localhost:4181") + client = admin.Client(storage_client_settings=settings) + ns = "test_create_delete_namespaces_%s" % uuid.uuid4().hex + with pytest.raises(NamespaceNotFoundError): + __logger__.info("getting non-existent namespace '%s'", ns) + client.namespaces().get(ns) + __logger__.info("creating namespace '%s'", ns) + ns_props = client.namespaces().create(ns) + __logger__.info("created namespace '%s' : %s", ns, ns_props) + __logger__.info("getting namespace '%s'", ns) + read_ns_props = client.namespaces().get(ns) + __logger__.info("got namespace '%s' : %s", ns, read_ns_props) + assert ns_props == read_ns_props + with pytest.raises(NamespaceExistsError): + __logger__.info("creating existed namespace '%s'", ns) + client.namespaces().create(ns) + __logger__.info("deleting existed namespace '%s'", ns) + client.namespaces().delete(ns) + with pytest.raises(NamespaceNotFoundError): + client.namespaces().get(ns) + with pytest.raises(NamespaceNotFoundError): + client.namespaces().delete(ns) + __logger__.info("end of test_create_delete_namespace") + + +def test_create_delete_tables(): + settings = StorageClientSettings(service_uri="bk://localhost:4181") + client = admin.Client(storage_client_settings=settings) + ns_name = "test_create_delete_tables_%s" % uuid.uuid4().hex + ns_props = client.namespaces().create(ns_name) + __logger__.info("Created namespace '%s' : %s", ns_name, ns_props) + ns = client.namespace(ns_name) + + table_name = "table_%s" % uuid.uuid4().hex + # test create, delete and get tables + with pytest.raises(StreamNotFoundError): + __logger__.info("getting non-existent table '%s'", table_name) + ns.get(table_name) + __logger__.info("creating table '%s'", table_name) + table_props = ns.create(table_name) + __logger__.info("created table '%s' : %s", table_name, table_props) + __logger__.info("getting table '%s'", table_name) + read_tbl_props = ns.get(table_name) + __logger__.info("got table '%s' : %s", table_name, read_tbl_props) + assert table_props == read_tbl_props + with pytest.raises(StreamExistsError): + __logger__.info("creating existed table '%s'", table_name) + ns.create(table_name) + __logger__.info("deleting existed table '%s'", table_name) + ns.delete(table_name) + with pytest.raises(StreamNotFoundError): + ns.get(table_name) + with pytest.raises(StreamNotFoundError): + ns.delete(table_name) + __logger__.info("end of test_create_delete_tables") diff --git a/stream/clients/python/tests/integration/bookkeeper/kv/test_client.py b/stream/clients/python/tests/integration/bookkeeper/kv/test_client.py new file mode 100644 index 00000000000..b43d99e9df6 --- /dev/null +++ b/stream/clients/python/tests/integration/bookkeeper/kv/test_client.py @@ -0,0 +1,111 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from bookkeeper import admin, kv +from bookkeeper.types import StorageClientSettings +from bookkeeper.common.exceptions import BadRequest +from bookkeeper.proto import storage_pb2 +import grpc +import logging +import pytest +import uuid + +__logger__ = logging.getLogger("bookkeeper.kv.test_client") + + +def test_kv_ops(): + settings = StorageClientSettings(service_uri="bk://localhost:4181") + admin_client = admin.Client(storage_client_settings=settings) + ns = "test_kv_ops_%s" % uuid.uuid4().hex + ns_props = admin_client.namespaces().create(ns) + __logger__.info("Created namespace '%s' : %s", ns, ns_props) + tbl_name = "test_kv_ops_table" + tbl_props = admin_client.namespace(ns).create(tbl_name) + __logger__.info("Created table '%s' at namespace '%s' : %s", tbl_name, ns, tbl_props) + + kv_client = kv.Client(storage_client_settings=settings, namespace=ns) + tbl = kv_client.table(tbl_name) + + for x in range(0, 20): + read_val = tbl.get_str("key-%s" % x) + assert read_val is None + + for x in range(0, 20): + tbl.put_str("key-%s" % x, "value-%s" % x) + + for x in range(0, 20): + read_kv = tbl.get_str("key-%s" % x) + expected_key = "key-%s" % x + expected_value = "value-%s" % x + assert read_kv.key == str.encode(expected_key, 'utf-8') + assert read_kv.is_number is False + assert read_kv.value == str.encode(expected_value, 'utf-8') + assert read_kv.version == 0 + + for x in range(0, 20): + try: + tbl.incr_str("key-%s" % x, 20) + assert False + except BadRequest as e: + assert e.grpc_status_code == grpc.StatusCode.FAILED_PRECONDITION + assert e.bk_status_code == storage_pb2.BAD_REQUEST + + for x in range(0, 20): + read_val = tbl.get_str("counter-%s" % x) + assert read_val is None + + for x in range(0, 20): + tbl.incr_str("counter-%s" % x, (x + 1)) + + for x in range(0, 20): + read_kv = tbl.get_str("counter-%s" % x) + expected_key = "counter-%s" % x + expected_num = (x + 1) + assert read_kv.key == str.encode(expected_key, 'utf-8') + assert read_kv.is_number is True + assert read_kv.number_value == expected_num + assert read_kv.version == 0 + + for x in range(0, 20): + try: + tbl.put_str("counter-%s" % x, "value-%s" % x) + assert False + except BadRequest as e: + assert e.grpc_status_code == grpc.StatusCode.FAILED_PRECONDITION + assert e.bk_status_code == storage_pb2.BAD_REQUEST + + for x in range(0, 20): + tbl.delete_str("key-%s" % x) + read_val = tbl.get_str("key-%s" % x) + assert read_val is None + + for x in range(0, 20): + tbl.delete_str("counter-%s" % x) + read_val = tbl.get_str("counter-%s" % x) + assert read_val is None + + +def test_get_kv_from_table_updated_by_java_client(): + settings = StorageClientSettings(service_uri="bk://localhost:4181") + ns = "default" + tbl_name = "test-java-updates" + kv_client = kv.Client(storage_client_settings=settings, namespace=ns) + tbl = kv_client.table(tbl_name) + + for x in range(0, 20): + expected_key = "java-key-%s" % x + read_kv = tbl.get_str(expected_key) + expected_value = "java-value-%s" % x + assert read_kv.key == str.encode(expected_key, 'utf-8') + assert read_kv.is_number is False + assert read_kv.value == str.encode(expected_value, 'utf-8') + assert read_kv.version == 0 diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java index 4bdc8015681..b3c67f20fde 100644 --- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java @@ -32,6 +32,7 @@ import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer; import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerService; import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerServiceFactory; +import org.apache.bookkeeper.stream.storage.impl.routing.RoutingHeaderProxyInterceptor; /** * The default implementation of {@link StorageContainer}. @@ -82,6 +83,8 @@ public CompletableFuture start() { channel = InProcessChannelBuilder.forName(containerName) .usePlaintext() .directExecutor() + // attach routing header interceptor + .intercept(new RoutingHeaderProxyInterceptor()) .build(); return FutureUtils.value(StorageContainerImpl.this); } catch (IOException e) { diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java index e3b2d3991df..5fe1612f657 100644 --- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java @@ -41,6 +41,7 @@ import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse; import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest; import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse; +import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader; import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader; import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest; import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse; @@ -58,6 +59,7 @@ import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse; import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest; import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse; +import org.apache.bookkeeper.stream.proto.storage.StatusCode; import org.apache.bookkeeper.stream.protocol.RangeId; import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy; import org.apache.bookkeeper.stream.storage.api.kv.TableStore; @@ -242,6 +244,15 @@ public CompletableFuture getActiveRanges(GetActiveRange public CompletableFuture range(RangeRequest request) { RoutingHeader header = request.getHeader(); + if (header.getRangeId() <= 0L) { + return CompletableFuture.completedFuture(RangeResponse.newBuilder() + .setHeader(ResponseHeader.newBuilder() + .setCode(StatusCode.BAD_REQUEST) + .setRoutingHeader(request.getHeader()) + .build()) + .build()); + } + RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId()); TableStore store = tableStoreCache.getTableStore(rid); if (null != store) { @@ -256,6 +267,15 @@ public CompletableFuture range(RangeRequest request) { public CompletableFuture put(PutRequest request) { RoutingHeader header = request.getHeader(); + if (header.getRangeId() <= 0L) { + return CompletableFuture.completedFuture(PutResponse.newBuilder() + .setHeader(ResponseHeader.newBuilder() + .setCode(StatusCode.BAD_REQUEST) + .setRoutingHeader(request.getHeader()) + .build()) + .build()); + } + RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId()); TableStore store = tableStoreCache.getTableStore(rid); if (null != store) { @@ -270,6 +290,15 @@ public CompletableFuture put(PutRequest request) { public CompletableFuture delete(DeleteRangeRequest request) { RoutingHeader header = request.getHeader(); + if (header.getRangeId() <= 0L) { + return CompletableFuture.completedFuture(DeleteRangeResponse.newBuilder() + .setHeader(ResponseHeader.newBuilder() + .setCode(StatusCode.BAD_REQUEST) + .setRoutingHeader(request.getHeader()) + .build()) + .build()); + } + RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId()); TableStore store = tableStoreCache.getTableStore(rid); if (null != store) { @@ -284,6 +313,15 @@ public CompletableFuture delete(DeleteRangeRequest request) public CompletableFuture txn(TxnRequest request) { RoutingHeader header = request.getHeader(); + if (header.getRangeId() <= 0L) { + return CompletableFuture.completedFuture(TxnResponse.newBuilder() + .setHeader(ResponseHeader.newBuilder() + .setCode(StatusCode.BAD_REQUEST) + .setRoutingHeader(request.getHeader()) + .build()) + .build()); + } + RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId()); TableStore store = tableStoreCache.getTableStore(rid); if (null != store) { @@ -298,6 +336,15 @@ public CompletableFuture txn(TxnRequest request) { public CompletableFuture incr(IncrementRequest request) { RoutingHeader header = request.getHeader(); + if (header.getRangeId() <= 0L) { + return CompletableFuture.completedFuture(IncrementResponse.newBuilder() + .setHeader(ResponseHeader.newBuilder() + .setCode(StatusCode.BAD_REQUEST) + .setRoutingHeader(request.getHeader()) + .build()) + .build()); + } + RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId()); TableStore store = tableStoreCache.getTableStore(rid); if (null != store) {