Add gRPC processor agent #406
Conversation
private final Map<Object, Integer> schemaIds = new ConcurrentHashMap<>();

// Schemas received from the server
private final Map<Integer, Object> serverSchemas = new ConcurrentHashMap<>();
An untrusted remote server could OOM us by sending a large number of schemas.
I don't think this is an issue at the moment, since the remote application is owned by the user.
But in the future we may have to put a limit on it.
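If such a limit is ever added, it could look roughly like this (a minimal sketch; `SchemaCache`, `MAX_SCHEMAS`, and `registerServerSchema` are hypothetical names, not code from this PR):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a bound on server-supplied schemas.
public class SchemaCache {
    private static final int MAX_SCHEMAS = 10_000;

    // Schemas received from the server
    private final Map<Integer, Object> serverSchemas = new ConcurrentHashMap<>();

    void registerServerSchema(int schemaId, Object schema) {
        // Refuse to grow without bound if the remote server misbehaves.
        if (!serverSchemas.containsKey(schemaId) && serverSchemas.size() >= MAX_SCHEMAS) {
            throw new IllegalStateException(
                    "Too many schemas received from the server: " + serverSchemas.size());
        }
        serverSchemas.put(schemaId, schema);
    }
}
```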
} else {
    // TODO: received unknown schema. Error?
    // Do we send an error result, or do we fail completely so that the
    // agent is restarted?
What should we do here?
Ignore it? Send an error for the source record? Fail the producer completely at the next process call?
fail completely
OK.
Also, currently I restart the request when the server errors, but maybe it would be better to fail completely there too, so that the pod is fully restarted.
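A minimal sketch of what failing completely could look like for the unknown-schema branch, using the criticalFailure hook this PR adds to AgentContext (the class and method names here are illustrative, not the actual PR code):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative wrapper around the schema lookup discussed above.
public class GrpcResponseHandler {
    private final Map<Integer, Object> serverSchemas = new ConcurrentHashMap<>();
    private final AgentContext agentContext;

    public GrpcResponseHandler(AgentContext agentContext) {
        this.agentContext = agentContext;
    }

    Object resolveSchema(int schemaId) {
        Object schema = serverSchemas.get(schemaId);
        if (schema == null) {
            // Unknown schema id: don't attempt recovery; escalate so the
            // whole agent pod is restarted instead of silently dropping records.
            agentContext.criticalFailure(
                    new IllegalStateException("Received unknown schema id: " + schemaId));
        }
        return schema;
    }
}
```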
done
Force-pushed from 05cfadb to 888eec5.
private RecordSink sink;

// For each record sent, we increment the recordId
private final AtomicInteger recordId = new AtomicInteger(0);
We probably need an AtomicLong here; it is possible to reach 2 billion messages for a production system that is processing data.
Of course! I use long everywhere for the record id (protocol, caches) except here... Good catch!
Note that it wouldn't have been a real issue, since record ids are transient and forgotten once we get the server response. I believe there would be an OOM before reaching MAX_VALUE if they were not removed.
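After the change, the pattern under discussion looks roughly like this (a sketch only; `RecordIdTracker`, `inFlight`, and the method names are illustrative, not the actual PR code):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch: 64-bit record ids that are forgotten on response.
public class RecordIdTracker {
    // For each record sent, we increment the recordId (now a long).
    private final AtomicLong recordId = new AtomicLong(0);

    // In-flight records keyed by id; entries are removed as soon as the
    // server responds, which is why the ids are transient.
    private final Map<Long, Object> inFlight = new ConcurrentHashMap<>();

    long track(Object record) {
        long id = recordId.incrementAndGet();
        inFlight.put(id, record);
        return id;
    }

    Object acknowledge(long id) {
        // Forget the id once the server responds, keeping the map bounded.
        return inFlight.remove(id);
    }
}
```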
done
Force-pushed from d5d728f to 21be05a.
I have left some final feedback.
Overall LGTM
@@ -269,6 +269,13 @@
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>${project.groupId}</groupId>
        <artifactId>langstream-agent-grpc</artifactId>
This is not needed here; please remove it.
@@ -43,4 +43,9 @@ public interface AgentContext {
    default BadRecordHandler getBadRecordHandler() {
        return DEFAULT_BAD_RECORD_HANDLER;
    }

    default void criticalFailure(Throwable error) {
        System.err.printf("Critical failure: %s. Shutting down the runtime...", error.getMessage());
For the default implementation we can simply log; this way the tests won't crash the JVM.
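A trimmed sketch of such a logging default, assuming an slf4j-style logger (the constant name is illustrative, and the real interface has more members):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public interface AgentContext {
    Logger LOG = LoggerFactory.getLogger(AgentContext.class);

    default void criticalFailure(Throwable error) {
        // Log only: halting the JVM here would also kill unit tests.
        LOG.error("Critical failure: {}. Shutting down the runtime...",
                error.getMessage(), error);
    }
}
```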
"Critical failure: %s. Shutting down the runtime..." | ||
.formatted(error.getMessage()), | ||
error); | ||
AgentContext.super.criticalFailure(error); |
We can make the behaviour configurable with a static "Consumer" variable, and in AbstractApplicationRunner (which is used in unit tests) we can intercept the call without crashing the JVM.
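A sketch of that suggestion; the class, field, and method names below are assumptions, not the actual PR code:

```java
import java.util.function.Consumer;

// Illustrative holder for the configurable critical-failure behaviour.
public final class CriticalFailureAction {
    // Default behaviour for the real runtime: report and halt the JVM so
    // the orchestrator restarts the pod.
    public static volatile Consumer<Throwable> action =
            error -> {
                System.err.println("Critical failure: " + error.getMessage());
                Runtime.getRuntime().halt(1);
            };

    private CriticalFailureAction() {}
}
```

AbstractApplicationRunner could then swap the action before starting an application, e.g. `CriticalFailureAction.action = recordedFailures::add;`, so a test observes the failure instead of dying with the JVM.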
* Add gRPC processor agent
* Fail the JVM in case of error from the remote server
* Add a GrpcAgentsProvider in k8s runtime
* Rename to experimental-python-processor
* Remove grpc agent from runtime deps
* By default, don't crash in AgentContext's criticalFailure method
For preliminary review.
Still need to: