-
Notifications
You must be signed in to change notification settings - Fork 28
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add an S3 processor agent #755
Conversation
This agent will read a single file from an S3 bucket. The name of the file to read is taken from the record.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
||
// Read the file content from the response | ||
byte[] fileContent = getObjectResponse.readAllBytes(); | ||
log.debug("File content: {}", new String(fileContent)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this conversion is better to be done only if debug is enabled
if (log.isDebugEnabled) {log.debug...}
|
||
final JsonRecord jsonRecord = mutableRecord.toJsonRecord(); | ||
|
||
log.debug("Processing record {}", jsonRecord.toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (log.isDebugEnabled) {log.debug...}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM
but I have left some minor feedback.
I think we are on our way, this agent will be very useful
Looking forward to a PR to the docs website after we commit this patch
Thanks
|
||
String fileName = objectTemplate.execute(jsonRecord); | ||
|
||
log.info("Processing file {}", fileName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should define what happens in case fileName is null or empty, maybe we could "skip" the record ? (emit an empty list)
return Map.of("bucketName", bucketName); | ||
} | ||
|
||
private static class S3SourceRecord implements Record { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we could:
- rename this to S3ProcessorRecord
- use the same class of the S3Source
- switch to SimpleRecord
String username = configuration.getOrDefault("access-key", "minioadmin").toString(); | ||
String password = configuration.getOrDefault("secret-key", "minioadmin").toString(); | ||
String region = configuration.getOrDefault("region", "").toString(); | ||
String objectName = configuration.getOrDefault("objectName", "").toString(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about providing a sensible default, like "{{value.objectname}}" ?
ths way people don't have to bother with mustache for simple cases
throw new IllegalArgumentException( | ||
"Executor " + executorId + " is not of type 'service'."); | ||
} | ||
// Removing this check so that we can connect to any agent. The agent |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we discuss this change in another PR ?
@@ -66,7 +66,7 @@ fastavro = "^1.9.2" | |||
optional = true | |||
|
|||
[tool.poetry.group.full.dependencies] | |||
openai = {extras = ["datalib"], version = "^1.6.1"} | |||
openai = {extras = ["datalib"], version = "^1.10.0"} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this seems unrelated
… namespace (#13) Also small modification to S3 processor test to cover special characters in bucket object names.
…es for unit tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see that this patch is accumulating changes not strictly related to the S3 Processor.
We can keep them but I would prefer to finalise the patch in this form and then follow up with other improvements
try { | ||
return Integer.parseInt(stringValue); | ||
} catch (NumberFormatException e) { | ||
System.err.println("Error parsing integer: " + e.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logger ?
…of messages is flight and better constrain the memory usage of agents
…gged images to ECR
…dereference issue
@cdbartholomew I suggest to closed this PR, this is more a diff between this repo and the main branch of another repo. I would prefer to merge your changes using fewer smaller patches instead of this big PR. I will be very happy to review the single patches |
… received, which makes sense since the clients are async
…ion of the producer
Closing for now. Will update an open a new PR. |
This agent will read a single file from an S3 bucket. The name of the file to read is taken from the record.