@@ -18,7 +18,6 @@
import static org .junit .Assert .assertEquals ;
import static org .junit .Assert .assertTrue ;
import com .google .api .core .*;
import com .google .api .core .ApiFuture ;
import com .google .api .gax .core .ExecutorProvider ;
import com .google .api .gax .core .InstantiatingExecutorProvider ;
@@ -32,14 +31,18 @@
import com .google .cloud .bigquery .storage .test .Test .UpdatedFooType2 ;
import com .google .cloud .bigquery .storage .v1alpha2 .Storage .AppendRowsResponse ;
import com .google .protobuf .ByteString ;
import com .google .protobuf .Descriptors .DescriptorValidationException ;
import com .google .protobuf .Timestamp ;
import java .io .IOException ;
import java .util .Arrays ;
import java .util .HashSet ;
import java .util .UUID ;
import java .util .concurrent .ExecutionException ;
import java .util .logging .Logger ;
import org .json .JSONArray ;
import org .json .JSONObject ;
import org .junit .After ;
import org .junit .Assert ;
import org .junit .Before ;
import org .junit .Test ;
import org .junit .runner .RunWith ;
@@ -168,7 +171,6 @@ public void setUp() throws Exception {
@ After
public void tearDown () throws Exception {
LOG .info ("tearDown called" );
serviceHelper .stop ();
}
@@ -181,19 +183,28 @@ private JsonStreamWriter.Builder getTestJsonStreamWriterBuilder(
}
@ Test
public void testTwoParamNewBuilder () throws Exception {
public void testTwoParamNewBuilder_nullSchema () {
try {
getTestJsonStreamWriterBuilder (null , TABLE_SCHEMA );
Assert .fail ("expected NullPointerException" );
} catch (NullPointerException e ) {
assertEquals (e .getMessage (), "StreamName is null." );
}
}
@ Test
public void testTwoParamNewBuilder_nullStream () {
try {
getTestJsonStreamWriterBuilder (TEST_STREAM , null );
Assert .fail ("expected NullPointerException" );
} catch (NullPointerException e ) {
assertEquals (e .getMessage (), "TableSchema is null." );
}
}
@ Test
public void testTwoParamNewBuilder ()
throws DescriptorValidationException , IOException , InterruptedException {
JsonStreamWriter writer = getTestJsonStreamWriterBuilder (TEST_STREAM , TABLE_SCHEMA ).build ();
assertEquals (TEST_STREAM , writer .getStreamName ());
}
@@ -522,7 +533,9 @@ public void testAppendMultipleSchemaUpdate() throws Exception {
}
@ Test
public void testAppendAlreadyExistsException () throws Exception {
// This might be a bug but it is the current behavior. Investigate.
public void testAppendAlreadyExists_doesNotThrowxception ()
throws DescriptorValidationException , IOException , InterruptedException , ExecutionException {
try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder (TEST_STREAM , TABLE_SCHEMA ).build ()) {
testBigQueryWrite .addResponse (
@@ -535,11 +548,7 @@ public void testAppendAlreadyExistsException() throws Exception {
jsonArr .put (foo );
ApiFuture <AppendRowsResponse > appendFuture =
writer .append (jsonArr , -1 , /* allowUnknownFields */ false );
try {
appendFuture .get ();
} catch (Throwable t ) {
assertEquals (t .getCause ().getMessage (), "ALREADY_EXISTS: " );
}
appendFuture .get ();
}
}
@@ -559,8 +568,9 @@ public void testAppendOutOfRangeException() throws Exception {
writer .append (jsonArr , -1 , /* allowUnknownFields */ false );
try {
appendFuture .get ();
} catch (Throwable t ) {
assertEquals (t .getCause ().getMessage (), "OUT_OF_RANGE: " );
Assert .fail ("expected ExecutionException" );
} catch (ExecutionException ex ) {
assertEquals (ex .getCause ().getMessage (), "OUT_OF_RANGE: " );
}
}
}
@@ -584,8 +594,9 @@ public void testAppendOutOfRangeAndUpdateSchema() throws Exception {
writer .append (jsonArr , -1 , /* allowUnknownFields */ false );
try {
appendFuture .get ();
} catch (Throwable t ) {
assertEquals (t .getCause ().getMessage (), "OUT_OF_RANGE: " );
Assert .fail ("expected ExecutionException" );
} catch (ExecutionException ex ) {
assertEquals (ex .getCause ().getMessage (), "OUT_OF_RANGE: " );
int millis = 0 ;
while (millis <= 10000 ) {
if (writer .getDescriptor ().getFields ().size () == 2 ) {
@@ -781,15 +792,13 @@ public void run() {
ApiFuture <AppendRowsResponse > appendFuture =
writer .append (jsonArr , -1 , /* allowUnknownFields */ false );
AppendRowsResponse response = appendFuture .get ();
LOG .info ("Processing complete, offset is " + response .getOffset ());
offset_sets .remove (response .getOffset ());
} catch (Exception e ) {
LOG .severe ("Thread execution failed: " + e .getMessage ());
}
}
});
thread_arr [i ] = t ;
LOG .info ("Starting thread " + i + "." );
t .start ();
}
@@ -833,10 +842,10 @@ public void testMultiThreadAppendWithSchemaUpdate() throws Exception {
final JSONArray jsonArr = new JSONArray ();
jsonArr .put (foo );
final HashSet <Long > offset_sets = new HashSet <Long >();
int thread_nums = 5 ;
Thread [] thread_arr = new Thread [thread_nums ];
for (int i = 0 ; i < thread_nums ; i ++) {
final HashSet <Long > offsetSets = new HashSet <Long >();
int numberThreads = 5 ;
Thread [] thread_arr = new Thread [numberThreads ];
for (int i = 0 ; i < numberThreads ; i ++) {
if (i == 2 ) {
testBigQueryWrite .addResponse (
Storage .AppendRowsResponse .newBuilder ()
@@ -848,7 +857,7 @@ public void testMultiThreadAppendWithSchemaUpdate() throws Exception {
Storage .AppendRowsResponse .newBuilder ().setOffset ((long ) i ).build ());
}
offset_sets .add ((long ) i );
offsetSets .add ((long ) i );
Thread t =
new Thread (
new Runnable () {
@@ -857,23 +866,21 @@ public void run() {
ApiFuture <AppendRowsResponse > appendFuture =
writer .append (jsonArr , -1 , /* allowUnknownFields */ false );
AppendRowsResponse response = appendFuture .get ();
LOG .info ("Processing complete, offset is " + response .getOffset ());
offset_sets .remove (response .getOffset ());
offsetSets .remove (response .getOffset ());
} catch (Exception e ) {
LOG .severe ("Thread execution failed: " + e .getMessage ());
}
}
});
thread_arr [i ] = t ;
LOG .info ("Starting thread " + i + "." );
t .start ();
}
for (int i = 0 ; i < thread_nums ; i ++) {
for (int i = 0 ; i < numberThreads ; i ++) {
thread_arr [i ].join ();
}
assertTrue (offset_sets .size () == 0 );
for (int i = 0 ; i < thread_nums ; i ++) {
assertTrue (offsetSets .size () == 0 );
for (int i = 0 ; i < numberThreads ; i ++) {
assertEquals (
1 ,
testBigQueryWrite
@@ -900,16 +907,16 @@ public void run() {
Thread .sleep (100 );
millis += 100 ;
}
assertTrue ( writer .getDescriptor ().getFields ().size () == 2 );
assertEquals ( 2 , writer .getDescriptor ().getFields ().size ());
foo .put ("bar" , "allen2" );
final JSONArray jsonArr2 = new JSONArray ();
jsonArr2 .put (foo );
for (int i = thread_nums ; i < thread_nums + 5 ; i ++) {
for (int i = numberThreads ; i < numberThreads + 5 ; i ++) {
testBigQueryWrite .addResponse (
Storage .AppendRowsResponse .newBuilder ().setOffset ((long ) i ).build ());
offset_sets .add ((long ) i );
offsetSets .add ((long ) i );
Thread t =
new Thread (
new Runnable () {
@@ -918,23 +925,21 @@ public void run() {
ApiFuture <AppendRowsResponse > appendFuture =
writer .append (jsonArr2 , -1 , /* allowUnknownFields */ false );
AppendRowsResponse response = appendFuture .get ();
LOG .info ("Processing complete, offset is " + response .getOffset ());
offset_sets .remove (response .getOffset ());
offsetSets .remove (response .getOffset ());
} catch (Exception e ) {
LOG .severe ("Thread execution failed: " + e .getMessage ());
}
}
});
thread_arr [i - 5 ] = t ;
LOG .info ("Starting thread " + i + " with updated json data." );
t .start ();
}
for (int i = 0 ; i < thread_nums ; i ++) {
for (int i = 0 ; i < numberThreads ; i ++) {
thread_arr [i ].join ();
}
assertTrue (offset_sets .size () == 0 );
for (int i = 0 ; i < thread_nums ; i ++) {
assertTrue (offsetSets .size () == 0 );
for (int i = 0 ; i < numberThreads ; i ++) {
assertEquals (
1 ,
testBigQueryWrite