Duplication Issue: Flume Kafka Data Transfer from Local to Cloud Kafka #2

Open
SamBhandary opened this issue Dec 6, 2023 · 0 comments

I use a Flume Kafka source and a Kafka channel to transfer data from a local Kafka instance to a remote, cloud-based Kafka setup. Occasionally, I see the same message written more than once to the cloud-based Kafka destination.

Here is my configuration:

# Define sources, channels, and sinks

agent.sources = kafka-source
agent.channels = kafka-channel

agent.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka-source.channels = kafka-channel
agent.sources.kafka-source.setTopicHeader = false
agent.sources.kafka-source.kafka.topics = test
agent.sources.kafka-source.kafka.bootstrap.servers = localhost:9092
agent.sources.kafka-source.kafka.consumer.group.id = test-group
agent.sources.kafka-source.kafka.consumer.auto.offset.reset = earliest
agent.sources.kafka-source.kafka.consumer.enable.auto.commit = true

agent.sources.kafka-source.interceptors = doSomethingInterceptor
agent.sources.kafka-source.interceptors.doSomethingInterceptor.type = com.sample.DoSomething$Builder

agent.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.kafka-channel.parseAsFlumeEvent = false
agent.channels.kafka-channel.kafka.topic = cloud_test
agent.channels.kafka-channel.kafka.bootstrap.servers = cloud_kafka_bootstrap_server:9092
agent.channels.kafka-channel.kafka.consumer.group.id = cloud_test_group
agent.channels.kafka-channel.kafka.producer.acks = 1
agent.channels.kafka-channel.kafka.producer.batch.size = 65536
agent.channels.kafka-channel.kafka.producer.client.id = cloud_kafka_client_id
agent.channels.kafka-channel.kafka.producer.compression.type = some_compression_type
agent.channels.kafka-channel.kafka.producer.linger.ms = 50
agent.channels.kafka-channel.kafka.producer.max.request.size = 33554432
agent.channels.kafka-channel.kafka.producer.security.protocol = SSL
agent.channels.kafka-channel.kafka.producer.ssl.endpoint.identification.algorithm =
agent.channels.kafka-channel.kafka.producer.ssl.enabled.protocols =
agent.channels.kafka-channel.kafka.producer.ssl.keystore.location =
agent.channels.kafka-channel.kafka.producer.ssl.keystore.password =
agent.channels.kafka-channel.kafka.producer.ssl.keystore.type =
agent.channels.kafka-channel.kafka.producer.ssl.truststore.location =
agent.channels.kafka-channel.kafka.producer.ssl.truststore.password =
agent.channels.kafka-channel.kafka.producer.ssl.truststore.type =
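
Two settings in the configuration above look relevant to the duplicates and may be worth testing. This is only a sketch, under assumptions not verified against this setup: that the Kafka client bundled with the Flume version in use, and the cloud brokers, support idempotent producers, and that the `kafka.producer.*` properties are passed through to the producer unchanged. With `acks = 1` and no idempotence, a producer retry after a lost acknowledgement can write the same record twice. On the source side, as far as I understand, the Flume Kafka source commits offsets itself after a batch reaches the channel when auto-commit is disabled, so leaving the consumer's periodic auto-commit on may work against that.

```properties
# Hypothetical adjustments to try, not a confirmed fix.

# Let the Kafka source commit offsets after events are handed to the channel,
# rather than relying on the consumer's periodic auto-commit.
agent.sources.kafka-source.kafka.consumer.enable.auto.commit = false

# Ask the producer to de-duplicate its own retries.
# Idempotence requires acks = all.
agent.channels.kafka-channel.kafka.producer.acks = all
agent.channels.kafka-channel.kafka.producer.enable.idempotence = true
```

Even with these changes the pipeline would remain at-least-once: a failure between the channel write and the offset commit can still cause a batch to be re-read and re-sent, so consumers of `cloud_test` may still need to tolerate occasional duplicates.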
