Skip to content

Commit

Permalink
fix: improve ConnectionWorker fine logging (#1972)
Browse files Browse the repository at this point in the history
* fix: improving ConnectionWorker fine logging

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
yirutang and gcf-owl-bot[bot] committed Feb 3, 2023
1 parent b05fff5 commit 812bcf1
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 23 deletions.
6 changes: 3 additions & 3 deletions README.md
Expand Up @@ -49,20 +49,20 @@ If you are using Maven without BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.5.0')
implementation platform('com.google.cloud:libraries-bom:26.6.0')
implementation 'com.google.cloud:google-cloud-bigquerystorage'
```
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.4'
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.29.0'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.4"
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.29.0"
```

## Authentication
Expand Down
Expand Up @@ -242,7 +242,7 @@ public void run() {
}

private void resetConnection() {
log.info("Reconnecting for stream:" + streamName);
log.info("Reconnecting for stream:" + streamName + " id: " + writerId);
this.streamConnection =
new StreamConnection(
this.client,
Expand All @@ -258,6 +258,7 @@ public void run(Throwable finalStatus) {
doneCallback(finalStatus);
}
});
log.info("Reconnect done for stream:" + streamName + " id: " + writerId);
}

/** Schedules the writing of rows at given offset. */
Expand Down Expand Up @@ -392,13 +393,18 @@ public void close() {
} finally {
this.lock.unlock();
}
log.fine("Waiting for append thread to finish. Stream: " + streamName);
log.fine("Waiting for append thread to finish. Stream: " + streamName + " id: " + writerId);
try {
appendThread.join();
} catch (InterruptedException e) {
// Unexpected. Just swallow the exception with logging.
log.warning(
"Append handler join is interrupted. Stream: " + streamName + " Error: " + e.toString());
"Append handler join is interrupted. Stream: "
+ streamName
+ " id: "
+ writerId
+ " Error: "
+ e.toString());
}
this.client.close();
try {
Expand All @@ -408,14 +414,20 @@ public void close() {
}

try {
log.fine("Begin shutting down user callback thread pool for stream " + streamName);
log.fine(
"Begin shutting down user callback thread pool for stream "
+ streamName
+ " id: "
+ writerId);
threadPool.shutdown();
threadPool.awaitTermination(3, TimeUnit.MINUTES);
} catch (InterruptedException e) {
// Unexpected. Just swallow the exception with logging.
log.warning(
"Close on thread pool for "
+ streamName
+ " id: "
+ writerId
+ " is interrupted with exception: "
+ e.toString());
throw new IllegalStateException(
Expand Down Expand Up @@ -464,6 +476,8 @@ private void appendLoop() {
log.warning(
"Interrupted while waiting for message. Stream: "
+ streamName
+ " id: "
+ writerId
+ " Error: "
+ e.toString());
} finally {
Expand Down Expand Up @@ -539,17 +553,11 @@ private void appendLoop() {
// TODO: Handle NOT_ENOUGH_QUOTA.
// In the close case, the request is in the inflight queue, and will either be returned
// to the user with an error, or will be resent.
log.fine(
"Sending "
+ originalRequestBuilder.getProtoRows().getRows().getSerializedRowsCount()
+ " rows to stream '"
+ originalRequestBuilder.getWriteStream()
+ "'");
this.streamConnection.send(originalRequestBuilder.build());
}
}

log.fine("Cleanup starts. Stream: " + streamName);
log.fine("Cleanup starts. Stream: " + streamName + " id: " + writerId);
// At this point, the waiting queue is drained, so no more requests.
// We can close the stream connection and handle the remaining inflight requests.
if (streamConnection != null) {
Expand All @@ -559,9 +567,12 @@ private void appendLoop() {

// At this point, there cannot be more callback. It is safe to clean up all inflight requests.
log.fine(
"Stream connection is fully closed. Cleaning up inflight requests. Stream: " + streamName);
"Stream connection is fully closed. Cleaning up inflight requests. Stream: "
+ streamName
+ " id: "
+ writerId);
cleanupInflightRequests();
log.fine("Append thread is done. Stream: " + streamName);
log.fine("Append thread is done. Stream: " + streamName + " id: " + writerId);
}

/*
Expand All @@ -581,7 +592,11 @@ private boolean waitingQueueDrained() {
}

private void waitForDoneCallback(long duration, TimeUnit timeUnit) {
log.fine("Waiting for done callback from stream connection. Stream: " + streamName);
log.fine(
"Waiting for done callback from stream connection. Stream: "
+ streamName
+ " id: "
+ writerId);
long deadline = System.nanoTime() + timeUnit.toNanos(duration);
while (System.nanoTime() <= deadline) {
this.lock.lock();
Expand Down Expand Up @@ -630,23 +645,29 @@ private void cleanupInflightRequests() {
} finally {
this.lock.unlock();
}
log.fine("Cleaning " + localQueue.size() + " inflight requests with error: " + finalStatus);
log.fine(
"Cleaning "
+ localQueue.size()
+ " inflight requests with error: "
+ finalStatus
+ " for Stream "
+ streamName
+ " id: "
+ writerId);
while (!localQueue.isEmpty()) {
localQueue.pollFirst().appendResult.setException(finalStatus);
}
}

private void requestCallback(AppendRowsResponse response) {
if (!response.hasUpdatedSchema()) {
log.fine(String.format("Got response on stream %s", response.toString()));
} else {
if (response.hasUpdatedSchema()) {
AppendRowsResponse responseWithUpdatedSchemaRemoved =
response.toBuilder().clearUpdatedSchema().build();

log.fine(
String.format(
"Got response with schema updated (omitting updated schema in response here): %s",
responseWithUpdatedSchemaRemoved.toString()));
"Got response with schema updated (omitting updated schema in response here): %s writer id %s",
responseWithUpdatedSchemaRemoved.toString(), writerId));
}

AppendRequestAndResponse requestWrapper;
Expand Down Expand Up @@ -737,6 +758,8 @@ private void doneCallback(Throwable finalStatus) {
log.fine(
"Received done callback. Stream: "
+ streamName
+ " worker id: "
+ writerId
+ " Final status: "
+ finalStatus.toString());
this.lock.lock();
Expand Down

0 comments on commit 812bcf1

Please sign in to comment.