From 343ef364727a281bf2fc096f15780744a62ab771 Mon Sep 17 00:00:00 2001
From: wangbin
Date: Mon, 11 Oct 2021 14:03:48 +0800
Subject: [PATCH 1/2] refactor Producer and fix send extra empty request
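
Producer now exposes a single CompletableFuture-based write() for both raw
records and HRecords; the separate blocking write()/writeAsync() pair and the
public flush() are gone, and flushing happens inside the client once the batch
buffer fills. A minimal usage sketch of the new interface, assuming an
already-constructed HStreamClient named client and a placeholder stream name
(the builder calls mirror the tests further down):

    // Sketch only: "test_stream" is a hypothetical stream name.
    Producer producer = client.newProducer().stream("test_stream").build();

    byte[] rawRecord = new byte[100];
    new java.util.Random().nextBytes(rawRecord);

    // Blocking callers join() the returned future (what write() used to do).
    RecordId recordId = producer.write(rawRecord).join();

    // Non-blocking callers attach a callback instead (what writeAsync() did).
    producer.write(rawRecord)
        .thenAccept(id -> System.out.println("appended " + id))
        .exceptionally(t -> { t.printStackTrace(); return null; });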
---
 client/build.gradle                           |   2 +-
 client/src/main/java/io/hstream/Producer.java |  37 +---
 .../java/io/hstream/impl/ProducerImpl.java    | 196 +++++-------------
 .../java/io/hstream/HStreamClientTest.java    | 147 +++----------
 4 files changed, 89 insertions(+), 293 deletions(-)

diff --git a/client/build.gradle b/client/build.gradle
index 86db8176..5092e111 100644
--- a/client/build.gradle
+++ b/client/build.gradle
@@ -8,7 +8,7 @@ plugins {
 }

 group = 'io.hstream'
-version = '0.5.0'
+version = '0.5.1-SNAPSHOT'

 repositories {
     mavenCentral()
diff --git a/client/src/main/java/io/hstream/Producer.java b/client/src/main/java/io/hstream/Producer.java
index 40f6d8de..4586a69b 100644
--- a/client/src/main/java/io/hstream/Producer.java
+++ b/client/src/main/java/io/hstream/Producer.java
@@ -6,39 +6,18 @@ public interface Producer {

   /**
-   * Sync method to generate a raw format message.
+   * Write a raw record.
    *
-   * @param rawRecord raw format message.
-   * @return the {@link RecordId} of generated message.
+   * @param rawRecord raw format record.
+   * @return the {@link RecordId} wrapped in a {@link CompletableFuture}.
    */
-  RecordId write(byte[] rawRecord);
+  CompletableFuture<RecordId> write(byte[] rawRecord);

   /**
-   * Sync method to generate a {@link HRecord} format message.
+   * Write a {@link HRecord}.
    *
-   * @param hRecord HRecord format message.
-   * @return the {@link RecordId} of generated message.
+   * @param hRecord {@link HRecord}.
+   * @return the {@link RecordId} wrapped in a {@link CompletableFuture}.
    */
-  RecordId write(HRecord hRecord);
-
-  /**
-   * Async method to generate a raw format message.
-   *
-   * @param rawRecord raw format message.
-   * @return the {@link RecordId} of generated message which wrapped in a {@link CompletableFuture}
-   *     object.
-   */
-  CompletableFuture<RecordId> writeAsync(byte[] rawRecord);
-
-  /**
-   * Async method to generate a {@link HRecord} format message.
-   *
-   * @param hRecord HRecord format message.
-   * @return the {@link RecordId} of generated message which wrapped in a {@link CompletableFuture}
-   *     object.
-   */
-  CompletableFuture<RecordId> writeAsync(HRecord hRecord);
-
-  /** Flush buffed message. */
-  void flush();
+  CompletableFuture<RecordId> write(HRecord hRecord);
 }
diff --git a/client/src/main/java/io/hstream/impl/ProducerImpl.java b/client/src/main/java/io/hstream/impl/ProducerImpl.java
index e7a5d5de..369ccd67 100644
--- a/client/src/main/java/io/hstream/impl/ProducerImpl.java
+++ b/client/src/main/java/io/hstream/impl/ProducerImpl.java
@@ -5,6 +5,7 @@
 import io.hstream.internal.AppendRequest;
 import io.hstream.internal.AppendResponse;
 import io.hstream.internal.HStreamApiGrpc;
+import io.hstream.internal.HStreamRecord;
 import io.hstream.util.GrpcUtils;
 import io.hstream.util.RecordUtils;
 import java.util.ArrayList;
@@ -14,8 +15,6 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -30,7 +29,7 @@ public class ProducerImpl implements Producer {
   private final Semaphore semaphore;
   private final Lock lock;
-  private final List<Object> recordBuffer;
+  private final List<HStreamRecord> recordBuffer;
   private final List<CompletableFuture<RecordId>> futures;

   public ProducerImpl(
@@ -57,126 +56,62 @@ public ProducerImpl(
   }

   @Override
-  public RecordId write(byte[] rawRecord) {
-    CompletableFuture<List<RecordId>> future = writeRawRecordsAsync(List.of(rawRecord));
-    logger.info("wait for write future");
-    return future.join().get(0);
+  public CompletableFuture<RecordId> write(byte[] rawRecord) {
+    HStreamRecord hStreamRecord = RecordUtils.buildHStreamRecordFromRawRecord(rawRecord);
+    return writeInternal(hStreamRecord);
   }

   @Override
-  public RecordId write(HRecord hRecord) {
-    CompletableFuture<List<RecordId>> future = writeHRecordsAsync(List.of(hRecord));
-    return future.join().get(0);
+  public CompletableFuture<RecordId> write(HRecord hRecord) {
+    HStreamRecord hStreamRecord = RecordUtils.buildHStreamRecordFromHRecord(hRecord);
+    return writeInternal(hStreamRecord);
   }

-  @Override
-  public CompletableFuture<RecordId> writeAsync(byte[] rawRecord) {
+  private CompletableFuture<RecordId> writeInternal(HStreamRecord hStreamRecord) {
     if (!enableBatch) {
-      return writeRawRecordsAsync(List.of(rawRecord)).thenApply(list -> list.get(0));
+      return writeHStreamRecords(List.of(hStreamRecord)).thenApply(recordIds -> recordIds.get(0));
     } else {
-      try {
-        semaphore.acquire();
-      } catch (InterruptedException e) {
-        throw new HStreamDBClientException(e);
-      }
-
-      lock.lock();
-      try {
-        CompletableFuture<RecordId> completableFuture = new CompletableFuture<>();
-        recordBuffer.add(rawRecord);
-        futures.add(completableFuture);
-
-        if (recordBuffer.size() == recordCountLimit) {
-          flush();
-        }
-        return completableFuture;
-      } finally {
-        lock.unlock();
-      }
+      return addToBuffer(hStreamRecord);
     }
   }

-  @Override
-  public CompletableFuture<RecordId> writeAsync(HRecord hRecord) {
-    if (!enableBatch) {
-      return writeHRecordsAsync(List.of(hRecord)).thenApply(list -> list.get(0));
-    } else {
-      try {
-        semaphore.acquire();
-      } catch (InterruptedException e) {
-        throw new HStreamDBClientException(e);
-      }
-
-      lock.lock();
-      try {
-        CompletableFuture<RecordId> completableFuture = new CompletableFuture<>();
-        recordBuffer.add(hRecord);
-        futures.add(completableFuture);
-
-        if (recordBuffer.size() == recordCountLimit) {
-          flush();
-        }
-        return completableFuture;
-      } finally {
-        lock.unlock();
-      }
-    }
-  }
-
-  @Override
-  public void flush() {
-    flushSync();
-  }
-
-  private CompletableFuture<List<RecordId>> writeRawRecordsAsync(List<byte[]> rawRecords) {
-
-    CompletableFuture<List<RecordId>> completableFuture = new CompletableFuture<>();
-
-    AppendRequest appendRequest =
-        AppendRequest.newBuilder()
-            .setStreamName(this.stream)
-            .addAllRecords(
-                rawRecords.stream()
-                    .map(rawRecord -> RecordUtils.buildHStreamRecordFromRawRecord(rawRecord))
-                    .collect(Collectors.toList()))
-            .build();
+  private void flush() {
+    lock.lock();
+    try {
+      if (recordBuffer.isEmpty()) {
+        return;
+      } else {
+        final int recordBufferCount = recordBuffer.size();

-    StreamObserver<AppendResponse> streamObserver =
-        new StreamObserver<>() {
-          @Override
-          public void onNext(AppendResponse appendResponse) {
-            completableFuture.complete(
-                appendResponse.getRecordIdsList().stream()
-                    .map(GrpcUtils::recordIdFromGrpc)
-                    .collect(Collectors.toList()));
-          }
+        logger.info("start flush recordBuffer, current buffer size is: {}", recordBufferCount);

-          @Override
-          public void onError(Throwable t) {
-            logger.error("write rawRecord error", t);
-            completableFuture.completeExceptionally(t);
-          }
+        writeHStreamRecords(recordBuffer)
+            .thenAccept(
+                recordIds -> {
+                  for (int i = 0; i < recordIds.size(); ++i) {
+                    futures.get(i).complete(recordIds.get(i));
+                  }
+                })
+            .join();

-          @Override
-          public void onCompleted() {}
-        };
+        recordBuffer.clear();
+        futures.clear();

-    grpcStub.append(appendRequest, streamObserver);
+        logger.info("finish clearing record buffer");

-    return completableFuture;
+        semaphore.release(recordBufferCount);
+      }
+    } finally {
+      lock.unlock();
+    }
   }

-  private CompletableFuture<List<RecordId>> writeHRecordsAsync(List<HRecord> hRecords) {
+  private CompletableFuture<List<RecordId>> writeHStreamRecords(
+      List<HStreamRecord> hStreamRecords) {

     CompletableFuture<List<RecordId>> completableFuture = new CompletableFuture<>();

     AppendRequest appendRequest =
-        AppendRequest.newBuilder()
-            .setStreamName(this.stream)
-            .addAllRecords(
-                hRecords.stream()
-                    .map(hRecord -> RecordUtils.buildHStreamRecordFromHRecord(hRecord))
-                    .collect(Collectors.toList()))
-            .build();
+        AppendRequest.newBuilder().setStreamName(stream).addAllRecords(hStreamRecords).build();

     StreamObserver<AppendResponse> streamObserver =
         new StreamObserver<>() {
@@ -202,52 +137,23 @@ public void onCompleted() {}

     return completableFuture;
   }

-  private void flushSync() {
-    lock.lock();
+  private CompletableFuture<RecordId> addToBuffer(HStreamRecord hStreamRecord) {
     try {
-      if (recordBuffer.isEmpty()) {
-        return;
-      } else {
-        final int recordBufferCount = recordBuffer.size();
-
-        logger.info("start flush recordBuffer, current buffer size is: {}", recordBufferCount);
-
-        List<ImmutablePair<Integer, byte[]>> rawRecords =
-            IntStream.range(0, recordBufferCount)
-                .filter(index -> recordBuffer.get(index) instanceof byte[])
-                .mapToObj(index -> ImmutablePair.of(index, (byte[]) (recordBuffer.get(index))))
-                .collect(Collectors.toList());
-
-        List<ImmutablePair<Integer, HRecord>> hRecords =
-            IntStream.range(0, recordBufferCount)
-                .filter(index -> recordBuffer.get(index) instanceof HRecord)
-                .mapToObj(index -> ImmutablePair.of(index, (HRecord) (recordBuffer.get(index))))
-                .collect(Collectors.toList());
-
-        List<RecordId> rawRecordIds =
-            writeRawRecordsAsync(
-                    rawRecords.stream().map(pair -> pair.getRight()).collect(Collectors.toList()))
-                .join();
-        List<RecordId> hRecordIds =
-            writeHRecordsAsync(
-                    hRecords.stream().map(ImmutablePair::getRight).collect(Collectors.toList()))
-                .join();
-
-        IntStream.range(0, rawRecords.size())
-            .mapToObj(i -> ImmutablePair.of(i, rawRecords.get(i).getLeft()))
-            .forEach(p -> futures.get(p.getRight()).complete(rawRecordIds.get(p.getLeft())));
-
-        IntStream.range(0, hRecords.size())
-            .mapToObj(i -> ImmutablePair.of(i, hRecords.get(i).getLeft()))
-            .forEach(p -> futures.get(p.getRight()).complete(hRecordIds.get(p.getLeft())));
-
-        recordBuffer.clear();
-        futures.clear();
+      semaphore.acquire();
+    } catch (InterruptedException e) {
+      throw new HStreamDBClientException(e);
+    }

-        logger.info("finish clearing record buffer");
+    lock.lock();
+    try {
+      CompletableFuture<RecordId> completableFuture = new CompletableFuture<>();
+      recordBuffer.add(hStreamRecord);
+      futures.add(completableFuture);

-        semaphore.release(recordBufferCount);
+      if (recordBuffer.size() == recordCountLimit) {
+        flush();
       }
+      return completableFuture;
     } finally {
       lock.unlock();
     }
   }
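
When batching is enabled, the futures returned by write() only complete after
the buffered records are flushed, and flushing is now internal: it fires once
the buffer reaches recordCountLimit, since the public flush() no longer exists.
A minimal sketch of driving that path, assuming the same hypothetical client
and stream name as above and the builder options used by the tests below
(enableBatch()/recordCountLimit()):

    Producer producer =
        client.newProducer().stream("test_stream").enableBatch().recordCountLimit(100).build();
    Random random = new Random();
    CompletableFuture<RecordId>[] futures = new CompletableFuture[100];
    for (int i = 0; i < 100; ++i) {
      byte[] rawRecord = new byte[100];
      random.nextBytes(rawRecord);
      futures[i] = producer.write(rawRecord); // buffered until the batch is full
    }
    // The 100th write fills the buffer, triggers the internal flush, and
    // completes every pending future with its RecordId.
    CompletableFuture.allOf(futures).join();
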
diff --git a/client/src/test/java/io/hstream/HStreamClientTest.java b/client/src/test/java/io/hstream/HStreamClientTest.java
index 58d332fb..20cb693c 100644
--- a/client/src/test/java/io/hstream/HStreamClientTest.java
+++ b/client/src/test/java/io/hstream/HStreamClientTest.java
@@ -101,28 +101,25 @@ public void cleanUp() {
   @Test
   @Order(1)
   public void testWriteRawRecord() throws Exception {
-    CompletableFuture<RecordId> recordIdFuture = new CompletableFuture<>();
+    Producer producer = client.newProducer().stream(testStreamName).build();
+    Random random = new Random();
+    byte[] rawRecord = new byte[100];
+    random.nextBytes(rawRecord);
+    RecordId recordId = producer.write(rawRecord).join();
+
     Consumer consumer =
         client
             .newConsumer()
             .subscription(testSubscriptionId)
             .rawRecordReceiver(
                 (receivedRawRecord, responder) -> {
-                  recordIdFuture.thenAccept(
-                      recordId ->
-                          Assertions.assertEquals(recordId, receivedRawRecord.getRecordId()));
+                  Assertions.assertEquals(recordId, receivedRawRecord.getRecordId());
+                  Assertions.assertEquals(rawRecord, receivedRawRecord.getRawRecord());
                   responder.ack();
                 })
             .build();
     consumer.startAsync().awaitRunning();

-    Producer producer = client.newProducer().stream(testStreamName).build();
-    Random random = new Random();
-    byte[] rawRecord = new byte[100];
-    random.nextBytes(rawRecord);
-    RecordId recordId = producer.write(rawRecord);
-    recordIdFuture.complete(recordId);
-
     Thread.sleep(1000);
     consumer.stopAsync().awaitTerminated();
   }
@@ -134,7 +131,7 @@ public void testWriteHRecord() throws Exception {
     Producer producer = client.newProducer().stream(testStreamName).build();
     HRecord hRecord =
         HRecord.newBuilder().put("key1", 10).put("key2", "hello").put("key3", true).build();
-    RecordId recordId = producer.write(hRecord);
+    RecordId recordId = producer.write(hRecord).join();

     CountDownLatch countDownLatch = new CountDownLatch(1);
     Consumer consumer =
@@ -143,7 +140,6 @@ public void testWriteHRecord() throws Exception {
             .subscription(testSubscriptionId)
             .hRecordReceiver(
                 (receivedHRecord, responder) -> {
-                  logger.info("receivedHRecord: {}", receivedHRecord.getHRecord());
                   Assertions.assertEquals(recordId, receivedHRecord.getRecordId());
                   countDownLatch.countDown();
                   responder.ack();
@@ -167,7 +163,7 @@ public void testWriteBatchRawRecord() throws Exception {
     for (int i = 0; i < count; ++i) {
       byte[] rawRecord = new byte[100];
       random.nextBytes(rawRecord);
-      CompletableFuture<RecordId> future = producer.writeAsync(rawRecord);
+      CompletableFuture<RecordId> future = producer.write(rawRecord);
       recordIdFutures[i] = future;
     }
     CompletableFuture.allOf(recordIdFutures).join();
@@ -185,10 +181,10 @@ public void testWriteBatchRawRecord() throws Exception {
                   Assertions.assertEquals(
                       recordIdFutures[index.getAndIncrement()].join(),
                       receivedRawRecord.getRecordId());
-                  if (index.get() == count - 1) {
+                  responder.ack();
+                  if (index.get() == count) {
                     latch.countDown();
                   }
-                  responder.ack();
                 })
             .build();
     consumer.startAsync().awaitRunning();
@@ -212,7 +208,7 @@ public void testWriteBatchRawRecordMultiThread() throws Exception {
               for (int i = 0; i < count / 2; ++i) {
                 byte[] rawRecord = new byte[100];
                 random.nextBytes(rawRecord);
-                CompletableFuture<RecordId> future = producer.writeAsync(rawRecord);
+                CompletableFuture<RecordId> future = producer.write(rawRecord);
                 recordIdFutures[i] = future;
               }
             });
@@ -223,7 +219,7 @@ public void testWriteBatchRawRecordMultiThread() throws Exception {
               for (int i = count / 2; i < count; ++i) {
                 byte[] rawRecord = new byte[100];
                 random.nextBytes(rawRecord);
-                CompletableFuture<RecordId> future = producer.writeAsync(rawRecord);
+                CompletableFuture<RecordId> future = producer.write(rawRecord);
                 recordIdFutures[i] = future;
               }
             });
@@ -231,44 +227,6 @@ public void testWriteBatchRawRecordMultiThread() throws Exception {
     thread1.start();
     thread2.start();

-    AtomicInteger readCount = new AtomicInteger();
-    Consumer consumer =
-        client
-            .newConsumer()
-            .subscription(testSubscriptionId)
-            .rawRecordReceiver(
-                (receivedRawRecord, responder) -> {
-                  readCount.incrementAndGet();
-                  responder.ack();
-                })
-            .build();
-    consumer.startAsync().awaitRunning();
-
-    Thread.sleep(3000);
-    Assertions.assertEquals(count, readCount.get());
-    consumer.stopAsync().awaitTerminated();
-  }
-
-  @Test
-  @Order(5)
-  public void testFlush() throws Exception {
-    Producer producer =
-        client.newProducer().stream(testStreamName).enableBatch().recordCountLimit(100).build();
-    Random random = new Random();
-    final int count = 10;
-    CompletableFuture[] recordIdFutures = new CompletableFuture[count];
-    for (int i = 0; i < count; ++i) {
-      byte[] rawRecord = new byte[100];
-      random.nextBytes(rawRecord);
-      CompletableFuture<RecordId> future = producer.writeAsync(rawRecord);
-      recordIdFutures[i] = future;
-    }
-    producer.flush();
-
-    // CompletableFuture.allOf(recordIdFutures).join();
-
-    logger.info("producer finish");
-
     CountDownLatch latch = new CountDownLatch(1);
     AtomicInteger index = new AtomicInteger();
     Consumer consumer =
         client
             .newConsumer()
             .subscription(testSubscriptionId)
             .rawRecordReceiver(
                 (receivedRawRecord, responder) -> {
                   Assertions.assertEquals(
                       recordIdFutures[index.getAndIncrement()].join(),
                       receivedRawRecord.getRecordId());
-                  if (index.get() == count - 1) {
+                  responder.ack();
+                  if (index.get() == count) {
                     latch.countDown();
                   }
-                  responder.ack();
                 })
             .build();
     consumer.startAsync().awaitRunning();
@@ -292,73 +250,21 @@ public void testFlush() throws Exception {
     consumer.stopAsync().awaitTerminated();
   }

-  // @Disabled
   @Test
-  @Order(6)
-  public void testFlushMultiThread() throws Exception {
-    AtomicInteger readCount = new AtomicInteger();
-    Consumer consumer =
-        client
-            .newConsumer()
-            .subscription(testSubscriptionId)
-            .rawRecordReceiver(
-                (receivedRawRecord, responder) -> {
-                  readCount.incrementAndGet();
-                  responder.ack();
-                })
-            .build();
-    consumer.startAsync().awaitRunning();
-
-    Producer producer =
-        client.newProducer().stream(testStreamName).enableBatch().recordCountLimit(100).build();
-    Random random = new Random();
-    final int count = 10;
-
-    Thread thread1 =
-        new Thread(
-            () -> {
-              for (int i = 0; i < count; ++i) {
-                byte[] rawRecord = new byte[100];
-                random.nextBytes(rawRecord);
-                producer.writeAsync(rawRecord);
-              }
-              producer.flush();
-            });
-
-    Thread thread2 =
-        new Thread(
-            () -> {
-              for (int i = 0; i < count; ++i) {
-                byte[] rawRecord = new byte[100];
-                random.nextBytes(rawRecord);
-                producer.writeAsync(rawRecord);
-              }
-              producer.flush();
-            });
-
-    thread1.start();
-    thread2.start();
-    thread1.join();
-    thread2.join();
-
-    Thread.sleep(5000);
-    consumer.stopAsync().awaitTerminated();
-    Assertions.assertEquals(count * 2, readCount.get());
-  }
-
-  @Test
-  @Order(7)
+  @Order(5)
   public void testConsumerGroup() throws Exception {
     Producer producer = client.newProducer().stream(testStreamName).build();
     Random random = new Random();
     byte[] rawRecord = new byte[100];
-    for (int i = 0; i < 9; ++i) {
+    final int count = 10;
+    for (int i = 0; i < count; ++i) {
       random.nextBytes(rawRecord);
-      producer.write(rawRecord);
+      producer.write(rawRecord).join();
     }
     logger.info("write done");

+    AtomicInteger readCount = new AtomicInteger();
     Consumer consumer1 =
         client
             .newConsumer()
@@ -367,6 +273,7 @@ public void testConsumerGroup() throws Exception {
             .rawRecordReceiver(
                 (receivedRawRecord, responder) -> {
                   logger.info("consumer-1 recv {}", receivedRawRecord.getRecordId().getBatchId());
+                  readCount.incrementAndGet();
                   responder.ack();
                 })
             .build();
@@ -379,6 +286,7 @@
             .rawRecordReceiver(
                 (receivedRawRecord, responder) -> {
                   logger.info("consumer-2 recv {}", receivedRawRecord.getRecordId().getBatchId());
+                  readCount.incrementAndGet();
                   responder.ack();
                 })
             .build();
@@ -391,6 +299,7 @@
             .rawRecordReceiver(
                 (receivedRawRecord, responder) -> {
                   logger.info("consumer-3 recv {}", receivedRawRecord.getRecordId().getBatchId());
+                  readCount.incrementAndGet();
                   responder.ack();
                 })
             .build();
@@ -406,11 +315,13 @@ public void testConsumerGroup() throws Exception {
     consumer1.stopAsync().awaitTerminated();
     consumer2.stopAsync().awaitTerminated();
     consumer3.stopAsync().awaitTerminated();
+
+    Assertions.assertEquals(count, readCount.get());
   }

   @Disabled("wait for fix HS-456")
   @Test
-  @Order(8)
+  @Order(6)
   public void testStreamQuery() throws Exception {
     AtomicInteger receivedCount = new AtomicInteger(0);
     Observer<HRecord> observer =
@@ -460,7 +371,7 @@ public void onCompleted() {}
   }

   @Test
-  @Order(9)
+  @Order(7)
   public void testConsumerInTurn() throws Exception {
     final int recordCount = 10;
     Producer producer = client.newProducer().stream(testStreamName).build();
     Random random = new Random();
     for (int i = 0; i < recordCount; ++i) {
       byte[] rawRecord = new byte[100];
       random.nextBytes(rawRecord);
-      producer.write(rawRecord);
+      producer.write(rawRecord).join();
     }

     final int maxReceivedCountC1 = 3;

From 0b908705baacead8628977a70396a93a5d6579a0 Mon Sep 17 00:00:00 2001
From: wangbin
Date: Mon, 11 Oct 2021 15:11:52 +0800
Subject: [PATCH 2/2] fix tests
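
The raw-record test compared byte arrays with Assertions.assertEquals, which
falls back to reference equality because arrays do not override equals(), so it
can fail even when the payloads match; assertArrayEquals compares element by
element. The fixed tests also wait on a CountDownLatch instead of sleeping. A
tiny illustration of the assertion difference (plain JUnit 5, hypothetical
values):

    byte[] sent = {1, 2, 3};
    byte[] received = {1, 2, 3};                  // equal contents, different instance
    // Assertions.assertEquals(sent, received);   // fails: reference comparison
    Assertions.assertArrayEquals(sent, received); // passes: element-wise comparison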
---
 client/src/main/java/io/hstream/RecordId.java |  5 +++++
 .../java/io/hstream/HStreamClientTest.java    | 21 ++++++++++++-------
 2 files changed, 18 insertions(+), 8 deletions(-)

diff --git a/client/src/main/java/io/hstream/RecordId.java b/client/src/main/java/io/hstream/RecordId.java
index 69c32b24..518a6016 100644
--- a/client/src/main/java/io/hstream/RecordId.java
+++ b/client/src/main/java/io/hstream/RecordId.java
@@ -57,4 +57,9 @@ public boolean equals(Object o) {
   public int hashCode() {
     return Objects.hash(batchId, batchIndex);
   }
+
+  @Override
+  public String toString() {
+    return "RecordId{" + "batchId=" + batchId + ", batchIndex=" + batchIndex + '}';
+  }
 }
diff --git a/client/src/test/java/io/hstream/HStreamClientTest.java b/client/src/test/java/io/hstream/HStreamClientTest.java
index 20cb693c..70fdd0fa 100644
--- a/client/src/test/java/io/hstream/HStreamClientTest.java
+++ b/client/src/test/java/io/hstream/HStreamClientTest.java
@@ -106,21 +106,25 @@ public void testWriteRawRecord() throws Exception {
     byte[] rawRecord = new byte[100];
     random.nextBytes(rawRecord);
     RecordId recordId = producer.write(rawRecord).join();
+    logger.info("write record: {}", recordId);

+    CountDownLatch latch = new CountDownLatch(1);
     Consumer consumer =
         client
             .newConsumer()
             .subscription(testSubscriptionId)
             .rawRecordReceiver(
                 (receivedRawRecord, responder) -> {
+                  logger.info("recv {}", receivedRawRecord.getRecordId());
                   Assertions.assertEquals(recordId, receivedRawRecord.getRecordId());
-                  Assertions.assertEquals(rawRecord, receivedRawRecord.getRawRecord());
+                  Assertions.assertArrayEquals(rawRecord, receivedRawRecord.getRawRecord());
                   responder.ack();
+                  latch.countDown();
                 })
             .build();
     consumer.startAsync().awaitRunning();

-    Thread.sleep(1000);
+    latch.await();
     consumer.stopAsync().awaitTerminated();
   }
@@ -141,8 +145,8 @@ public void testWriteHRecord() throws Exception {
             .hRecordReceiver(
                 (receivedHRecord, responder) -> {
                   Assertions.assertEquals(recordId, receivedHRecord.getRecordId());
-                  countDownLatch.countDown();
-                  responder.ack();
+                  responder.ack();
+                  countDownLatch.countDown();
                 })
             .build();
     consumer.startAsync().awaitRunning();
@@ -227,19 +231,20 @@ public void testWriteBatchRawRecordMultiThread() throws Exception {
     thread1.start();
     thread2.start();
+    thread1.join();
+    thread2.join();
+
     CountDownLatch latch = new CountDownLatch(1);
-    AtomicInteger index = new AtomicInteger();
+    AtomicInteger readCount = new AtomicInteger();
     Consumer consumer =
         client
             .newConsumer()
             .subscription(testSubscriptionId)
             .rawRecordReceiver(
                 (receivedRawRecord, responder) -> {
-                  Assertions.assertEquals(
-                      recordIdFutures[index.getAndIncrement()].join(),
-                      receivedRawRecord.getRecordId());
+                  readCount.incrementAndGet();
                   responder.ack();
-                  if (index.get() == count) {
+                  if (readCount.get() == count) {
                     latch.countDown();
                   }
                 })