CAS operation example #40

Closed
chenbekor opened this issue Dec 18, 2014 · 11 comments

@chenbekor

Hi Mathieu,

Can you please provide an example of how to use the CAS operation with Play and reactivecouchbase?

Here is what I'm trying to achieve:

I have a few requests trying to concurrently update a single doc in Couchbase.

What I would like to do is get a CAS value during the get operation, try to update the doc and, if the update is rejected, retry using the new CAS (retrying, let's say, up to 10 times, after which the update fails).

It would also be nice to have an exponential backoff for such updates.

Please note: I do not want to use the lock feature! Just retry until it succeeds.

@mathieuancelin
Member

Hi Chen,

You can use atomicallyUpdate or atomicUpdate, depending on what you need to do to update your document. It first locks the key, then calls your callback with the current document as a parameter and expects the new value of the document in return (or a future of it for atomicallyUpdate), then unlocks the key.

If the key is already locked, the driver retries the operation 15 times, with 200 milliseconds between retries, before failing the future.

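import org.reactivecouchbase.ReactiveCouchbaseDriver
import play.api.libs.json._
// Execution context for the futures returned by the driver.
import scala.concurrent.ExecutionContext.Implicits.global

val driver = ReactiveCouchbaseDriver()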
val bucket = driver.bucket("default")

bucket.atomicUpdate[JsObject]("myDocumentKey") { myDocument =>
  myDocument ++ Json.obj("newKey" -> 42) // returns doc with a new field
} onComplete {
  case _ => driver.shutdown()
}

So I guess it's pretty close to what you need. Maybe we can work together to make the 15 retries and the 200-millisecond delay configurable, and try to add exponential backoff as a retry strategy.
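
To make the exponential backoff idea concrete, here is a minimal sketch of a non-blocking retry helper built only on the Scala standard library. It is not part of reactivecouchbase: the names `Backoff`, `delays`, and `retry` are hypothetical, and the operation being retried is any function returning a `Future`.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.control.NonFatal

// Hypothetical helper, not part of reactivecouchbase.
object Backoff {
  // Daemon timer thread, so a pending retry never keeps the JVM alive.
  private val timer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "backoff-timer")
      t.setDaemon(true)
      t
    }
  })

  // Starts `op` after `delay` without blocking a thread while waiting.
  private def after[T](delay: FiniteDuration)(op: => Future[T]): Future[T] = {
    val p = Promise[T]()
    timer.schedule(new Runnable {
      def run(): Unit = {
        p.completeWith(try op catch { case NonFatal(e) => Future.failed(e) })
        ()
      }
    }, delay.toMillis, TimeUnit.MILLISECONDS)
    p.future
  }

  // Delays before each retry: initial, initial * factor, initial * factor^2, ...
  def delays(initial: FiniteDuration, factor: Double, retries: Int): List[FiniteDuration] =
    List.iterate(initial.toMillis.toDouble, retries)(_ * factor).map(_.toLong.millis)

  // Re-runs `op` after each delay in `schedule` while it keeps failing;
  // once the schedule is used up, the last failure is returned.
  def retry[T](schedule: List[FiniteDuration])(op: () => Future[T])
              (implicit ec: ExecutionContext): Future[T] =
    op().recoverWith {
      case NonFatal(_) if schedule.nonEmpty =>
        after(schedule.head)(retry(schedule.tail)(op))
    }
}
```

With `Backoff.delays(200.millis, 2.0, 4)` the waits would be 200, 400, 800, and 1600 milliseconds, instead of a fixed 200 milliseconds between attempts.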

@chenbekor
Author

Yes, I did try atomicUpdate, but I'm consistently getting:

NOT_FOUND for key XXX
and failures on all atomic updates. Here is the thing: the doc is not in Couchbase at first. The first thread will create the first revision. The second thread should also try to create it on its first try and, after that, retry, fetch, and merge the two JSONs.

in your code:

bucket.atomicUpdate[JsObject]("myDocumentKey") { myDocument =>

but I was expecting an Option[JsObject], since the first thread should get None, right?

Maybe I'm missing something in the example?

Here is my complete code:

val updated_event:JsObject = //some json we just got
val res = bucket.atomicallyUpdate[JsObject](event_key, exp = 60 * 60 * 24 * 1) { old_event =>
  Future.successful(old_event ++ updated_event)
}
res.onComplete {
  case Success(s) => {
    Logger.debug(s"event $event_key was merged OK.")
  }
  case Failure(f) => {
    Logger.error(s"event $event_key failed to merge.", f)
  }
}

@mathieuancelin
Member

As the name of the function states, it's an update, so the key has to exist. That's why the parameter is not an Option[T].

@chenbekor
Author

I see. So how would you suggest handling the first-time creation? This is exactly the race condition I'm facing here: two threads think they are first, but only one should win the write, while the other should retry and recover by merging the old and new JSONs.

Any ideas?

@mathieuancelin
Member

I will try to write some stuff and let you know.

@chenbekor
Author

Mathieu, just as inspiration: I recall from the pre-Scala days that I did similar things using the low-level spymemcached API. Specifically, CASMutation was very handy. It takes a callback for merge operations plus an initial value for the first-time element, and the underlying infrastructure decides which one to use according to the current DB state (handling retries etc.).

The code below is copied from https://code.google.com/p/spymemcached/wiki/Examples

I hope you can help us port this to reactivecouchbase / Scala / Play, as this is currently a production issue we are experiencing. Again, thank you so much for the help.

public List<Item> addAnItem(final Item newItem) throws Exception {

    // This is how we modify a list when we find one in the cache.
    CASMutation<List<Item>> mutation = new CASMutation<List<Item>>() {

        // This is only invoked when a value actually exists.
        public List<Item> getNewValue(List<Item> current) {
            // Not strictly necessary if you specify the storage as
            // LinkedList (our initial value isn't), but I like to keep
            // things functional anyway, so I'm going to copy this list
            // first.
            LinkedList<Item> ll = new LinkedList<Item>(current);

            // If the list is already "full", pop one off the end.
            if(ll.size() > 10) {
                ll.removeLast();
            }
            // Add mine first.
            ll.addFirst(newItem);

            return ll;
        }

    };

    // The initial value -- only used when there's no list stored under
    // the key.
    List<Item> initialValue=Collections.singletonList(newItem);

    // The mutator who'll do all the low-level stuff.
    CASMutator<List<Item>> mutator = new CASMutator<List<Item>>(client, transcoder);

    // This returns whatever value was successfully stored within the
    // cache -- either the initial list as above, or a mutated existing
    // one
    return mutator.cas("myKey", initialValue, 0, mutation);
}
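
For comparison, the same create-or-merge pattern could be sketched in a non-blocking Scala style roughly as follows. This is only an illustration under stated assumptions: `AsyncCasStore` is a hypothetical in-memory stand-in for a bucket (it is not the reactivecouchbase API), and `CasMutator.upsert` is a made-up name. The initial value is used only when the key does not exist yet; otherwise the merge function is applied and written with a CAS check, retrying when another writer wins the race.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical async store standing in for a bucket; NOT the reactivecouchbase API.
// Each key holds a (casToken, value) pair.
final class AsyncCasStore[V](implicit ec: ExecutionContext) {
  private val data = TrieMap.empty[String, (Long, V)]

  // Reads the value together with its CAS token, if the key exists.
  def gets(key: String): Future[Option[(Long, V)]] = Future(data.get(key))

  // Stores only when the key is absent; true if this call created it.
  def add(key: String, value: V): Future[Boolean] =
    Future(data.putIfAbsent(key, (1L, value)).isEmpty)

  // Stores only when the CAS token is unchanged since the read.
  def cas(key: String, token: Long, value: V): Future[Boolean] = Future {
    data.get(key) match {
      case Some(old) if old._1 == token => data.replace(key, old, (token + 1, value))
      case _                            => false
    }
  }
}

// Hypothetical helper mirroring the CASMutator idea with futures.
object CasMutator {
  // Creates `key` with `initial` if absent, otherwise writes `merge(current)`.
  // Retries when another writer wins the race, up to `attempts` tries in total.
  def upsert[V](store: AsyncCasStore[V], key: String, initial: V, attempts: Int = 10)
               (merge: V => V)(implicit ec: ExecutionContext): Future[V] = {
    def retryOr(stored: Boolean, value: V): Future[V] =
      if (stored) Future.successful(value)
      else if (attempts > 1) upsert(store, key, initial, attempts - 1)(merge)
      else Future.failed(new IllegalStateException(s"CAS retries exhausted for key $key"))

    store.gets(key).flatMap {
      case None =>
        store.add(key, initial).flatMap(retryOr(_, initial))
      case Some((token, current)) =>
        val next = merge(current)
        store.cas(key, token, next).flatMap(retryOr(_, next))
    }
  }
}
```

A caller would pass the freshly received value as `initial` and a function such as `old => old ++ updated` as `merge`, so that whichever thread loses the first write falls through to the merge branch on its next attempt.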

@chenbekor
Author

Hi Mathieu, I was wondering if you had a chance to look at this concurrency issue?

@mathieuancelin
Member

Hi Chen,

No time for that yet. I'll look at it as soon as I'm on vacation.

@chenbekor
Author

Thanks, Mathieu, for the update. We managed to port the Java CASMutator code, but it is ugly and not in line with the non-blocking nature of what reactivecouchbase is all about. We are looking forward to your update on this one. Again, a big thank you for your continued support and for reactivecouchbase.

@chenbekor
Author

Hi Mathieu, I hope you had a nice year-end vacation! I was wondering if you can find time to look into this issue, as this is still an ongoing concurrent update bug that we are facing. Any help is appreciated.

@chenbekor
Author

The issue is resolved. It was an application bug. The above example works.
