
In S3 source connector, how to fetch only new files ? #311

Closed
akhileshacc opened this issue Jun 30, 2020 · 13 comments

Comments

@akhileshacc

I am using the S3 source connector to fetch files from S3. It works fine and returns all the records.
But when new files are added, it returns all the records again.

Here is my configuration:

{
    "name": "s3-source",
    "config": {
        "name": "s3-source",
        "connector.class": "org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter",
        "topics": "mytopic",
        "camel.source.path.bucketNameOrArn": "checkoutkafka",
        "camel.component.aws-s3.accessKey": "accesskey",
        "camel.component.aws-s3.secretKey": "secretkey",
        "camel.component.aws-s3.region": "AP_SOUTH_1",
        "camel.component.aws-s3.deleteAfterRead": false,
        "camel.component.aws-s3.autocloseBody": false
    }
}

How do I set this configuration so that only new files are fetched?

@oscerd
Contributor

oscerd commented Jun 30, 2020

You need to set deleteAfterRead to true; the file will then be deleted once it has been consumed.
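Concretely, in the configuration posted above that means changing just this one property (everything else stays the same):

```json
{
    "camel.component.aws-s3.deleteAfterRead": true
}
```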

@oscerd
Contributor

oscerd commented Jun 30, 2020

Explained.

@oscerd oscerd closed this as completed Jun 30, 2020
@akhileshacc
Author

What if I don't want to remove content from S3?

@oscerd
Contributor

oscerd commented Jun 30, 2020

You can use the aws2-s3 component with its moveAfterRead option.
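For reference, a sketch of what the aws2-s3 variant of the config above could look like. Note the assumptions here: the connector class name and the `camel.component.aws2-s3.*` property prefix are guesses based on the camel-kafka-connector naming scheme, and `checkoutkafka-processed` is a placeholder destination bucket; check the generated aws2-s3 connector documentation for the authoritative names. `moveAfterRead` requires `destinationBucket` to be set.

```json
{
    "name": "s3-source",
    "config": {
        "name": "s3-source",
        "connector.class": "org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "topics": "mytopic",
        "camel.source.path.bucketNameOrArn": "checkoutkafka",
        "camel.component.aws2-s3.accessKey": "accesskey",
        "camel.component.aws2-s3.secretKey": "secretkey",
        "camel.component.aws2-s3.region": "ap-south-1",
        "camel.component.aws2-s3.moveAfterRead": true,
        "camel.component.aws2-s3.destinationBucket": "checkoutkafka-processed"
    }
}
```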

@oscerd
Contributor

oscerd commented Jun 30, 2020

I'll add an example to the camel-kafka-connector-examples repository.

@akhileshacc
Author

Thanks for the help. Using moveAfterRead expects a destination bucket. What if I don't want to use a new bucket?
Is that currently possible?

@oscerd
Contributor

oscerd commented Jul 1, 2020

No, it's not. The connector is generated from the Camel component, so you need to move the file somewhere else; currently only a different bucket is supported.

@akhileshacc
Author

Do we plan to add this functionality down the line?

@oscerd
Contributor

oscerd commented Jul 1, 2020

No, we don't. It doesn't make sense. If you use Camel directly, you can add an idempotent repository to the consumer, so each file is consumed just once; at the same time, we cannot change the Camel component's behavior for a particular use case. Moving files to a different bucket is already a good tradeoff.

@akhileshacc
Author

akhileshacc commented Jul 1, 2020

Got it, thanks.

> you can add an idempotent repository to the consumer

What do you mean by this?

@oscerd
Contributor

oscerd commented Jul 1, 2020

In Apache Camel you can use the Idempotent Consumer EIP: https://camel.apache.org/components/latest/eips/idempotentConsumer-eip.html

So if you really need to avoid moving the files to a different bucket, you can skip the camel-aws-s3 connector and turn your application into a pure Camel route, using the camel-aws-s3 component in combination with the camel-kafka component.
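The idempotent-consumer idea behind that link can be sketched generically (plain Python below, not the Camel API): keep a repository of keys already processed, and forward a message only if its key has never been seen. The class and function names here are illustrative, not Camel's.

```python
class InMemoryIdempotentRepository:
    """Toy stand-in for Camel's IdempotentRepository: remembers seen keys."""

    def __init__(self):
        self._seen = set()

    def add(self, key):
        """Return True if the key is new (and record it), False if a duplicate."""
        if key in self._seen:
            return False
        self._seen.add(key)
        return True


def consume(objects, repo, publish):
    """Forward each (key, body) pair at most once, keyed by the S3 object key."""
    for key, body in objects:
        if repo.add(key):        # new file: forward it to Kafka
            publish(key, body)
        # duplicate key: skipped, the body never reaches the topic


repo = InMemoryIdempotentRepository()
published = []
consume([("a.csv", b"1"), ("b.csv", b"2")], repo, lambda k, b: published.append(k))
consume([("a.csv", b"1"), ("c.csv", b"3")], repo, lambda k, b: published.append(k))
# published is now ["a.csv", "b.csv", "c.csv"]: a.csv was filtered the second time
```

In a real Camel route this would be the idempotentConsumer EIP, typically keyed on the S3 object key and backed by a file- or database-based repository so the seen set survives restarts.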

@akhileshacc
Author

Sure, trying it out.

A question regarding the idempotent repository:
Will messages coming from S3 to Kafka be filtered after they reach Kafka Connect?
If so, won't there be traffic overhead, since the file data is still transferred and just not added to the topic?

Or does this mechanism filter the file list and fetch only the new ones?

@oscerd
Contributor

oscerd commented Jul 1, 2020

No, they will be filtered before reaching Kafka. If you use the idempotentRepository, don't forget to set deleteAfterRead=false so the files won't be deleted.
