diff --git a/pom.xml b/pom.xml
index af3cefa88a6..55afe3fbf12 100644
--- a/pom.xml
+++ b/pom.xml
@@ -168,6 +168,7 @@
2.7
4.3.0
1.3.7
+ 1.6.0
1.6
0.8.0
1.8
diff --git a/stream/clients/python/bookkeeper/admin/client.py b/stream/clients/python/bookkeeper/admin/client.py
index fdb82d2699a..80de76d7b67 100644
--- a/stream/clients/python/bookkeeper/admin/client.py
+++ b/stream/clients/python/bookkeeper/admin/client.py
@@ -16,7 +16,6 @@
import grpc
import logging
-import pkg_resources
from bookkeeper import types
from bookkeeper.admin.namespace import Namespace
@@ -24,7 +23,6 @@
from bookkeeper.common.service_uri import ServiceURI
from bookkeeper.proto.storage_pb2_grpc import RootRangeServiceStub
-__version__ = pkg_resources.get_distribution('bookkeeper').version
__logger__ = logging.getLogger("bookkeeper.admin.Client")
diff --git a/stream/clients/python/bookkeeper/admin/namespace.py b/stream/clients/python/bookkeeper/admin/namespace.py
index 5c3910b1ce6..631673a5e9b 100644
--- a/stream/clients/python/bookkeeper/admin/namespace.py
+++ b/stream/clients/python/bookkeeper/admin/namespace.py
@@ -14,9 +14,12 @@
from __future__ import absolute_import
-from bookkeeper.common.constants import __DEFAULT_STREAM_CONF__
+from bookkeeper.common.constants import __DEFAULT_TABLE_CONF__
from bookkeeper.common.constants import __ROOT_RANGE_METADATA__
from bookkeeper.common.exceptions import from_root_range_rpc_response
+from bookkeeper.common.exceptions import InternalServerError
+from bookkeeper.common.exceptions import StreamExistsError
+from bookkeeper.common.exceptions import StreamNotFoundError
from bookkeeper.common.method import wrap_method
from bookkeeper.common.retry import Retry
from bookkeeper.common.timeout import ExponentialTimeout
@@ -38,7 +41,7 @@ def __init__(self, root_range_service, namespace):
self.__delete_with_retries__ =\
wrap_method(self.__delete_stream__, self.__default_retry__)
- def create(self, stream_name, stream_config=__DEFAULT_STREAM_CONF__):
+ def create(self, stream_name, stream_config=__DEFAULT_TABLE_CONF__):
return self.__create_with_retries__(stream_name, stream_config)
def __create_stream__(self, stream_name, stream_config):
@@ -51,8 +54,18 @@ def __create_stream__(self, stream_name, stream_config):
request=create_stream_req,
metadata=__ROOT_RANGE_METADATA__
)
- create_stream_resp = from_root_range_rpc_response(create_stream_resp)
- return create_stream_resp.stream_props
+ try:
+ create_stream_resp = from_root_range_rpc_response(create_stream_resp)
+ return create_stream_resp.stream_props
+ except InternalServerError as ise:
+ # currently if a stream exists, it also throws
+ # internal server error
+ try:
+ self.get(stream_name=stream_name)
+ raise StreamExistsError("stream '%s' already exists at namespace '%s'"
+ % (stream_name, self.__namespace__))
+ except StreamNotFoundError:
+ raise ise
def get(self, stream_name):
return self.__get_with_retries__(stream_name)
diff --git a/stream/clients/python/bookkeeper/admin/namespaces.py b/stream/clients/python/bookkeeper/admin/namespaces.py
index 3df8cbd57d3..d0e66352ffa 100644
--- a/stream/clients/python/bookkeeper/admin/namespaces.py
+++ b/stream/clients/python/bookkeeper/admin/namespaces.py
@@ -17,6 +17,9 @@
from bookkeeper.common.constants import __DEFAULT_NS_CONF__
from bookkeeper.common.constants import __ROOT_RANGE_METADATA__
from bookkeeper.common.exceptions import from_root_range_rpc_response
+from bookkeeper.common.exceptions import InternalServerError
+from bookkeeper.common.exceptions import NamespaceExistsError
+from bookkeeper.common.exceptions import NamespaceNotFoundError
from bookkeeper.common.method import wrap_method
from bookkeeper.common.retry import Retry
from bookkeeper.common.timeout import ExponentialTimeout
@@ -48,8 +51,17 @@ def __create_ns__(self, namespace, namespace_config):
request=create_ns_req,
metadata=__ROOT_RANGE_METADATA__
)
- create_ns_resp = from_root_range_rpc_response(create_ns_resp)
- return create_ns_resp.ns_props
+ try:
+ create_ns_resp = from_root_range_rpc_response(create_ns_resp)
+ return create_ns_resp.ns_props
+ except InternalServerError as ise:
+ # currently if a namespace exists, it also throws
+ # internal server error.
+ try:
+ self.get(namespace=namespace)
+ raise NamespaceExistsError("namespace '%s' already exists" % namespace)
+ except NamespaceNotFoundError:
+ raise ise
def get(self, namespace):
return self.__get_with_retries__(namespace)
diff --git a/stream/clients/python/bookkeeper/common/constants.py b/stream/clients/python/bookkeeper/common/constants.py
index b21f4bcceb0..8c72791ff0d 100644
--- a/stream/clients/python/bookkeeper/common/constants.py
+++ b/stream/clients/python/bookkeeper/common/constants.py
@@ -20,6 +20,28 @@
('bk-rt-sc-id-bin', util.to_bytes(__ROOT_RANGE_ID__, 8, "big"))
]
__DEFAULT_STREAM_CONF__ = stream_pb2.StreamConfiguration(
+ key_type=stream_pb2.RangeKeyType.values()[0],
+ min_num_ranges=24,
+ initial_num_ranges=4,
+ split_policy=stream_pb2.SplitPolicy(
+ type=stream_pb2.SplitPolicyType.values()[0],
+ fixed_range_policy=stream_pb2.FixedRangeSplitPolicy(
+ num_ranges=2
+ )
+ ),
+ rolling_policy=stream_pb2.SegmentRollingPolicy(
+ size_policy=stream_pb2.SizeBasedSegmentRollingPolicy(
+ max_segment_size=128*1024*1024
+ )
+ ),
+ retention_policy=stream_pb2.RetentionPolicy(
+ time_policy=stream_pb2.TimeBasedRetentionPolicy(
+ retention_minutes=-1
+ )
+ ),
+ storage_type=stream_pb2.StorageType.values()[0]
+)
+__DEFAULT_TABLE_CONF__ = stream_pb2.StreamConfiguration(
key_type=stream_pb2.RangeKeyType.values()[0],
min_num_ranges=24,
initial_num_ranges=4,
diff --git a/stream/clients/python/bookkeeper/common/exceptions.py b/stream/clients/python/bookkeeper/common/exceptions.py
index 782a579d5b7..531b79a88d9 100644
--- a/stream/clients/python/bookkeeper/common/exceptions.py
+++ b/stream/clients/python/bookkeeper/common/exceptions.py
@@ -98,7 +98,8 @@ def __init__(self, message, errors=(), response=None):
self._response = response
def __str__(self):
- return '{} {}'.format(self.code, self.message)
+ return 'grpc_status_code = {}, bk_status_code = {} : {}'\
+ .format(self.grpc_status_code, self.bk_status_code, self.message)
@property
def errors(self):
diff --git a/stream/clients/python/bookkeeper/kv/client.py b/stream/clients/python/bookkeeper/kv/client.py
index 3de12919905..e942fa634fa 100644
--- a/stream/clients/python/bookkeeper/kv/client.py
+++ b/stream/clients/python/bookkeeper/kv/client.py
@@ -16,7 +16,6 @@
import grpc
import logging
-import pkg_resources
from bookkeeper import types
from bookkeeper.admin.namespace import Namespace
@@ -24,7 +23,6 @@
from bookkeeper.kv.table import Table
from bookkeeper.proto.storage_pb2_grpc import RootRangeServiceStub
-__version__ = pkg_resources.get_distribution('bookkeeper').version
__logger__ = logging.getLogger("bookkeeper.kv.Client")
diff --git a/stream/clients/python/bookkeeper/kv/table.py b/stream/clients/python/bookkeeper/kv/table.py
index 605ab5d525a..c5be5cad5ef 100644
--- a/stream/clients/python/bookkeeper/kv/table.py
+++ b/stream/clients/python/bookkeeper/kv/table.py
@@ -40,6 +40,8 @@ def __init__(self, channel, stream_props):
wrap_method(self.__do_get__, self.__default_retry__)
self.__del_with_retries__ =\
wrap_method(self.__do_del__, self.__default_retry__)
+ self.__incr_with_retries__ =\
+ wrap_method(self.__do_incr__, self.__default_retry__)
__logger__.info("initialized table instance with properties : %s",
stream_props)
@@ -79,6 +81,27 @@ def __do_put__(self, key, value, routing_header, grpc_metadata):
)
from_table_rpc_response(put_resp)
+ def incr_str(self, key_str, amount):
+ key = key_str.encode('utf-8')
+ return self.incr(key, amount)
+
+ def incr(self, key, amount):
+ metadata = self.__make_routing_metadata__(key)
+ header = self.__make_routing_header__(key)
+ return self.__incr_with_retries__(key, amount, header, metadata)
+
+ def __do_incr__(self, key, amount, routing_header, grpc_metadata):
+ incr_req = kv_rpc_pb2.IncrementRequest(
+ key=key,
+ amount=amount,
+ header=routing_header
+ )
+ incr_resp = self.__table_service__.Increment(
+ request=incr_req,
+ metadata=grpc_metadata
+ )
+ from_table_rpc_response(incr_resp)
+
def get_str(self, key_str):
key = key_str.encode('utf-8')
return self.get(key)
diff --git a/stream/clients/python/nox.py b/stream/clients/python/nox.py
index 5966201c4a0..6bcb2d9f77d 100644
--- a/stream/clients/python/nox.py
+++ b/stream/clients/python/nox.py
@@ -45,6 +45,33 @@ def default(session):
*session.posargs
)
+@nox.session
+def integration(session):
+ """Default integration test session.
+ This is intended to be run **without** an interpreter set, so
+ that the current ``python`` (on the ``PATH``) or the version of
+ Python corresponding to the ``nox`` binary the ``PATH`` can
+ run the tests.
+ """
+ # Install all test dependencies, then install local packages in-place.
+ session.install('pytest', 'pytest-cov')
+ for local_dep in LOCAL_DEPS:
+ session.install('-e', local_dep)
+ session.install('-e', '.')
+
+ # Run py.test against the unit tests.
+ session.run(
+ 'py.test',
+ '--quiet',
+ '--cov-append',
+ '--cov-report=',
+ '--cov=bookkeeper',
+ '--cov-config=.coveragerc',
+ os.path.join('tests', 'integration'),
+ *session.posargs
+ )
+
+
@nox.session
def lint(session):
diff --git a/stream/clients/python/noxfile.py b/stream/clients/python/noxfile.py
index 3027c9212ca..612afc1742d 100644
--- a/stream/clients/python/noxfile.py
+++ b/stream/clients/python/noxfile.py
@@ -52,6 +52,33 @@ def unit(session):
default(session)
+@nox.session(python=[os.environ['PY_VERSION']])
+def integration(session):
+ """Default integration test session.
+ This is intended to be run **without** an interpreter set, so
+ that the current ``python`` (on the ``PATH``) or the version of
+ Python corresponding to the ``nox`` binary the ``PATH`` can
+ run the tests.
+ """
+ # Install all test dependencies, then install local packages in-place.
+ session.install('pytest', 'pytest-cov')
+ for local_dep in LOCAL_DEPS:
+ session.install('-e', local_dep)
+ session.install('-e', '.')
+
+ # Run py.test against the unit tests.
+ session.run(
+ 'py.test',
+ '--quiet',
+ '--cov-append',
+ '--cov-report=',
+ '--cov=bookkeeper',
+ '--cov-config=.coveragerc',
+ os.path.join('tests', 'integration'),
+ *session.posargs
+ )
+
+
@nox.session
def lint(session):
"""Run linters.
diff --git a/stream/clients/python/scripts/docker_build.sh b/stream/clients/python/scripts/docker_build.sh
new file mode 100755
index 00000000000..f3bd918f805
--- /dev/null
+++ b/stream/clients/python/scripts/docker_build.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -e -x -u
+
+SCRIPT_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
+PY_VERSION=${PY_VERSION:-"3.7"}
+
+COMMANDS=`cat < start() {
channel = InProcessChannelBuilder.forName(containerName)
.usePlaintext()
.directExecutor()
+ // attach routing header interceptor
+ .intercept(new RoutingHeaderProxyInterceptor())
.build();
return FutureUtils.value(StorageContainerImpl.this);
} catch (IOException e) {
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
index e3b2d3991df..5fe1612f657 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
@@ -41,6 +41,7 @@
import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
@@ -58,6 +59,7 @@
import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
import org.apache.bookkeeper.stream.protocol.RangeId;
import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
import org.apache.bookkeeper.stream.storage.api.kv.TableStore;
@@ -242,6 +244,15 @@ public CompletableFuture getActiveRanges(GetActiveRange
public CompletableFuture range(RangeRequest request) {
RoutingHeader header = request.getHeader();
+ if (header.getRangeId() <= 0L) {
+ return CompletableFuture.completedFuture(RangeResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.BAD_REQUEST)
+ .setRoutingHeader(request.getHeader())
+ .build())
+ .build());
+ }
+
RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
TableStore store = tableStoreCache.getTableStore(rid);
if (null != store) {
@@ -256,6 +267,15 @@ public CompletableFuture range(RangeRequest request) {
public CompletableFuture put(PutRequest request) {
RoutingHeader header = request.getHeader();
+ if (header.getRangeId() <= 0L) {
+ return CompletableFuture.completedFuture(PutResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.BAD_REQUEST)
+ .setRoutingHeader(request.getHeader())
+ .build())
+ .build());
+ }
+
RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
TableStore store = tableStoreCache.getTableStore(rid);
if (null != store) {
@@ -270,6 +290,15 @@ public CompletableFuture put(PutRequest request) {
public CompletableFuture delete(DeleteRangeRequest request) {
RoutingHeader header = request.getHeader();
+ if (header.getRangeId() <= 0L) {
+ return CompletableFuture.completedFuture(DeleteRangeResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.BAD_REQUEST)
+ .setRoutingHeader(request.getHeader())
+ .build())
+ .build());
+ }
+
RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
TableStore store = tableStoreCache.getTableStore(rid);
if (null != store) {
@@ -284,6 +313,15 @@ public CompletableFuture delete(DeleteRangeRequest request)
public CompletableFuture txn(TxnRequest request) {
RoutingHeader header = request.getHeader();
+ if (header.getRangeId() <= 0L) {
+ return CompletableFuture.completedFuture(TxnResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.BAD_REQUEST)
+ .setRoutingHeader(request.getHeader())
+ .build())
+ .build());
+ }
+
RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
TableStore store = tableStoreCache.getTableStore(rid);
if (null != store) {
@@ -298,6 +336,15 @@ public CompletableFuture txn(TxnRequest request) {
public CompletableFuture incr(IncrementRequest request) {
RoutingHeader header = request.getHeader();
+ if (header.getRangeId() <= 0L) {
+ return CompletableFuture.completedFuture(IncrementResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.BAD_REQUEST)
+ .setRoutingHeader(request.getHeader())
+ .build())
+ .build());
+ }
+
RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
TableStore store = tableStoreCache.getTableStore(rid);
if (null != store) {
diff --git a/tests/docker-images/current-version-image/Dockerfile b/tests/docker-images/current-version-image/Dockerfile
index c174b6a318f..7d998a46def 100644
--- a/tests/docker-images/current-version-image/Dockerfile
+++ b/tests/docker-images/current-version-image/Dockerfile
@@ -35,7 +35,8 @@ ENV JAVA_HOME=/usr/lib/jvm/jre-1.8.0
# prepare utils
RUN set -x \
&& adduser "${BK_USER}" \
- && yum install -y java-1.8.0-openjdk-headless wget bash python sudo netcat \
+ && yum install -y epel-release \
+ && yum install -y java-1.8.0-openjdk-headless wget bash python-pip python-devel sudo netcat gcc gcc-c++ \
&& mkdir -pv /opt \
&& cd /opt \
&& yum clean all
@@ -47,8 +48,13 @@ RUN mv /opt/${PKG_NAME} /opt/bookkeeper
WORKDIR /opt/bookkeeper
COPY target/scripts /opt/bookkeeper/scripts
+COPY scripts/install-python-client.sh /opt/bookkeeper/scripts
RUN chmod +x -R /opt/bookkeeper/scripts/
+# copy the python client
+ADD target/bookkeeper-client/ /opt/bookkeeper/bookkeeper-client
+RUN /opt/bookkeeper/scripts/install-python-client.sh
+
ENTRYPOINT [ "/bin/bash", "/opt/bookkeeper/scripts/entrypoint.sh" ]
CMD ["bookie"]
diff --git a/tests/docker-images/current-version-image/pom.xml b/tests/docker-images/current-version-image/pom.xml
index 0d2c95da5ce..edffc648143 100644
--- a/tests/docker-images/current-version-image/pom.xml
+++ b/tests/docker-images/current-version-image/pom.xml
@@ -46,6 +46,51 @@
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+ ${exec-maven-plugin.version}
+
+
+ build-python-client
+ generate-resources
+
+ exec
+
+
+ ${project.basedir}/target
+ ${project.basedir}/../../../stream/clients/python/scripts/docker_build.sh
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+ ${maven-antrun-plugin.version}
+
+
+ generate-resources
+
+ run
+
+
+
+ copy python wheel file
+
+
+ copying docker scripts
+
+
+
+
+
+
+
com.spotify
dockerfile-maven-plugin
@@ -97,27 +142,6 @@
-
-
- org.apache.maven.plugins
- maven-antrun-plugin
- ${maven-antrun-plugin.version}
-
-
- generate-resources
-
- run
-
-
-
- copying docker scripts
-
-
-
-
-
-
-
diff --git a/tests/docker-images/current-version-image/scripts/install-python-client.sh b/tests/docker-images/current-version-image/scripts/install-python-client.sh
new file mode 100644
index 00000000000..2719b335ac1
--- /dev/null
+++ b/tests/docker-images/current-version-image/scripts/install-python-client.sh
@@ -0,0 +1,24 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+set -x
+
+WHEEL_FILE=`ls /opt/bookkeeper/bookkeeper-client/*.whl`
+pip install ${WHEEL_FILE}