Skip to content
Permalink
Browse files
fix: flaky writeapi manual client tests (#238)
1. There are some warnings in the test runs saying that open connections are not closed. Make sure everything is shutdown after test.
2. There is some unexpected exceptions thrown which is not caught. Now catch a more general exception and also fix some issues that incorrectly calling remove on List (which is not supported).
3. Make sure the executor in the tests are finished running, so it will not run into a race with test shutdown.
  • Loading branch information
yirutang committed May 1, 2020
1 parent e0b0fcd commit 89c8623e082cacdc8e0843bffb67da4dc8b79df3
@@ -102,4 +102,9 @@ public static void testSetStub(
BigQueryWriteClient stub, int maxTableEntry, SchemaCompact schemaCheck) {
cache = WriterCache.getTestInstance(stub, maxTableEntry, schemaCheck);
}

/** Clears the underlying cache and all the transport connections. */
public static void clearCache() {
cache.clear();
}
}
@@ -20,6 +20,7 @@
import com.google.common.cache.CacheBuilder;
import com.google.protobuf.Descriptors.Descriptor;
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -144,6 +145,22 @@ public StreamWriter getTableWriter(String tableName, Descriptor userSchema)
return writer;
}

/** Clear the cache and close all the writers in the cache. */
public void clear() {
synchronized (this) {
ConcurrentMap<String, Cache<Descriptor, StreamWriter>> map = writerCache.asMap();
for (String key : map.keySet()) {
Cache<Descriptor, StreamWriter> entry = writerCache.getIfPresent(key);
ConcurrentMap<Descriptor, StreamWriter> entryMap = entry.asMap();
for (Descriptor descriptor : entryMap.keySet()) {
StreamWriter writer = entry.getIfPresent(descriptor);
writer.close();
}
}
writerCache.cleanUp();
}
}

@VisibleForTesting
public long cachedTableCount() {
synchronized (writerCache) {
@@ -24,15 +24,15 @@
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.cloud.bigquery.storage.test.Test.*;
import com.google.common.collect.Sets;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.junit.*;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -42,6 +42,8 @@

@RunWith(JUnit4.class)
public class DirectWriterTest {
private static final Logger LOG = Logger.getLogger(DirectWriterTest.class.getName());

private static final String TEST_TABLE = "projects/p/datasets/d/tables/t";
private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s";
private static final String TEST_STREAM_2 = "projects/p/datasets/d/tables/t/streams/s2";
@@ -86,7 +88,7 @@ public void tearDown() throws Exception {
}

/** Response mocks for create a new writer */
void WriterCreationResponseMock(String testStreamName, List<Long> responseOffsets) {
void WriterCreationResponseMock(String testStreamName, Set<Long> responseOffsets) {
// Response from CreateWriteStream
Stream.WriteStream expectedResponse =
Stream.WriteStream.newBuilder().setName(testStreamName).build();
@@ -117,7 +119,7 @@ public void testWriteSuccess() throws Exception {
FooType m1 = FooType.newBuilder().setFoo("m1").build();
FooType m2 = FooType.newBuilder().setFoo("m2").build();

WriterCreationResponseMock(TEST_STREAM, Arrays.asList(Long.valueOf(0L)));
WriterCreationResponseMock(TEST_STREAM, Sets.newHashSet(Long.valueOf(0L)));
ApiFuture<Long> ret = DirectWriter.<FooType>append(TEST_TABLE, Arrays.asList(m1, m2));
verify(schemaCheck).check(TEST_TABLE, FooType.getDescriptor());
assertEquals(Long.valueOf(0L), ret.get());
@@ -159,7 +161,7 @@ public void testWriteSuccess() throws Exception {
assertEquals(expectRequest.toString(), actualRequests.get(3).toString());

// Write with a different schema.
WriterCreationResponseMock(TEST_STREAM_2, Arrays.asList(Long.valueOf(0L)));
WriterCreationResponseMock(TEST_STREAM_2, Sets.newHashSet(Long.valueOf(0L)));
AllSupportedTypes m3 = AllSupportedTypes.newBuilder().setStringValue("s").build();
ret = DirectWriter.<AllSupportedTypes>append(TEST_TABLE, Arrays.asList(m3));
verify(schemaCheck).check(TEST_TABLE, AllSupportedTypes.getDescriptor());
@@ -181,6 +183,8 @@ public void testWriteSuccess() throws Exception {
((Storage.CreateWriteStreamRequest) actualRequests.get(4)).getWriteStream().getType());
assertEquals(TEST_STREAM_2, ((Storage.GetWriteStreamRequest) actualRequests.get(5)).getName());
assertEquals(expectRequest.toString(), actualRequests.get(6).toString());

DirectWriter.clearCache();
}

@Test
@@ -195,15 +199,17 @@ public void testWriteBadTableName() throws Exception {
} catch (IllegalArgumentException expected) {
assertEquals("Invalid table name: abc", expected.getMessage());
}

DirectWriter.clearCache();
}

@Test
public void testConcurrentAccess() throws Exception {
WriterCache cache = WriterCache.getTestInstance(client, 2, schemaCheck);
DirectWriter.testSetStub(client, 2, schemaCheck);
final FooType m1 = FooType.newBuilder().setFoo("m1").build();
final FooType m2 = FooType.newBuilder().setFoo("m2").build();
final List<Long> expectedOffset =
Arrays.asList(
final Set<Long> expectedOffset =
Sets.newHashSet(
Long.valueOf(0L),
Long.valueOf(2L),
Long.valueOf(4L),
@@ -221,12 +227,21 @@ public void run() {
try {
ApiFuture<Long> result =
DirectWriter.<FooType>append(TEST_TABLE, Arrays.asList(m1, m2));
assertTrue(expectedOffset.remove(result.get()));
} catch (IOException | InterruptedException | ExecutionException e) {
fail(e.getMessage());
synchronized (expectedOffset) {
assertTrue(expectedOffset.remove(result.get()));
}
} catch (Exception e) {
fail(e.toString());
}
}
});
}
executor.shutdown();
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
LOG.info(e.toString());
}
DirectWriter.clearCache();
}
}
@@ -135,8 +135,9 @@ private ApiFuture<AppendRowsResponse> sendTestMessage(StreamWriter writer, Strin

@Test
public void testTableName() throws Exception {
StreamWriter writer = getTestStreamWriterBuilder().build();
assertEquals("projects/p/datasets/d/tables/t", writer.getTableNameString());
try (StreamWriter writer = getTestStreamWriterBuilder().build()) {
assertEquals("projects/p/datasets/d/tables/t", writer.getTableNameString());
}
}

@Test
@@ -175,7 +176,7 @@ public void testAppendByDuration() throws Exception {
.getSerializedRowsCount());
assertEquals(
true, testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema());
writer.shutdown();
writer.close();
}

@Test
@@ -228,7 +229,7 @@ public void testAppendByNumBatchedMessages() throws Exception {
.getSerializedRowsCount());
assertEquals(
false, testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema());
writer.shutdown();
writer.close();
}

@Test
@@ -264,7 +265,7 @@ public void testAppendByNumBytes() throws Exception {

assertEquals(3, testBigQueryWrite.getAppendRequests().size());

writer.shutdown();
writer.close();
}

@Test
@@ -429,7 +430,8 @@ public void testFlowControlBehaviorException() throws Exception {
try {
appendFuture2.get();
Assert.fail("This should fail");
} catch (ExecutionException e) {
} catch (Exception e) {
LOG.info("ControlFlow test exception: " + e.toString());
assertEquals("The maximum number of batch elements: 1 have been reached.", e.getMessage());
}
assertEquals(1L, appendFuture1.get().getOffset());
@@ -143,6 +143,7 @@ public void testCreateNewWriter() throws Exception {
assertEquals(TEST_TABLE, writer.getTableNameString());
assertEquals(TEST_STREAM, writer.getStreamNameString());
assertEquals(1, cache.cachedTableCount());
cache.clear();
}

@Test
@@ -173,6 +174,7 @@ public void testWriterExpired() throws Exception {
"Cannot write to a stream that is already expired: projects/p/datasets/d/tables/t/streams/s",
e.getMessage());
}
cache.clear();
}

@Test
@@ -216,6 +218,7 @@ public void testWriterWithNewSchema() throws Exception {
assertEquals(TEST_STREAM_3, writer4.getStreamNameString());
assertEquals(TEST_STREAM_4, writer5.getStreamNameString());
assertEquals(1, cache.cachedTableCount());
cache.clear();
}

@Test
@@ -259,6 +262,7 @@ public void testWriterWithDifferentTable() throws Exception {
assertEquals(TEST_STREAM_31, writer4.getStreamNameString());
assertEquals(TEST_STREAM, writer5.getStreamNameString());
assertEquals(2, cache.cachedTableCount());
cache.clear();
}

@Test
@@ -275,7 +279,7 @@ public void testConcurrentAccess() throws Exception {
public void run() {
try {
assertTrue(cache.getTableWriter(TEST_TABLE, FooType.getDescriptor()) != null);
} catch (IOException | InterruptedException e) {
} catch (Exception e) {
fail(e.getMessage());
}
}
@@ -391,5 +391,12 @@ public Long call() throws IOException, InterruptedException, ExecutionException
assertTrue(expectedOffset.remove(response.get()));
}
assertTrue(expectedOffset.isEmpty());
executor.shutdown();
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
LOG.info(e.toString());
}
DirectWriter.clearCache();
}
}

0 comments on commit 89c8623

Please sign in to comment.