From 780660bd9a8c93105ea54c32eb7a2d7ae949f5dd Mon Sep 17 00:00:00 2001 From: wangbin Date: Fri, 15 Oct 2021 15:15:38 +0800 Subject: [PATCH] add createStream method with replicationFactor parameter --- client/src/main/java/io/hstream/HStreamClient.java | 13 ++++++++++++- .../java/io/hstream/impl/HStreamClientImpl.java | 8 +++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/io/hstream/HStreamClient.java b/client/src/main/java/io/hstream/HStreamClient.java index a4fbb1be..60e0e375 100644 --- a/client/src/main/java/io/hstream/HStreamClient.java +++ b/client/src/main/java/io/hstream/HStreamClient.java @@ -20,9 +20,20 @@ static HStreamClientBuilder builder() { /** @return the {@link QueryerBuilder}. */ QueryerBuilder newQueryer(); - /** @param stream the name of stream. */ + /** + * create a new stream with 3 replicas. + * + * @param stream the name of stream. + */ void createStream(String stream); + /** + * create a new stream. + * + * @param stream the name of stream. + */ + void createStream(String stream, short replicationFactor); + /** * Delete specified stream with streamName. * diff --git a/client/src/main/java/io/hstream/impl/HStreamClientImpl.java b/client/src/main/java/io/hstream/impl/HStreamClientImpl.java index b85ba08c..951f67c7 100644 --- a/client/src/main/java/io/hstream/impl/HStreamClientImpl.java +++ b/client/src/main/java/io/hstream/impl/HStreamClientImpl.java @@ -23,6 +23,8 @@ public class HStreamClientImpl implements HStreamClient { private final HStreamApiGrpc.HStreamApiStub stub; private final HStreamApiGrpc.HStreamApiBlockingStub blockingStub; + private static final short DEFAULT_STREAM_REPLICATOR = 3; + public HStreamClientImpl(String serviceUrl) { ManagedChannel channel = ManagedChannelBuilder.forTarget(serviceUrl).usePlaintext().build(); this.managedChannel = channel; @@ -47,8 +49,12 @@ public QueryerBuilder newQueryer() { @Override public void createStream(String streamName) { - Stream stream = new Stream(streamName, 3); + createStream(streamName, DEFAULT_STREAM_REPLICATOR); + } + @Override + public void createStream(String streamName, short replicationFactor) { + Stream stream = new Stream(streamName, replicationFactor); blockingStub.createStream(GrpcUtils.streamToGrpc(stream)); }