Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ plugins {
}

group = 'io.hstream'
version = '0.5.0'
version = '0.5.1-SNAPSHOT'

repositories {
mavenCentral()
Expand Down
37 changes: 8 additions & 29 deletions client/src/main/java/io/hstream/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,18 @@
public interface Producer {

/**
* Sync method to generate a raw format message.
* Write a raw record.
*
* @param rawRecord raw format message.
* @return the {@link RecordId} of generated message.
* @param rawRecord raw format record.
* @return the {@link RecordId} wrapped in a {@link CompletableFuture}.
*/
RecordId write(byte[] rawRecord);
CompletableFuture<RecordId> write(byte[] rawRecord);

/**
* Sync method to generate a {@link HRecord} format message.
* Write a {@link HRecord}.
*
* @param hRecord HRecord format message.
* @return the {@link RecordId} of generated message.
* @param hRecord {@link HRecord}.
* @return the {@link RecordId} wrapped in a {@link CompletableFuture}.
*/
RecordId write(HRecord hRecord);

/**
* Async method to generate a raw format message.
*
* @param rawRecord raw format message.
* @return the {@link RecordId} of generated message which wrapped in a {@link CompletableFuture}
* object.
*/
CompletableFuture<RecordId> writeAsync(byte[] rawRecord);

/**
* Async method to generate a {@link HRecord} format message.
*
* @param hRecord HRecord format message.
* @return the {@link RecordId} of generated message which wrapped in a {@link CompletableFuture}
* object.
*/
CompletableFuture<RecordId> writeAsync(HRecord hRecord);

/** Flush buffed message. */
void flush();
CompletableFuture<RecordId> write(HRecord hRecord);
}
5 changes: 5 additions & 0 deletions client/src/main/java/io/hstream/RecordId.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,9 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(batchId, batchIndex);
}

@Override
public String toString() {
return "RecordId{" + "batchId=" + batchId + ", batchIndex=" + batchIndex + '}';
}
}
196 changes: 51 additions & 145 deletions client/src/main/java/io/hstream/impl/ProducerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.hstream.internal.AppendRequest;
import io.hstream.internal.AppendResponse;
import io.hstream.internal.HStreamApiGrpc;
import io.hstream.internal.HStreamRecord;
import io.hstream.util.GrpcUtils;
import io.hstream.util.RecordUtils;
import java.util.ArrayList;
Expand All @@ -14,8 +15,6 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,7 +29,7 @@ public class ProducerImpl implements Producer {

private final Semaphore semaphore;
private final Lock lock;
private final List<Object> recordBuffer;
private final List<HStreamRecord> recordBuffer;
private final List<CompletableFuture<RecordId>> futures;

public ProducerImpl(
Expand All @@ -57,126 +56,62 @@ public ProducerImpl(
}

@Override
public RecordId write(byte[] rawRecord) {
CompletableFuture<List<RecordId>> future = writeRawRecordsAsync(List.of(rawRecord));
logger.info("wait for write future");
return future.join().get(0);
public CompletableFuture<RecordId> write(byte[] rawRecord) {
HStreamRecord hStreamRecord = RecordUtils.buildHStreamRecordFromRawRecord(rawRecord);
return writeInternal(hStreamRecord);
}

@Override
public RecordId write(HRecord hRecord) {
CompletableFuture<List<RecordId>> future = writeHRecordsAsync(List.of(hRecord));
return future.join().get(0);
public CompletableFuture<RecordId> write(HRecord hRecord) {
HStreamRecord hStreamRecord = RecordUtils.buildHStreamRecordFromHRecord(hRecord);
return writeInternal(hStreamRecord);
}

@Override
public CompletableFuture<RecordId> writeAsync(byte[] rawRecord) {
private CompletableFuture<RecordId> writeInternal(HStreamRecord hStreamRecord) {
if (!enableBatch) {
return writeRawRecordsAsync(List.of(rawRecord)).thenApply(list -> list.get(0));
return writeHStreamRecords(List.of(hStreamRecord)).thenApply(recordIds -> recordIds.get(0));
} else {
try {
semaphore.acquire();
} catch (InterruptedException e) {
throw new HStreamDBClientException(e);
}

lock.lock();
try {
CompletableFuture<RecordId> completableFuture = new CompletableFuture<>();
recordBuffer.add(rawRecord);
futures.add(completableFuture);

if (recordBuffer.size() == recordCountLimit) {
flush();
}
return completableFuture;
} finally {
lock.unlock();
}
return addToBuffer(hStreamRecord);
}
}

@Override
public CompletableFuture<RecordId> writeAsync(HRecord hRecord) {
if (!enableBatch) {
return writeHRecordsAsync(List.of(hRecord)).thenApply(list -> list.get(0));
} else {
try {
semaphore.acquire();
} catch (InterruptedException e) {
throw new HStreamDBClientException(e);
}

lock.lock();
try {
CompletableFuture<RecordId> completableFuture = new CompletableFuture<>();
recordBuffer.add(hRecord);
futures.add(completableFuture);

if (recordBuffer.size() == recordCountLimit) {
flush();
}
return completableFuture;
} finally {
lock.unlock();
}
}
}

@Override
public void flush() {
flushSync();
}

private CompletableFuture<List<RecordId>> writeRawRecordsAsync(List<byte[]> rawRecords) {

CompletableFuture<List<RecordId>> completableFuture = new CompletableFuture<>();

AppendRequest appendRequest =
AppendRequest.newBuilder()
.setStreamName(this.stream)
.addAllRecords(
rawRecords.stream()
.map(rawRecord -> RecordUtils.buildHStreamRecordFromRawRecord(rawRecord))
.collect(Collectors.toList()))
.build();
private void flush() {
lock.lock();
try {
if (recordBuffer.isEmpty()) {
return;
} else {
final int recordBufferCount = recordBuffer.size();

StreamObserver<AppendResponse> streamObserver =
new StreamObserver<>() {
@Override
public void onNext(AppendResponse appendResponse) {
completableFuture.complete(
appendResponse.getRecordIdsList().stream()
.map(GrpcUtils::recordIdFromGrpc)
.collect(Collectors.toList()));
}
logger.info("start flush recordBuffer, current buffer size is: {}", recordBufferCount);

@Override
public void onError(Throwable t) {
logger.error("write rawRecord error", t);
completableFuture.completeExceptionally(t);
}
writeHStreamRecords(recordBuffer)
.thenAccept(
recordIds -> {
for (int i = 0; i < recordIds.size(); ++i) {
futures.get(i).complete(recordIds.get(i));
}
})
.join();

@Override
public void onCompleted() {}
};
recordBuffer.clear();
futures.clear();

grpcStub.append(appendRequest, streamObserver);
logger.info("finish clearing record buffer");

return completableFuture;
semaphore.release(recordBufferCount);
}
} finally {
lock.unlock();
}
}

private CompletableFuture<List<RecordId>> writeHRecordsAsync(List<HRecord> hRecords) {
private CompletableFuture<List<RecordId>> writeHStreamRecords(
List<HStreamRecord> hStreamRecords) {
CompletableFuture<List<RecordId>> completableFuture = new CompletableFuture<>();

AppendRequest appendRequest =
AppendRequest.newBuilder()
.setStreamName(this.stream)
.addAllRecords(
hRecords.stream()
.map(hRecord -> RecordUtils.buildHStreamRecordFromHRecord(hRecord))
.collect(Collectors.toList()))
.build();
AppendRequest.newBuilder().setStreamName(stream).addAllRecords(hStreamRecords).build();

StreamObserver<AppendResponse> streamObserver =
new StreamObserver<>() {
Expand All @@ -202,52 +137,23 @@ public void onCompleted() {}
return completableFuture;
}

private void flushSync() {
lock.lock();
private CompletableFuture<RecordId> addToBuffer(HStreamRecord hStreamRecord) {
try {
if (recordBuffer.isEmpty()) {
return;
} else {
final int recordBufferCount = recordBuffer.size();

logger.info("start flush recordBuffer, current buffer size is: {}", recordBufferCount);

List<ImmutablePair<Integer, byte[]>> rawRecords =
IntStream.range(0, recordBufferCount)
.filter(index -> recordBuffer.get(index) instanceof byte[])
.mapToObj(index -> ImmutablePair.of(index, (byte[]) (recordBuffer.get(index))))
.collect(Collectors.toList());

List<ImmutablePair<Integer, HRecord>> hRecords =
IntStream.range(0, recordBufferCount)
.filter(index -> recordBuffer.get(index) instanceof HRecord)
.mapToObj(index -> ImmutablePair.of(index, (HRecord) (recordBuffer.get(index))))
.collect(Collectors.toList());

List<RecordId> rawRecordIds =
writeRawRecordsAsync(
rawRecords.stream().map(pair -> pair.getRight()).collect(Collectors.toList()))
.join();
List<RecordId> hRecordIds =
writeHRecordsAsync(
hRecords.stream().map(ImmutablePair::getRight).collect(Collectors.toList()))
.join();

IntStream.range(0, rawRecords.size())
.mapToObj(i -> ImmutablePair.of(i, rawRecords.get(i).getLeft()))
.forEach(p -> futures.get(p.getRight()).complete(rawRecordIds.get(p.getLeft())));

IntStream.range(0, hRecords.size())
.mapToObj(i -> ImmutablePair.of(i, hRecords.get(i).getLeft()))
.forEach(p -> futures.get(p.getRight()).complete(hRecordIds.get(p.getLeft())));

recordBuffer.clear();
futures.clear();
semaphore.acquire();
} catch (InterruptedException e) {
throw new HStreamDBClientException(e);
}

logger.info("finish clearing record buffer");
lock.lock();
try {
CompletableFuture<RecordId> completableFuture = new CompletableFuture<>();
recordBuffer.add(hStreamRecord);
futures.add(completableFuture);

semaphore.release(recordBufferCount);
if (recordBuffer.size() == recordCountLimit) {
flush();
}
return completableFuture;
} finally {
lock.unlock();
}
Expand Down
Loading