-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add batch synchronisation #19
Conversation
public AsyncBatchLimitUsageStorage( | ||
LimitUsageStorage wrappedLimitUsageStorage, | ||
int timeBetweenSynchronisations, | ||
int delayBeforeFirstSync) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I see a real usefulness for this param - why not just use timeBetweenSynchronization
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needed for tests :(
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright then.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
package protected then ?
@@ -146,6 +148,10 @@ public void asyncPerformance() { | |||
lastResponse = | |||
asyncStorage.incrementAndGet(RESOURCE1, LIMIT1, PROPERTY1, EXPIRATION, TIMESTAMP); | |||
} | |||
asyncStorage.shutdownStorage(); | |||
while (!asyncStorage.isTerminated()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should provide a method that blocks until terminated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I just wasn't sure of the implementation. Is a Thread sleep of like 10ms between checks good enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well it's a start, but have a look at awaitTermination
on ExecutorService
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes thats true, forgot about that
.withCost(cost) | ||
.build(); | ||
|
||
Pair<LimitKey, Integer> reponse = storage.addAndGet(request); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Until there is something I got wrong you're sending the events using separate requests here right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup. Why not send them in one go through the other overload?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think its possible since we iterate over the map and we sync each value one by one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that pipelining the queries would be better, but I don't really know how to do that while iterating over the map. We have to remember that it is less critical since its an async job and we don't have that many entries.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest to go with that first version for now. It's already a good improvement over the previous version. Further optimizations can be done in a separate issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
valueEntry.getValue().setTotal(reponse.getValue()); | ||
}); | ||
} catch (IllegalStateException e) { | ||
logger.warn("Key was deleted during synchronisation, ignoring"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we catch this particular error in a more precise location?
Also, maybe provide a catch all handler that logs an error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean by a more precise location? It makes no sens to catch it inside the inner forEach since if the key is deleted, all values in the inner map will be deleted too.
This exception should not be thrown by Oracle JVM anyway, I included it since the javadoc says that some implement might throw it.
Yes I will include a catch for runtime exception, maybe surrounding the whole function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed that for a catch Exception and log the error.
* @author Emile Fugulin | ||
* @since 1.0.0 | ||
*/ | ||
public class CacheSynchronisation extends TimerTask { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pretty sure that Synchronization should take a Z
|
||
@Override | ||
public void run() { | ||
cache.applyOnEach( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would expect that we'd somehow clear the cache after we flush it's content downstream, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure what you mean by that...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will add a removeExpiredEntry after, good point
@@ -78,6 +78,14 @@ public AsyncLimitUsageStorage(LimitUsageStorage wrappedLimitUsageStorage) { | |||
return wrappedLimitUsageStorage.debugCurrentLimitCounters(); | |||
} | |||
|
|||
public void shutdownStorage() { | |||
executorService.shutdown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No guarantee that we flush the remaining data to the underlying storage?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not necessary since the keys will expire by themselves
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well yes, but it does mean that those events will never get counted. Probably not a problem but still something that could be better easily no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ho I see what you mean. Well the shutdown isn't meant to be called during the life of the async storage. Its something that I would have put in a destructor, but Java doesn't have one...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why shouldn't it be called? Makes sense to do it (maybe indirectly?) when shutting down the app no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes you have to do it when you shutdown the app, but I don't see your point of counting the events on the remote storage if you are shutting down everything :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You might be shutting down a single server in a cluster that has 10s of them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
well the shutdown method is only to stop gracefully the sync between the local and the remote storage, it won't actually stop the remote storage
I also replied to some comments marked as outdated because I renamed the class, but I would still like your feedback @malaporte |
If you want to take a look too @GuiSim |
public AsyncBatchLimitUsageStorage( | ||
LimitUsageStorage wrappedLimitUsageStorage, | ||
int timeBetweenSynchronisations, | ||
int delayBeforeFirstSync) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
package protected then ?
private Timer timer; | ||
|
||
public AsyncBatchLimitUsageStorage( | ||
LimitUsageStorage wrappedLimitUsageStorage, int timeBetweenSynchronisations) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Duration instead of a int ? or we should specify the unit somehow...
|
||
@Override | ||
public Map<LimitKey, Integer> addAndGet(Collection<AddAndGetRequest> requests) { | ||
Map<LimitKey, Integer> cachedEntries = cache.addAndGet(requests); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return directly
valueEntry.getValue().setTotal(reponse.getValue()); | ||
}); | ||
} catch (IllegalStateException e) { | ||
logger.warn("Key was deleted during synchronisation, ignoring"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"synchronization"
add a "." at the end
|
||
Pair<LimitKey, Integer> reponse = storage.addAndGet(request); | ||
|
||
valueEntry.getValue().addAndGet(-cost); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we add a decrementAndGet method ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could, but under it it will still be an addAndGet
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not a decrementAndGet under also ?
If everybody approve, i will merge and create a V1 published on maven central. |
This is a first implementation of a new async storage that sync in batch. Suggestions and critiques are welcome.
This storage was necessary since we just can't make a query to our database (even async) for each query that arrives, that does not scale.