Skip to content

Commit ac48f12

Browse files
feat: Close clients gracefully (#56)
* update * update * update * update * update * update
1 parent b91ecb2 commit ac48f12

4 files changed

Lines changed: 16 additions & 18 deletions

File tree

src/main/java/com/google/cloud/pubsublite/spark/PslContinuousReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ public void commit(Offset end) {
9898

9999
@Override
100100
public void stop() {
101-
cursorClient.shutdown();
102101
committer.close();
102+
cursorClient.close();
103103
}
104104

105105
@Override

src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@
1616

1717
package com.google.cloud.pubsublite.spark;
1818

19+
import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
20+
1921
import com.github.benmanes.caffeine.cache.Ticker;
2022
import com.google.auto.service.AutoService;
2123
import com.google.cloud.pubsublite.AdminClient;
2224
import com.google.cloud.pubsublite.PartitionLookupUtils;
2325
import com.google.cloud.pubsublite.SubscriptionPath;
2426
import com.google.cloud.pubsublite.TopicPath;
25-
import com.google.cloud.pubsublite.internal.CursorClient;
2627
import java.util.Objects;
2728
import java.util.Optional;
2829
import org.apache.spark.sql.sources.DataSourceRegister;
@@ -53,12 +54,13 @@ public ContinuousReader createContinuousReader(
5354

5455
PslDataSourceOptions pslDataSourceOptions =
5556
PslDataSourceOptions.fromSparkDataSourceOptions(options);
56-
CursorClient cursorClient = pslDataSourceOptions.newCursorClient();
57-
AdminClient adminClient = pslDataSourceOptions.newAdminClient();
5857
SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
59-
long topicPartitionCount = PartitionLookupUtils.numPartitions(subscriptionPath, adminClient);
58+
long topicPartitionCount;
59+
try (AdminClient adminClient = pslDataSourceOptions.newAdminClient()) {
60+
topicPartitionCount = PartitionLookupUtils.numPartitions(subscriptionPath, adminClient);
61+
}
6062
return new PslContinuousReader(
61-
cursorClient,
63+
pslDataSourceOptions.newCursorClient(),
6264
pslDataSourceOptions.newMultiPartitionCommitter(topicPartitionCount),
6365
pslDataSourceOptions.getSubscriberFactory(),
6466
subscriptionPath,
@@ -76,19 +78,17 @@ public MicroBatchReader createMicroBatchReader(
7678

7779
PslDataSourceOptions pslDataSourceOptions =
7880
PslDataSourceOptions.fromSparkDataSourceOptions(options);
79-
CursorClient cursorClient = pslDataSourceOptions.newCursorClient();
80-
AdminClient adminClient = pslDataSourceOptions.newAdminClient();
8181
SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
8282
TopicPath topicPath;
83-
try {
83+
long topicPartitionCount;
84+
try (AdminClient adminClient = pslDataSourceOptions.newAdminClient()) {
8485
topicPath = TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic());
86+
topicPartitionCount = PartitionLookupUtils.numPartitions(topicPath, adminClient);
8587
} catch (Throwable t) {
86-
throw new IllegalStateException(
87-
"Unable to get topic for subscription " + subscriptionPath, t);
88+
throw toCanonical(t).underlying;
8889
}
89-
long topicPartitionCount = PartitionLookupUtils.numPartitions(topicPath, adminClient);
9090
return new PslMicroBatchReader(
91-
cursorClient,
91+
pslDataSourceOptions.newCursorClient(),
9292
pslDataSourceOptions.newMultiPartitionCommitter(topicPartitionCount),
9393
pslDataSourceOptions.getSubscriberFactory(),
9494
new LimitingHeadOffsetReader(

src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReader.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,6 @@ public InternalRow get() {
9797

9898
@Override
9999
public void close() {
100-
try {
101-
subscriber.close();
102-
} catch (Exception e) {
103-
log.atWarning().log("Subscriber failed to close.");
104-
}
100+
subscriber.close();
105101
}
106102
}

src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchReader.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ public void commit(Offset end) {
115115
@Override
116116
public void stop() {
117117
committer.close();
118+
cursorClient.close();
119+
headOffsetReader.close();
118120
}
119121

120122
@Override

0 commit comments

Comments
 (0)