Skip to content

Commit

Permalink
Merge 6bbca87 into 5a51ace
Browse files Browse the repository at this point in the history
  • Loading branch information
markflyhigh committed Dec 14, 2016
2 parents 5a51ace + 6bbca87 commit 960cf4a
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 60 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -123,7 +123,7 @@
<jackson.version>2.7.2</jackson.version>
<findbugs.version>3.0.1</findbugs.version>
<joda.version>2.4</joda.version>
<junit.version>4.11</junit.version>
<junit.version>4.12</junit.version>
<mockito.version>1.9.5</mockito.version>
<netty.version>4.1.3.Final</netty.version>
<os-maven-plugin.version>1.4.0.Final</os-maven-plugin.version>
Expand Down
Expand Up @@ -39,6 +39,7 @@
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -117,20 +118,23 @@ protected boolean matchesSafely(PipelineResult pipelineResult) {

response = queryWithRetries(
bigqueryClient, queryContent, Sleeper.DEFAULT, BACKOFF_FACTORY.backoff());
} catch (Exception e) {
} catch (IOException | InterruptedException e) {
if (e instanceof InterruptedIOException) {
Thread.currentThread().interrupt();
}
throw new RuntimeException("Failed to fetch BigQuery data.", e);
}

// validate BigQuery response
if (response == null || response.getRows() == null || response.getRows().isEmpty()) {
if (!response.getJobComplete()) {
// query job not complete, verification failed
return false;
}

// compute checksum
actualChecksum = generateHash(response.getRows());
LOG.debug("Generated a SHA1 checksum based on queried data: {}", actualChecksum);
} else {
// compute checksum
actualChecksum = generateHash(response.getRows());
LOG.debug("Generated a SHA1 checksum based on queried data: {}", actualChecksum);

return expectedChecksum.equals(actualChecksum);
return expectedChecksum.equals(actualChecksum);
}
}

@VisibleForTesting
Expand All @@ -144,23 +148,35 @@ Bigquery newBigqueryClient(String applicationName) {
.build();
}

@Nonnull
@VisibleForTesting
QueryResponse queryWithRetries(Bigquery bigqueryClient, QueryRequest queryContent,
Sleeper sleeper, BackOff backOff)
throws IOException, InterruptedException {
IOException lastException = null;
do {
if (lastException != null) {
LOG.warn("Retrying query ({}) after exception", queryContent.getQuery(), lastException);
}
try {
return bigqueryClient.jobs().query(projectId, queryContent).execute();
QueryResponse response = bigqueryClient.jobs().query(projectId, queryContent).execute();
if (response != null) {
return response;
} else {
lastException =
new IOException("Expected valid response from query job, but received null.");
}
} catch (IOException e) {
// ignore and retry
LOG.warn("Ignore the error and retry the query.");
lastException = e;
}
} while(BackOffUtils.next(sleeper, backOff));
throw new IOException(

throw new RuntimeException(
String.format(
"Unable to get BigQuery response after retrying %d times", MAX_QUERY_RETRIES),
"Unable to get BigQuery response after retrying %d times using query (%s)",
MAX_QUERY_RETRIES,
queryContent.getQuery()),
lastException);
}

Expand Down Expand Up @@ -210,9 +226,9 @@ public void describeTo(Description description) {
@Override
public void describeMismatchSafely(PipelineResult pResult, Description description) {
String info;
if (response == null || response.getRows() == null || response.getRows().isEmpty()) {
// invalid query response
info = String.format("Invalid BigQuery response: %s", Objects.toString(response));
if (!response.getJobComplete()) {
// query job not complete
info = String.format("The query job hasn't completed. Got response: %s", response);
} else {
// checksum mismatch
info = String.format("was (%s).%n"
Expand Down
Expand Up @@ -18,14 +18,12 @@
package org.apache.beam.sdk.testing;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -90,52 +88,33 @@ public void testBigqueryMatcherFailsForChecksumMismatch() throws IOException {
doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
when(mockQuery.execute()).thenReturn(createResponseContainingTestData());

thrown.expect(AssertionError.class);
thrown.expectMessage("Total number of rows are: 1");
thrown.expectMessage("abc");
try {
assertThat(mockResult, matcher);
} catch (AssertionError expected) {
assertThat(expected.getMessage(), containsString("Total number of rows are: 1"));
assertThat(expected.getMessage(), containsString("abc"));
} finally {
verify(matcher).newBigqueryClient(eq(appName));
verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query)));
}
}

@Test
public void testBigqueryMatcherFailsWhenResponseIsNull() throws IOException {
testMatcherFailsSinceInvalidQueryResponse(null);
}

@Test
public void testBigqueryMatcherFailsWhenNullRowsInResponse() throws IOException {
testMatcherFailsSinceInvalidQueryResponse(new QueryResponse());
}

@Test
public void testBigqueryMatcherFailsWhenEmptyRowsInResponse() throws IOException {
QueryResponse response = new QueryResponse();
response.setRows(Lists.<TableRow>newArrayList());

testMatcherFailsSinceInvalidQueryResponse(response);
}

private void testMatcherFailsSinceInvalidQueryResponse(QueryResponse response)
throws IOException {
public void testBigqueryMatcherFailsWhenQueryJobNotComplete() throws Exception {
BigqueryMatcher matcher = spy(
new BigqueryMatcher(appName, projectId, query, "some-checksum"));
doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
when(mockQuery.execute()).thenReturn(response);
when(mockQuery.execute()).thenReturn(new QueryResponse().setJobComplete(false));

thrown.expect(AssertionError.class);
thrown.expectMessage("The query job hasn't completed.");
thrown.expectMessage("jobComplete=false");
try {
assertThat(mockResult, matcher);
} catch (AssertionError expected) {
assertThat(expected.getMessage(), containsString("Invalid BigQuery response:"));
} finally {
verify(matcher).newBigqueryClient(eq(appName));
verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query)));
return;
}
// Note that fail throws an AssertionError which is why it is placed out here
// instead of inside the try-catch block.
fail("AssertionError is expected.");
}

@Test
Expand All @@ -144,18 +123,38 @@ public void testQueryWithRetriesWhenServiceFails() throws Exception {
new BigqueryMatcher(appName, projectId, query, "some-checksum"));
when(mockQuery.execute()).thenThrow(new IOException());

thrown.expect(IOException.class);
thrown.expect(RuntimeException.class);
thrown.expectMessage("Unable to get BigQuery response after retrying");
try {
matcher.queryWithRetries(
mockBigqueryClient,
new QueryRequest(),
fastClock,
BigqueryMatcher.BACKOFF_FACTORY.backoff());
} finally {
verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES))
.query(eq(projectId), eq(new QueryRequest()));
}
}

matcher.queryWithRetries(
mockBigqueryClient,
new QueryRequest(),
fastClock,
BigqueryMatcher.BACKOFF_FACTORY.backoff());
@Test
public void testQueryWithRetriesWhenQueryResponseNull() throws Exception {
BigqueryMatcher matcher = spy(
new BigqueryMatcher(appName, projectId, query, "some-checksum"));
when(mockQuery.execute()).thenReturn(null);

verify(matcher).newBigqueryClient(eq(appName));
verify(mockJobs, times(BigqueryMatcher.MAX_QUERY_RETRIES))
.query(eq(projectId), eq(new QueryRequest().setQuery(query)));
thrown.expect(RuntimeException.class);
thrown.expectMessage("Unable to get BigQuery response after retrying");
try {
matcher.queryWithRetries(
mockBigqueryClient,
new QueryRequest(),
fastClock,
BigqueryMatcher.BACKOFF_FACTORY.backoff());
} finally {
verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES))
.query(eq(projectId), eq(new QueryRequest()));
}
}

private QueryResponse createResponseContainingTestData() {
Expand All @@ -169,6 +168,7 @@ private QueryResponse createResponseContainingTestData() {
row.setF(Lists.newArrayList(field1, field2, field3));

QueryResponse response = new QueryResponse();
response.setJobComplete(true);
response.setRows(Lists.newArrayList(row));
response.setTotalRows(BigInteger.ONE);
return response;
Expand Down
Expand Up @@ -282,7 +282,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<version>4.12</version>
</dependency>

<dependency>
Expand Down
Expand Up @@ -328,7 +328,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<version>4.12</version>
</dependency>

<!-- The DirectRunner is needed for unit tests. -->
Expand Down

0 comments on commit 960cf4a

Please sign in to comment.