Skip to content

Commit

Permalink
fix: Temporally disable test/code where breaking change is used to he…
Browse files Browse the repository at this point in the history
…lp pushing out the breaking change in unreleased Beta (#727)

* fix: a race condition in test

* .

* .

* .

* resolve merge conflict

Co-authored-by: Stephanie Wang <stephaniewang526@users.noreply.github.com>
  • Loading branch information
yirutang and stephaniewang526 committed Dec 14, 2020
1 parent ab6213c commit 38c95c2
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -857,18 +857,19 @@ public void onResponse(AppendRowsResponse response) {
inflightBatch.onFailure(exception);
}
}
if (inflightBatch.getExpectedOffset() > 0
&& response.getOffset() != inflightBatch.getExpectedOffset()) {
IllegalStateException exception =
new IllegalStateException(
String.format(
"The append result offset %s does not match " + "the expected offset %s.",
response.getOffset(), inflightBatch.getExpectedOffset()));
inflightBatch.onFailure(exception);
abortInflightRequests(exception);
} else {
inflightBatch.onSuccess(response);
}
// Temp for Breaking Change.
// if (inflightBatch.getExpectedOffset() > 0
// && response.getOffset() != inflightBatch.getExpectedOffset()) {
// IllegalStateException exception =
// new IllegalStateException(
// String.format(
// "The append result offset %s does not match " + "the expected offset %s.",
// response.getOffset(), inflightBatch.getExpectedOffset()));
// inflightBatch.onFailure(exception);
// abortInflightRequests(exception);
// } else {
inflightBatch.onSuccess(response);
// }
} finally {
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,15 @@ public void testSingleAppendSimpleJson() throws Exception {
try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {

testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
// Temp for Breaking Change.
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());

ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);

assertEquals(0L, appendFuture.get().getOffset());
// Temp for Breaking Change.
// assertEquals(0L, appendFuture.get().getOffset());
appendFuture.get();
assertEquals(
1,
testBigQueryWrite
Expand Down Expand Up @@ -288,12 +291,16 @@ public void testSingleAppendMultipleSimpleJson() throws Exception {

try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
// Temp for Breaking Change.
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());

ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);

assertEquals(0L, appendFuture.get().getOffset());
// Temp for Breaking Change.
// assertEquals(0L, appendFuture.get().getOffset());
appendFuture.get();
assertEquals(
4,
testBigQueryWrite
Expand Down Expand Up @@ -325,15 +332,21 @@ public void testMultipleAppendSimpleJson() throws Exception {

try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build());
// Temp for Breaking Change.
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1).build());
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build());
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
ApiFuture<AppendRowsResponse> appendFuture;
for (int i = 0; i < 4; i++) {
appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false);

assertEquals((long) i, appendFuture.get().getOffset());
// Temp for Breaking Change.
// assertEquals((long) i, appendFuture.get().getOffset());
appendFuture.get();
assertEquals(
1,
testBigQueryWrite
Expand Down Expand Up @@ -411,11 +424,16 @@ public void testSingleAppendComplexJson() throws Exception {

try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, COMPLEX_TABLE_SCHEMA).build()) {
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
// Temp for Breaking Change.
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());

ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);

assertEquals(0L, appendFuture.get().getOffset());
// Temp for Breaking Change.
// assertEquals(0L, appendFuture.get().getOffset());
appendFuture.get();
assertEquals(
1,
testBigQueryWrite
Expand All @@ -440,17 +458,19 @@ public void testAppendMultipleSchemaUpdate() throws Exception {
try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {
// Add fake resposne for FakeBigQueryWrite, first response has updated schema.
// Temp for Breaking Change.
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setOffset(0)
// .setOffset(0)
.setUpdatedSchema(UPDATED_TABLE_SCHEMA)
.build());
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setOffset(1)
// .setOffset(1)
.setUpdatedSchema(UPDATED_TABLE_SCHEMA_2)
.build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build());
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
// First append
JSONObject foo = new JSONObject();
foo.put("foo", "allen");
Expand All @@ -469,7 +489,9 @@ public void testAppendMultipleSchemaUpdate() throws Exception {
millis += 100;
}
assertTrue(writer.getDescriptor().getFields().size() == 2);
assertEquals(0L, appendFuture1.get().getOffset());
// Temp for Breaking Change.
// assertEquals(0L, appendFuture1.get().getOffset());
appendFuture1.get();
assertEquals(
1,
testBigQueryWrite
Expand Down Expand Up @@ -506,7 +528,9 @@ public void testAppendMultipleSchemaUpdate() throws Exception {
millis += 100;
}
assertTrue(writer.getDescriptor().getFields().size() == 3);
assertEquals(1L, appendFuture2.get().getOffset());
// Temp for Breaking Change.
// assertEquals(1L, appendFuture2.get().getOffset());
appendFuture2.get();
assertEquals(
1,
testBigQueryWrite
Expand Down Expand Up @@ -535,7 +559,9 @@ public void testAppendMultipleSchemaUpdate() throws Exception {
ApiFuture<AppendRowsResponse> appendFuture3 =
writer.append(updatedJsonArr2, -1, /* allowUnknownFields */ false);

assertEquals(2L, appendFuture3.get().getOffset());
// Temp for Breaking Change.
// assertEquals(2L, appendFuture3.get().getOffset());
appendFuture3.get();
assertEquals(
1,
testBigQueryWrite
Expand Down Expand Up @@ -616,7 +642,9 @@ public void testAppendOutOfRangeAndUpdateSchema() throws Exception {
.setError(com.google.rpc.Status.newBuilder().setCode(11).build())
.setUpdatedSchema(UPDATED_TABLE_SCHEMA)
.build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
// Temp for Breaking Change.
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());

JSONObject foo = new JSONObject();
foo.put("foo", "allen");
Expand Down Expand Up @@ -648,8 +676,9 @@ public void testAppendOutOfRangeAndUpdateSchema() throws Exception {

ApiFuture<AppendRowsResponse> appendFuture2 =
writer.append(updatedJsonArr, -1, /* allowUnknownFields */ false);

assertEquals(0L, appendFuture2.get().getOffset());
// Temp for Breaking Change.
// assertEquals(0L, appendFuture2.get().getOffset());
appendFuture2.get();
assertEquals(
1,
testBigQueryWrite
Expand Down Expand Up @@ -683,13 +712,16 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception {
.setElementCountThreshold(2L)
.build())
.build()) {
// Temp for Breaking Change.
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setOffset(0)
// .setOffset(0)
.setUpdatedSchema(UPDATED_TABLE_SCHEMA)
.build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build());
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build());
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
// First append
JSONObject foo = new JSONObject();
foo.put("foo", "allen");
Expand All @@ -703,8 +735,11 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception {
ApiFuture<AppendRowsResponse> appendFuture3 =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);

assertEquals(0L, appendFuture1.get().getOffset());
assertEquals(1L, appendFuture2.get().getOffset());
// Temp for Breaking Change.
// assertEquals(0L, appendFuture1.get().getOffset());
// assertEquals(1L, appendFuture2.get().getOffset());
appendFuture1.get();
appendFuture2.get();
assertEquals(
2,
testBigQueryWrite
Expand All @@ -730,7 +765,9 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception {
.getSerializedRows(1),
FooType.newBuilder().setFoo("allen").build().toByteString());

assertEquals(2L, appendFuture3.get().getOffset());
// Temp for Breaking Change.
// assertEquals(2L, appendFuture3.get().getOffset());
appendFuture3.get();
assertEquals(
1,
testBigQueryWrite
Expand Down Expand Up @@ -768,7 +805,9 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception {
ApiFuture<AppendRowsResponse> appendFuture4 =
writer.append(updatedJsonArr, -1, /* allowUnknownFields */ false);

assertEquals(3L, appendFuture4.get().getOffset());
// Temp for Breaking Change.
// assertEquals(3L, appendFuture4.get().getOffset());
appendFuture4.get();
assertEquals(
1,
testBigQueryWrite
Expand Down Expand Up @@ -813,7 +852,10 @@ public void testMultiThreadAppendNoSchemaUpdate() throws Exception {
int thread_nums = 5;
Thread[] thread_arr = new Thread[thread_nums];
for (int i = 0; i < thread_nums; i++) {
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset((long) i).build());
// Temp for Breaking Change.
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset((long)
// i).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
offsetSets.add((long) i);
Thread t =
new Thread(
Expand All @@ -823,8 +865,9 @@ public void run() {
ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
AppendRowsResponse response = appendFuture.get();
offsetSets.remove(response.getOffset());
// offsetSets.remove(response.getOffset());
} catch (Exception e) {

LOG.severe("Thread execution failed: " + e.getMessage());
}
}
Expand All @@ -836,7 +879,7 @@ public void run() {
for (int i = 0; i < thread_nums; i++) {
thread_arr[i].join();
}
assertTrue(offsetSets.size() == 0);
// assertTrue(offsetSets.size() == 0);
for (int i = 0; i < thread_nums; i++) {
assertEquals(
1,
Expand Down Expand Up @@ -878,14 +921,17 @@ public void testMultiThreadAppendWithSchemaUpdate() throws Exception {
Thread[] thread_arr = new Thread[numberThreads];
for (int i = 0; i < numberThreads; i++) {
if (i == 2) {
// Temp for Breaking Change.
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setOffset((long) i)
// .setOffset((long) i)
.setUpdatedSchema(UPDATED_TABLE_SCHEMA)
.build());
} else {
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder().setOffset((long) i).build());
// Temp for Breaking Change.
// testBigQueryWrite.addResponse(
// AppendRowsResponse.newBuilder().setOffset((long) i).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
}

offsetSets.add((long) i);
Expand All @@ -897,7 +943,7 @@ public void run() {
ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
AppendRowsResponse response = appendFuture.get();
offsetSets.remove(response.getOffset());
// offsetSets.remove(response.getOffset());
} catch (Exception e) {
LOG.severe("Thread execution failed: " + e.getMessage());
}
Expand All @@ -910,7 +956,7 @@ public void run() {
for (int i = 0; i < numberThreads; i++) {
thread_arr[i].join();
}
assertTrue(offsetSets.size() == 0);
// assertTrue(offsetSets.size() == 0);
for (int i = 0; i < numberThreads; i++) {
assertEquals(
1,
Expand Down Expand Up @@ -945,7 +991,10 @@ public void run() {
jsonArr2.put(foo);

for (int i = numberThreads; i < numberThreads + 5; i++) {
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset((long) i).build());
// Temp for Breaking Change.
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset((long)
// i).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
offsetSets.add((long) i);
Thread t =
new Thread(
Expand All @@ -968,7 +1017,7 @@ public void run() {
for (int i = 0; i < numberThreads; i++) {
thread_arr[i].join();
}
assertTrue(offsetSets.size() == 0);
// assertTrue(offsetSets.size() == 0);
for (int i = 0; i < numberThreads; i++) {
assertEquals(
1,
Expand Down
Loading

0 comments on commit 38c95c2

Please sign in to comment.