|
19 | 19 | import com.google.api.core.ApiFuture; |
20 | 20 | import com.google.api.core.ApiFutureCallback; |
21 | 21 | import com.google.api.core.ApiFutures; |
| 22 | +import com.google.cloud.pubsublite.Offset; |
22 | 23 | import com.google.cloud.pubsublite.Partition; |
23 | 24 | import com.google.cloud.pubsublite.internal.wire.Committer; |
24 | 25 | import com.google.cloud.pubsublite.spark.PslSourceOffset; |
@@ -115,30 +116,29 @@ private synchronized void cleanUpCommitterMap() { |
115 | 116 | @Override |
116 | 117 | public synchronized void commit(PslSourceOffset offset) { |
117 | 118 | updateCommitterMap(offset); |
118 | | - offset |
119 | | - .partitionOffsetMap() |
120 | | - .forEach( |
121 | | - (p, o) -> { |
122 | | - // Note we don't need to worry about commit offset disorder here since Committer |
123 | | - // guarantees the ordering. Once commitOffset() returns, it's either already |
124 | | - // sent to stream, or waiting for next connection to open to be sent in order. |
125 | | - ApiFuture<Void> future = committerMap.get(p).commitOffset(o); |
126 | | - ApiFutures.addCallback( |
127 | | - future, |
128 | | - new ApiFutureCallback<Void>() { |
129 | | - @Override |
130 | | - public void onFailure(Throwable t) { |
131 | | - if (!future.isCancelled()) { |
132 | | - log.atWarning().log("Failed to commit %s,%s.", p.value(), o.value(), t); |
133 | | - } |
134 | | - } |
| 119 | + for (Map.Entry<Partition, Offset> entry : offset.partitionOffsetMap().entrySet()) { |
| 120 | + // Note we don't need to worry about commit offset disorder here since Committer |
| 121 | + // guarantees the ordering. Once commitOffset() returns, it's either already |
| 122 | + // sent to stream, or waiting for next connection to open to be sent in order. |
| 123 | + ApiFuture<Void> future = committerMap.get(entry.getKey()).commitOffset(entry.getValue()); |
| 124 | + ApiFutures.addCallback( |
| 125 | + future, |
| 126 | + new ApiFutureCallback<Void>() { |
| 127 | + @Override |
| 128 | + public void onFailure(Throwable t) { |
| 129 | + if (!future.isCancelled()) { |
| 130 | + log.atWarning().withCause(t).log( |
| 131 | + "Failed to commit %s,%s.", entry.getKey().value(), entry.getValue().value()); |
| 132 | + } |
| 133 | + } |
135 | 134 |
|
136 | | - @Override |
137 | | - public void onSuccess(Void result) { |
138 | | - log.atInfo().log("Committed %s,%s.", p.value(), o.value()); |
139 | | - } |
140 | | - }, |
141 | | - MoreExecutors.directExecutor()); |
142 | | - }); |
| 135 | + @Override |
| 136 | + public void onSuccess(Void result) { |
| 137 | + log.atInfo().log( |
| 138 | + "Committed %s,%s.", entry.getKey().value(), entry.getValue().value()); |
| 139 | + } |
| 140 | + }, |
| 141 | + MoreExecutors.directExecutor()); |
| 142 | + } |
143 | 143 | } |
144 | 144 | } |
0 commit comments