-
Notifications
You must be signed in to change notification settings - Fork 28
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
API gateway: cache topic producer for the same gateway #660
Conversation
5f38cd8
to
86f589b
Compare
...tream-api-gateway/src/main/java/ai/langstream/apigateway/gateways/LRUTopicProducerCache.java
Outdated
Show resolved
Hide resolved
try { | ||
final SharedTopicProducer sharedTopicProducer = | ||
cache.get(key, () -> new SharedTopicProducer(topicProducerSupplier.get())); | ||
sharedTopicProducer.acquire(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a common pitfall
the "acquire" should be performed in the constructor, because between this line and the previous line it is possible that the producer has been released if the count is zero (t is an edge case, but it usually happens).
The tricky thing is that you have to call acquire if there is already an object.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also the producer must be started only once, so you have to start it when you have created it
and the cache should not return the producer until it is fully initialised (started) with success
and if the start operation fails it must not be added to the cache
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the producer is started in during the load in the cache and every producer in the cache is already started.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the acquire must take into condiseration the edge case you mentioned, right. I did it in another manner. When you call acquire, if the producer has been actually closed (removed from cache for example), we retry and that will trigger the constructor to be called
topicConnectionsRuntime.createProducer( | ||
null, streamingCluster, Map.of("topic", topic)); | ||
producer.start(); | ||
topicProducer.start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not to be executed here, see the comments above
public interface TopicProducerCache { | ||
record Key(String tenant, String application, String gatewayId) {} | ||
|
||
TopicProducer getOrCreate(Key key, Supplier<TopicProducer> topicProducerSupplier); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this operation may fail, it is better to add a "throws" clause. The risk is to forget possible failures (that will happen for instance if the broker is down)
if (topicProperties.isProducersCacheEnabled()) { | ||
return new LRUTopicProducerCache(topicProperties.getProducersCacheSize()); | ||
} else { | ||
return (key, topicProducerSupplier) -> topicProducerSupplier.get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is tricker then expected, see the comments above
the producer that is returned should be already ready to accept writes (started)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes it is
|
||
application.topics.producers-cache-enabled=true | ||
application.topics.producers-cache-size=100 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should have metrics for this, otherwise it is hard to understand if the system is close to the threshold.
when you hit the threshold we stop caching the system will have very bad performances (latency)
a8e89fc
to
b131418
Compare
Changes: