3030import java .util .HashMap ;
3131import java .util .Map ;
3232import java .util .concurrent .CompletableFuture ;
33+ import java .util .concurrent .ConcurrentHashMap ;
34+ import java .util .concurrent .ExecutionException ;
3335import java .util .concurrent .Future ;
3436import lombok .extern .slf4j .Slf4j ;
3537import org .apache .kafka .connect .runtime .WorkerConfig ;
5153@ Slf4j
5254public class PulsarOffsetBackingStore implements OffsetBackingStore {
5355
54- private Map <ByteBuffer , ByteBuffer > data ;
56+ private final Map <ByteBuffer , ByteBuffer > data = new ConcurrentHashMap <>() ;
5557 private PulsarClient client ;
5658 private String serviceUrl ;
5759 private String topic ;
@@ -69,7 +71,6 @@ public PulsarOffsetBackingStore(PulsarClient client) {
6971 public void configure (WorkerConfig workerConfig ) {
7072 this .topic = workerConfig .getString (PulsarKafkaWorkerConfig .OFFSET_STORAGE_TOPIC_CONFIG );
7173 checkArgument (!isBlank (topic ), "Offset storage topic must be specified" );
72- this .data = new HashMap <>();
7374
7475 log .info ("Configure offset backing store on pulsar topic {} at cluster {}" , topic );
7576 }
@@ -130,10 +131,13 @@ private void readNext(CompletableFuture<Void> endFuture) {
130131 }
131132
132133 void processMessage (Message <byte []> message ) {
133- synchronized ( data ) {
134+ if ( message . getKey () != null ) {
134135 data .put (
135136 ByteBuffer .wrap (message .getKey ().getBytes (UTF_8 )),
136137 ByteBuffer .wrap (message .getValue ()));
138+ } else {
139+ log .debug ("Got message without key from the offset storage topic, skip it. message value: {}" ,
140+ message .getValue ());
137141 }
138142 }
139143
@@ -153,10 +157,13 @@ public void start() {
153157
154158 CompletableFuture <Void > endFuture = new CompletableFuture <>();
155159 readToEnd (endFuture );
156- endFuture .join ();
160+ endFuture .get ();
157161 } catch (PulsarClientException e ) {
158162 log .error ("Failed to setup pulsar producer/reader to cluster at {}" , serviceUrl , e );
159163 throw new RuntimeException ("Failed to setup pulsar producer/reader to cluster at " + serviceUrl , e );
164+ } catch (ExecutionException | InterruptedException e ) {
165+ log .error ("Failed to start PulsarOffsetBackingStore" , e );
166+ throw new RuntimeException ("Failed to start PulsarOffsetBackingStore" , e );
160167 }
161168 }
162169
@@ -184,6 +191,8 @@ public void stop() {
184191 }
185192 reader = null ;
186193 }
194+
195+ data .clear ();
187196 // do not close the client, it is provided by the sink context
188197 }
189198
@@ -194,10 +203,7 @@ public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys) {
194203 return endFuture .thenApply (ignored -> {
195204 Map <ByteBuffer , ByteBuffer > values = new HashMap <>();
196205 for (ByteBuffer key : keys ) {
197- ByteBuffer value ;
198- synchronized (data ) {
199- value = data .get (key );
200- }
206+ ByteBuffer value = data .get (key );
201207 if (null != value ) {
202208 values .put (key , value );
203209 }
0 commit comments