Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static org.apache.beam.runners.dataflow.util.Structs.getBytes;
import static org.apache.beam.runners.dataflow.util.Structs.getString;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import com.google.auto.service.AutoService;
import java.io.IOException;
Expand Down Expand Up @@ -156,10 +155,10 @@ private PubsubWriter(String topic) {

@Override
public long add(WindowedValue<T> data) throws IOException {
checkState(
stream.size() == 0,
"Expected output stream to be empty but had %s",
stream.toByteString());
if (!stream.isEmpty()) {
throw new IllegalStateException(
"Expected output stream to be empty but was of size " + stream.size());
}
ByteString byteString = null;
try {
if (formatFn != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private WindmillStreamWriter(String destinationName) {
}

private <EncodeT> ByteString encode(Coder<EncodeT> coder, EncodeT object) throws IOException {
if (stream.size() != 0) {
if (!stream.isEmpty()) {
throw new IllegalStateException(
"Expected output stream to be empty but had " + stream.toByteString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public void discard() {
private Elements.Builder convertBufferForTransmission() {
Elements.Builder bufferedElements = Elements.newBuilder();
for (Map.Entry<String, Receiver<?>> entry : outputDataReceivers.entrySet()) {
if (entry.getValue().bufferedSize() == 0) {
if (!entry.getValue().hasBufferedOutput()) {
continue;
}
ByteString bytes = entry.getValue().toByteStringAndResetBuffer();
Expand All @@ -248,7 +248,7 @@ private Elements.Builder convertBufferForTransmission() {
.setData(bytes);
}
for (Map.Entry<TimerEndpoint, Receiver<?>> entry : outputTimersReceivers.entrySet()) {
if (entry.getValue().bufferedSize() == 0) {
if (!entry.getValue().hasBufferedOutput()) {
continue;
}
ByteString bytes = entry.getValue().toByteStringAndResetBuffer();
Expand Down Expand Up @@ -340,10 +340,11 @@ public Receiver(Coder<T> coder) {
public void accept(T input) throws Exception {
int size = output.size();
coder.encode(input, output);
if (output.size() - size == 0) {
long delta = (long) output.size() - size;
if (delta == 0) {
output.write(0);
delta = 1;
}
final long delta = (long) output.size() - size;
bytesWrittenSinceFlush += delta;
perBundleByteCount += delta;
perBundleElementCount += 1;
Expand All @@ -360,8 +361,8 @@ public long getElementCount() {
return perBundleElementCount;
}

public int bufferedSize() {
return output.size();
public boolean hasBufferedOutput() {
return !output.isEmpty();
}

public ByteString toByteStringAndResetBuffer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,8 @@ public ByteString toByteStringAndReset() {
* Resets the output stream to be re-used possibly re-using any existing buffers.
*/
public void reset() {
if (size() == 0) {
return;
}
toByteStringAndReset();
bufferPos = 0;
result = ByteString.EMPTY;
}

/**
Expand Down Expand Up @@ -216,6 +214,12 @@ public int size() {
return result.size() + bufferPos;
}

/** Returns if the output stream is currently empty. */
@SuppressWarnings("ReferenceEquality")
public boolean isEmpty() {
return bufferPos == 0 && result == ByteString.EMPTY;
}

@Override
public Appendable append(@Nullable CharSequence csq) throws IOException {
write(Preconditions.checkNotNull(csq).toString().getBytes(StandardCharsets.UTF_8));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.hamcrest.Matchers.empty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;

import java.io.IOException;
Expand Down Expand Up @@ -146,7 +147,7 @@ public void testConfiguredBufferLimit() throws Exception {
} else {
receiver = Iterables.getOnlyElement(aggregator.outputDataReceivers.values());
}
assertEquals(0L, receiver.bufferedSize());
assertFalse(receiver.hasBufferedOutput());
assertEquals(102L, receiver.getByteCount());
assertEquals(2L, receiver.getElementCount());

Expand All @@ -156,7 +157,7 @@ public void testConfiguredBufferLimit() throws Exception {
aggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
// Test that receiver stats have been reset after
// sendOrCollectBufferedDataAndFinishOutboundStreams.
assertEquals(0L, receiver.bufferedSize());
assertFalse(receiver.hasBufferedOutput());
assertEquals(0L, receiver.getByteCount());
assertEquals(0L, receiver.getElementCount());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
Expand Down Expand Up @@ -145,6 +146,7 @@ public void testWriteBytesWithZeroInitialCapacity() throws Exception {

ByteStringOutputStream out = new ByteStringOutputStream(0);
assertEquals(0, out.size());
assertTrue(out.isEmpty());

for (int pos = 0; pos < testBuffer.length; ) {
if (testBuffer[pos] == 0) {
Expand All @@ -157,8 +159,14 @@ public void testWriteBytesWithZeroInitialCapacity() throws Exception {
}
assertEquals(pos, out.size());
}
assertEquals(testBuffer.length == 0, out.isEmpty());
assertEquals(testBuffer.length, out.size());
assertEquals(UnsafeByteOperations.unsafeWrap(testBuffer), out.toByteString());
assertEquals(testBuffer.length == 0, out.isEmpty());
assertEquals(testBuffer.length, out.size());
assertEquals(UnsafeByteOperations.unsafeWrap(testBuffer), out.toByteStringAndReset());
assertTrue(out.isEmpty());
assertEquals(0, out.size());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void asyncClose() throws Exception {
.setAppend(StateAppendRequest.newBuilder().setData(out.toByteStringAndReset())));
}
}
if (out.size() > 0) {
if (!out.isEmpty()) {
beamFnStateClient.handle(
request
.toBuilder()
Expand Down
Loading