[lock] Multiple threads from Spring boot application able to acquire lock on same document id ( String ) #10754

Closed
dmots opened this issue Jun 14, 2017 · 11 comments

Comments


@dmots dmots commented Jun 14, 2017

Hi
We were testing Hazelcast's locking capabilities in our dev environment. We created a Spring Boot application that updates a document in Couchbase concurrently, and we tested concurrent load on the REST endpoint using JMeter. The underlying app server was Tomcat 8. When we run the load test with concurrent users first loading and then updating the same document, multiple threads are able to acquire the lock on the same document ID within a span of milliseconds. We tried acquiring the lock both as a Hazelcast cluster member and as a client. Finally, when we put the lock-acquiring code in a synchronized block, multiple threads were no longer able to acquire the lock on the same document ID. But we are not sure whether this is the correct approach: a synchronized block applies only to the JVM of that particular Spring app instance. If we have multiple app instances running, will we come across the same problem again?

Contributor

@jerrinot jerrinot commented Jun 14, 2017

@dmots:

Thanks for reporting a possible issue. What Hazelcast version are you using? Can you share the code you are using to acquire the lock?

@jerrinot jerrinot added this to the 3.9 milestone Jun 14, 2017
Author

@dmots dmots commented Jun 14, 2017

Thanks, jerrinot, for looking into it.
We are using version 3.8.2 of both the client and server libraries.
The utility class code is included at the end of this message.

We call the following methods from the Spring REST operation method to update the Couchbase document:

lockUtils.lockEntity(documentId);
lockUtils.unlockEntity(_orderId);			

We use forceUnlock because we use RxJava: loading the document can happen on one thread while updating it can happen on another. We release the lock in the Observer, in the code that handles the onNext event from the Observable.
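
The ownership rule behind this choice can be illustrated locally. The following is a minimal sketch, not the code from this issue: it uses java.util.concurrent.locks.ReentrantLock as a stand-in for the Hazelcast key lock, on the assumption that a plain unlock from a thread that does not own the lock is rejected in a similar way. The class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical local analogy; ReentrantLock stands in for the distributed lock.
final class NonOwnerUnlockDemo {

    // Locks on the calling thread, then lets a second thread attempt unlock().
    // Returns true if that attempt was rejected with IllegalMonitorStateException.
    static boolean nonOwnerUnlockThrows() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean threw = new AtomicBoolean(false);
        lock.lock();
        try {
            Thread other = new Thread(() -> {
                try {
                    lock.unlock(); // this thread never acquired the lock
                } catch (IllegalMonitorStateException e) {
                    threw.set(true);
                }
            });
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock(); // the owner can still release normally
        }
        return threw.get();
    }

    public static void main(String[] args) {
        System.out.println("non-owner unlock rejected: " + nonOwnerUnlockThrows());
    }
}
```

A forced unlock sidesteps this ownership check, which also means that any thread can release a lock another thread still relies on.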

When we print to the console how locks are being acquired, we see the following log:

Timestamp
1497342041174 Acquired lock on document '01ABC'
1497342041190 Acquired lock on document '01ABC'
1497342041194 Acquired lock on document '01ABC'

The moment we hit the run button in JMeter with 5 concurrent users, 5 threads acquire the lock on the same order within milliseconds.

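import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;

public class LockUtils {

	private static ClientConfig clientConfig = new ClientConfig();
	private static LockUtils lockUtils = null;
	private static HazelcastInstance client = null;
	private static IMap<String, String> lockmap = null;

	// Lazily creates the singleton and looks up the "locks" map used for per-key locking.
	public synchronized static LockUtils getInstance(String host, int port) {

		if (lockUtils == null) {
			lockUtils = new LockUtils(host, port);
			lockmap = client.getMap("locks");
			return lockUtils;
		}

		return lockUtils;

	}

	// Connects to the cluster as a Hazelcast client.
	private LockUtils(String host, int port) {
		clientConfig.addAddress(host + ":" + port);
		client = HazelcastClient.newHazelcastClient(clientConfig);
	}

	// Returns the ILock object for the given key.
	public synchronized ILock acquireLock(String lockKey) {
		ILock ilock = client.getLock(lockKey);
		return ilock;
	}

	// Locks the given key in the "locks" map.
	public synchronized void lockEntity(String lockKey) {
		lockmap.lock(lockKey);
	}

	// Releases the key lock regardless of which thread owns it.
	public void unlockEntity(String lockKey) {
		lockmap.forceUnlock(lockKey);
	}

	// Not called anywhere in this class; duplicates the constructor body.
	private static void initialize(String host, int port) {
		clientConfig.addAddress(host + ":" + port);
		client = HazelcastClient.newHazelcastClient(clientConfig);
	}
}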
Author

@dmots dmots commented Jun 14, 2017

Correction:

lockUtils.lockEntity(documentId);
Load Document
Update Document
lockUtils.unlockEntity(documentId);
Author

@dmots dmots commented Jun 16, 2017

@jerrinot

Hi jerrinot, I suspect it could have something to do with the way JMeter sends requests for multiple users at exactly the same time, possibly within nanoseconds. When I write a standalone program and spawn multiple threads manually to update the order, I'm not able to reproduce the issue. The issue occurs only when we test using JMeter.

Contributor

@jerrinot jerrinot commented Jun 16, 2017

@dmots: are you sure you are interpreting the result right?
System.currentTimeMillis() is not guaranteed to be monotonic - you can observe time going back, especially on multicore systems. It would be great if you could post your test here.
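
To make the timing point concrete, here is a minimal sketch (not taken from the reporter's code) that measures elapsed time with System.nanoTime(), which the JDK documents as intended for elapsed-time measurement, rather than subtracting System.currentTimeMillis() values. The class and method names are hypothetical.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the code posted in this issue.
final class ElapsedTimer {

    // Captured once; only differences between nanoTime() readings are meaningful.
    private final long startNanos = System.nanoTime();

    // Elapsed time since construction, in milliseconds.
    long elapsedMillis() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    }

    public static void main(String[] args) throws InterruptedException {
        ElapsedTimer timer = new ElapsedTimer();
        Thread.sleep(20); // stand-in for the work being timed
        System.out.println("elapsed ms: " + timer.elapsedMillis());
    }
}
```

nanoTime() readings are only comparable within a single JVM, so they suit durations and not timestamps that are compared across machines.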

Author

@dmots dmots commented Jun 16, 2017

@jerrinot I saw this behaviour consistently for a couple of days, but I am no longer able to reproduce it consistently using JMeter either. It looks like it was an intermittent issue. Below is the facade method that gets called from JMeter. Some of the variables are defined at the Spring controller level and initialized in the constructor, but you should get an idea of what this method is doing. As of now I'm not seeing the erroneous behaviour reported earlier. We are planning to run the same test with multiple app servers hitting the DB (Couchbase) in a cloud environment. Making the method synchronized works. I just wanted to know whether that is the correct way to acquire the lock. Is JVM-based synchronization really required?

@RequestMapping("/update_random_order_async")
  public void handleUpdateOrderAsync(HttpServletRequest request, HttpServletResponse response) {

    String orderId = null;

    if (orderIds != null && !orderIds.isEmpty()) {
      orderId = orderIds.get(RandomDataUtils.getRandomInt(orderIds.size()));
    }

    if (orderId == null) {
      sendError(400, response);
      return;
    }

    final String _orderId = orderId;

    AsyncBucket asynchBucket = checkout();

    if (isUseLockServer()) {
      lockUtils.lockEntity(_orderId);
      logger.error("Acquired lock on order " + _orderId);
    }

    final CountDownLatch countDownLatch = new CountDownLatch(1);

    try {

      Observable<RawJsonDocument> readObservable = OrderQueryService.loadDocumentRawAsync(orderId, asynchBucket);

      long currentTime = System.currentTimeMillis();

      readObservable.subscribeOn(Schedulers.io()).timeout(appConfig.getCouchbaseSubscriberTimeoutMills(), TimeUnit.MILLISECONDS)

          .doOnError(new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {

              countDownLatch.countDown();

              if (!response.isCommitted()) {

                logResponseTimings("operation_read_for_update", "timeout", _orderId,
                    System.currentTimeMillis() - currentTime);

                sendError(400, response);
              }

              return;
            }
          }).first().subscribe(

              originalDoc ->

              {

                if (appConfig.isPauseBetweenUpdates()) {
                  try {
                    Thread.sleep(1000L * appConfig.getPausTimeSeconds());
                  } catch (InterruptedException e) {
                    logger.error("error " + e);
                  }
                }

                logResponseTimings("operation_read_for_update", "success", _orderId,
                    System.currentTimeMillis() - currentTime);

                Observable<RawJsonDocument> updatedDocObservable = OrderUpdateService.updateDocument(originalDoc,
                    originalDoc.cas());
                long updateStartTime = System.currentTimeMillis();

                updatedDocObservable.subscribeOn(Schedulers.io()).first().subscribe(

                    updatedDoc -> {

                      Observable<RawJsonDocument> updateOrderObservable = null;

                      updateOrderObservable = OrderUpdateService.updateOrderAsync(updatedDoc, asynchBucket);
                      updateOrderObservable
                          .timeout(appConfig.getCouchbaseSubscriberTimeoutMills(), TimeUnit.MILLISECONDS)
                          .doOnError(new Action1<Throwable>() {
                            @Override
                            public void call(Throwable throwable) {

                              countDownLatch.countDown();

                              if (isUseLockServer()) {
                                lockUtils.unlockEntity(_orderId);
                                logger.error("Released lock on order " + _orderId);
                              }

                              logger.error("Error: " + _orderId + " ", throwable);

                              if (!response.isCommitted()) {

                                logResponseTimings("operation_write_for_update", "failure", _orderId,
                                    System.currentTimeMillis() - updateStartTime);

                                sendError(400, response);
                              }
                              return;

                            }
                          }).subscribe(s -> {

                            countDownLatch.countDown();

                            if (isUseLockServer()) {
                              lockUtils.unlockEntity(_orderId);
                              logger.error("Released lock on order " + _orderId);
                            }

                            logResponseTimings("operation_write_for_update", "success", _orderId,
                                System.currentTimeMillis() - updateStartTime);
                            sendSuccess(response);
                            return;

                          }, e -> {

                            countDownLatch.countDown();

                            if (isUseLockServer()) {
                              lockUtils.unlockEntity(_orderId);
                              logger.error("Released lock on order " + _orderId);
                            }

                            logger.error("Error: ", e);

                            if (!response.isCommitted()) {

                              logResponseTimings("operation_write_for_update", "failure", _orderId,
                                  System.currentTimeMillis() - updateStartTime);

                              sendError(400, response);
                            }
                            return;

                          },

                              () -> {
                              }

                    );
                    });

              },

              error -> {

                countDownLatch.countDown();

                if (!response.isCommitted()) {

                  logger.error("Error: ", error);

                  logResponseTimings("operation_read_for_update", "failure", _orderId,
                      System.currentTimeMillis() - currentTime);

                  sendError(400, response);
                }
                return;

              });

      try {
        countDownLatch.await();
      } catch (InterruptedException e) {
        logger.error("Error " + e);
      }

    } catch (Exception e) {
      logger.error("Error: ", e);
    } finally {
      checkin(asynchBucket);
    }

  }
Author

@dmots dmots commented Jun 16, 2017

@jerrinot I meant that I am not able to reproduce it after adding the 'synchronized' keyword.

@mmedenjak mmedenjak changed the title Multiple threads from Spring boot application able to acquire lock on same document id ( String ) [lock] Multiple threads from Spring boot application able to acquire lock on same document id ( String ) Jul 11, 2017
Contributor

@mmedenjak mmedenjak commented Aug 1, 2017

@dmots are you still experiencing the issue? Can you reproduce it, using JMeter or any other tool? You don't need to use synchronized to acquire the lock; the synchronisation should be done on the Hazelcast member owning the lock.

This might be a silly observation, but is your test invoking acquireLock when it logs this?

1497342041174 Acquired lock on document '01ABC'
1497342041190 Acquired lock on document '01ABC'
1497342041194 Acquired lock on document '01ABC'

The issue then is that acquireLock doesn't actually acquire the lock; it only gets the proxy, with which you can then acquire the lock by calling its lock method:

public synchronized ILock acquireLock(String lockKey) {
		ILock ilock = client.getLock(lockKey);
		return ilock;
	}
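
The difference between obtaining a lock object and acquiring the lock can be shown with a local analogy. This is a minimal sketch that assumes java.util.concurrent.locks.ReentrantLock as a stand-in for the Hazelcast ILock proxy; the class name and the getHandle method are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical local analogy; ReentrantLock stands in for the ILock proxy.
final class HandleVersusAcquire {

    private static final ReentrantLock LOCK = new ReentrantLock();

    // Mirrors the shape of acquireLock(...) above: hands back the lock object
    // without locking anything.
    static ReentrantLock getHandle() {
        return LOCK;
    }

    public static void main(String[] args) {
        ReentrantLock handle = getHandle();
        System.out.println("locked after getHandle(): " + handle.isLocked()); // false
        handle.lock(); // only this call acquires the lock
        try {
            System.out.println("locked after lock(): " + handle.isLocked()); // true
        } finally {
            handle.unlock();
        }
    }
}
```

Logging "Acquired lock" right after a call shaped like getHandle() would therefore be misleading, because no thread has been excluded at that point.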
Contributor

@mmedenjak mmedenjak commented Aug 1, 2017

By the way, the Hazelcast lock is reentrant for the same thread on the same caller. So it might happen that one thread acquires the lock and offloads the processing of the document to a different thread. Then, if the same thread (as per Thread.currentThread().getId()) tries to acquire the lock again, it will succeed, because the same thread on the same caller tried to acquire the lock.
This might be the case with your code, since you have mentioned that you offload processing and unlocking to a different thread.
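
Reentrancy of this kind can be observed locally. The sketch below is an analogy only: it uses java.util.concurrent.locks.ReentrantLock, which is reentrant per thread within one JVM, on the assumption that this mirrors the per-thread, per-caller behaviour described above. The class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical local analogy for a lock that is reentrant per thread.
final class ReentrancyDemo {

    // Locks twice on the calling thread and reports the hold count.
    static int holdCountAfterLockingTwice() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock(); // same thread: succeeds immediately instead of blocking
        try {
            return lock.getHoldCount();
        } finally {
            lock.unlock();
            lock.unlock(); // one unlock per successful lock
        }
    }

    // Holds the lock on the calling thread while a second thread tries to take it.
    static boolean otherThreadCanAcquire() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean acquired = new AtomicBoolean(false);
        lock.lock();
        try {
            Thread other = new Thread(() -> {
                if (lock.tryLock()) {
                    acquired.set(true);
                    lock.unlock();
                }
            });
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return acquired.get();
    }

    public static void main(String[] args) {
        System.out.println("hold count: " + holdCountAfterLockingTwice());          // 2
        System.out.println("other thread acquired: " + otherThreadCanAcquire());    // false
    }
}
```

With a pooled request thread, a second request served by the same thread would pass straight through a lock that the first request has not yet released.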

Author

@dmots dmots commented Aug 2, 2017

@mmedenjak
We re-analysed our code. We were calling Couchbase asynchronously using RxJava, so we had added a CountDownLatch in the Spring controller method to make the client wait for the response. It looks like, when this issue occurred, the countDownLatch.await() line was missing from the code, so the Hazelcast lock-acquiring method was made synchronized. After we found out that await was missing, we added it. Now it works without making the method synchronized. Thanks for your help. Apologies, it looks like a goof-up on our end. JMeter was sending hundreds of requests within milliseconds because of the missing 'await' call. The client doesn't have to synchronize the method, as you correctly mentioned. I think you can close this issue.
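
The effect of the missing await can be reproduced in isolation. This is a minimal sketch using only java.util.concurrent, with a sleeping task as an assumed stand-in for the asynchronous Couchbase read and update; the class and method names are hypothetical and this is not the controller code from this issue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a handler that starts asynchronous work.
final class AwaitDemo {

    // Starts the async work and returns whether it had finished by the time
    // the handler itself was ready to return.
    static boolean workFinishedBeforeReturn(boolean awaitLatch) {
        ExecutorService io = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(1);
        AtomicBoolean finished = new AtomicBoolean(false);
        try {
            io.execute(() -> {
                try {
                    Thread.sleep(200); // stand-in for the async read and update
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                finished.set(true);
                done.countDown();
            });
            if (awaitLatch) {
                done.await(5, TimeUnit.SECONDS); // block the handler thread until the work completes
            }
            return finished.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return finished.get();
        } finally {
            io.shutdown(); // lets the already submitted task run to completion
        }
    }

    public static void main(String[] args) {
        System.out.println("with await: " + workFinishedBeforeReturn(true));
        System.out.println("without await: " + workFinishedBeforeReturn(false));
    }
}
```

Without the await, the handler thread returns while the work is still in flight and is immediately free to serve the next request, which is consistent with the burst of requests described above.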

Member

@gurbuzali gurbuzali commented Aug 2, 2017

Thank you for your response. Feel free to open a new issue if you encounter one.
