diff --git a/README.md b/README.md index 712bb3034e..908faab2c6 100644 --- a/README.md +++ b/README.md @@ -49,20 +49,20 @@ If you are using Maven without BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.5.0') +implementation platform('com.google.cloud:libraries-bom:26.6.0') implementation 'com.google.cloud:google-cloud-bigquerystorage' ``` If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.4' +implementation 'com.google.cloud:google-cloud-bigquerystorage:2.29.0' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.4" +libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.29.0" ``` ## Authentication diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 8ca9304fe1..3e6b52904d 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -242,7 +242,7 @@ public void run() { } private void resetConnection() { - log.info("Reconnecting for stream:" + streamName); + log.info("Reconnecting for stream:" + streamName + " id: " + writerId); this.streamConnection = new StreamConnection( this.client, @@ -258,6 +258,7 @@ public void run(Throwable finalStatus) { doneCallback(finalStatus); } }); + log.info("Reconnect done for stream:" + streamName + " id: " + writerId); } /** Schedules the writing of rows at given offset. */ @@ -392,13 +393,18 @@ public void close() { } finally { this.lock.unlock(); } - log.fine("Waiting for append thread to finish. Stream: " + streamName); + log.fine("Waiting for append thread to finish. Stream: " + streamName + " id: " + writerId); try { appendThread.join(); } catch (InterruptedException e) { // Unexpected. Just swallow the exception with logging. log.warning( - "Append handler join is interrupted. Stream: " + streamName + " Error: " + e.toString()); + "Append handler join is interrupted. Stream: " + + streamName + + " id: " + + writerId + + " Error: " + + e.toString()); } this.client.close(); try { @@ -408,7 +414,11 @@ public void close() { } try { - log.fine("Begin shutting down user callback thread pool for stream " + streamName); + log.fine( + "Begin shutting down user callback thread pool for stream " + + streamName + + " id: " + + writerId); threadPool.shutdown(); threadPool.awaitTermination(3, TimeUnit.MINUTES); } catch (InterruptedException e) { @@ -416,6 +426,8 @@ public void close() { log.warning( "Close on thread pool for " + streamName + + " id: " + + writerId + " is interrupted with exception: " + e.toString()); throw new IllegalStateException( @@ -464,6 +476,8 @@ private void appendLoop() { log.warning( "Interrupted while waiting for message. Stream: " + streamName + + " id: " + + writerId + " Error: " + e.toString()); } finally { @@ -539,17 +553,11 @@ private void appendLoop() { // TODO: Handle NOT_ENOUGH_QUOTA. // In the close case, the request is in the inflight queue, and will either be returned // to the user with an error, or will be resent. - log.fine( - "Sending " - + originalRequestBuilder.getProtoRows().getRows().getSerializedRowsCount() - + " rows to stream '" - + originalRequestBuilder.getWriteStream() - + "'"); this.streamConnection.send(originalRequestBuilder.build()); } } - log.fine("Cleanup starts. Stream: " + streamName); + log.fine("Cleanup starts. Stream: " + streamName + " id: " + writerId); // At this point, the waiting queue is drained, so no more requests. // We can close the stream connection and handle the remaining inflight requests. if (streamConnection != null) { @@ -559,9 +567,12 @@ private void appendLoop() { // At this point, there cannot be more callback. It is safe to clean up all inflight requests. log.fine( - "Stream connection is fully closed. Cleaning up inflight requests. Stream: " + streamName); + "Stream connection is fully closed. Cleaning up inflight requests. Stream: " + + streamName + + " id: " + + writerId); cleanupInflightRequests(); - log.fine("Append thread is done. Stream: " + streamName); + log.fine("Append thread is done. Stream: " + streamName + " id: " + writerId); } /* @@ -581,7 +592,11 @@ private boolean waitingQueueDrained() { } private void waitForDoneCallback(long duration, TimeUnit timeUnit) { - log.fine("Waiting for done callback from stream connection. Stream: " + streamName); + log.fine( + "Waiting for done callback from stream connection. Stream: " + + streamName + + " id: " + + writerId); long deadline = System.nanoTime() + timeUnit.toNanos(duration); while (System.nanoTime() <= deadline) { this.lock.lock(); @@ -630,23 +645,29 @@ private void cleanupInflightRequests() { } finally { this.lock.unlock(); } - log.fine("Cleaning " + localQueue.size() + " inflight requests with error: " + finalStatus); + log.fine( + "Cleaning " + + localQueue.size() + + " inflight requests with error: " + + finalStatus + + " for Stream " + + streamName + + " id: " + + writerId); while (!localQueue.isEmpty()) { localQueue.pollFirst().appendResult.setException(finalStatus); } } private void requestCallback(AppendRowsResponse response) { - if (!response.hasUpdatedSchema()) { - log.fine(String.format("Got response on stream %s", response.toString())); - } else { + if (response.hasUpdatedSchema()) { AppendRowsResponse responseWithUpdatedSchemaRemoved = response.toBuilder().clearUpdatedSchema().build(); log.fine( String.format( - "Got response with schema updated (omitting updated schema in response here): %s", - responseWithUpdatedSchemaRemoved.toString())); + "Got response with schema updated (omitting updated schema in response here): %s writer id %s", + responseWithUpdatedSchemaRemoved.toString(), writerId)); } AppendRequestAndResponse requestWrapper; @@ -737,6 +758,8 @@ private void doneCallback(Throwable finalStatus) { log.fine( "Received done callback. Stream: " + streamName + + " worker id: " + + writerId + " Final status: " + finalStatus.toString()); this.lock.lock();