
How to use batch.size? #290

Open · dipdeb opened this issue Sep 25, 2017 · 34 comments

Comments


dipdeb commented Sep 25, 2017

Hi,
I want to write only 50 records per batch through the JDBC sink, so I've set batch.size in the JDBC sink config file:

name=jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
batch.size=50
topics=postgres_users

connection.url=jdbc:postgresql://localhost:34771/postgres?user=foo&password=bar
file=test.sink.txt
auto.create=true

But batch.size has no effect: records are inserted into the target database as soon as new records appear in the source database.

How can I get it to insert in batches of 50?

ewencp (Contributor) commented Sep 25, 2017

batch.size is best effort for each set of records delivered to the sink connector:

Specifies how many records to attempt to batch together for insertion into the destination table, when possible.

dipdeb (Author) commented Sep 26, 2017

@ewencp sorry, I didn't quite follow your reply.


sashati commented Nov 13, 2017

I'm also struggling to increase the sink batch size beyond 500.
It seems batch.size is not meant for this; as @ewencp mentioned, it applies to each set of records delivered to the sink connector. I would highly appreciate any clue about customizing the batch size of the connector.

dipdeb (Author) commented Dec 1, 2017

@SAEEDSH instead of batching on the sink side, I guess it can be done on the source connector side, or you can try increasing poll.interval.ms in the source config.
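For example, something along these lines on the source side (just an untested sketch; poll.interval.ms controls how often the source polls the database, and batch.max.rows, if your JDBC source version supports it, caps the rows fetched per poll; adjust values to your load):

name=jdbc-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
# poll the source database every 5 seconds instead of continuously
poll.interval.ms=5000
# cap the number of rows fetched from the source per poll
batch.max.rows=500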


sashati commented Dec 1, 2017

Thanks @dipdeb. But my problem is when I already have data in Kafka and need to sink it. For example, when I have a million records in Kafka and run the JDBC sink connector, it sends them to the DB in batches of 500 each, which takes quite some time. I don't know how to increase the number of records that go to the DB per batch.

rhauch (Member) commented Dec 4, 2017

The Connect worker consumes the messages from the topics, and the consumer's max.poll.records specifies the maximum number of records that will be returned by a single poll. The connector's batch.size can really never be larger than this value, since that's the maximum number of records that will be processed at one time. Try changing that consumer property for your connector.
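For example, in the worker config (a sketch; the consumer. prefix in the worker properties applies to the consumers Connect creates for sink connectors):

# connect-distributed.properties (or connect-standalone.properties)
consumer.max.poll.records=1000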


atman9 commented Mar 13, 2018

I am hitting this issue as well. Even with max.poll.records set as high as 4000, I can't get more than about 80-100 rows per batch.

I have 8 partitions across 3 brokers. The max.poll.records is set to 4000, the tasks.max is set to 4 ( also tried 8 ) and batch.size is set to 1000 ( also tried 500 ).

I turned on DEBUG to see if the logs turned up anything and here is an example:

[2018-03-13 15:09:59,807] DEBUG [Consumer clientId=consumer-23, groupId=connect-oracle-jdbc-sink-steam-1] Fetch READ_UNCOMMITTED at offset 2073259 for partition dmg.inode.steam.1-7 returned fetch data (error=NONE, highWaterMark=2073327, lastStableOffset = -1, logStartOffset = 673821, abortedTransactions = null, recordsSizeInBytes=22525) (org.apache.kafka.clients.consumer.internals.Fetcher)
[2018-03-13 15:09:59,808] DEBUG [Consumer clientId=consumer-23, groupId=connect-oracle-jdbc-sink-steam-1] Added READ_UNCOMMITTED fetch request for partition dmg.inode.steam.1-7 at offset 2073327 to node kafka-dmg-test-01.pixar.com:9092 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
[2018-03-13 15:09:59,808] DEBUG [Consumer clientId=consumer-23, groupId=connect-oracle-jdbc-sink-steam-1] Sending READ_UNCOMMITTED fetch for partitions [dmg.inode.steam.1-7] to broker kafka-dmg-test-01.pixar.com:9092 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)

Any help would be appreciated. These small batch sizes cause the destination DB (Oracle) to commit too frequently, which is causing waits and IO contention.


atman9 commented Mar 15, 2018

I was able to get greater throughput by adjusting the following properties in the connect-distributed.properties:

# based on 500*180byte message size
consumer.fetch.min.bytes=900000
consumer.fetch.wait.max.ms=1000

@emasatsugu

Did the above work for anyone else? I'm still stuck at 500 records per batch.


sashati commented Mar 13, 2019

No, it seems 500 is fixed; I couldn't increase it either.

@imranrajjad

Interesting discussion. I would like to add: what if the batch size is set to 500 and the topic has 499 records, and it takes an hour for the 500th record to be produced to the topic? Will the sink connector halt completely for that hour? Is there a maximum wait limit property for JDBC sinks?


jukops commented Jul 15, 2019

I have the same problem. I changed batch.size, max.poll.records, consumer.fetch.min.bytes and consumer.fetch.wait.max.ms, but it still seems to update 500 rows at a time. Is there any solution for this?
I used the stack below:
connect = confluent-kafka-connect-jdbc/5.2.2-1
Target DB = PostgreSQL
Kafka = 2.1.0

@cameronbraid

I am having the same issue. Looking at the code, the buffer is flushed once per SinkTask.put method call, so the limiting factor is the number of messages consumed from Kafka at a time.

@surajshinde427

@sashati have you found any solution?

@MashaFomina

@jukops Hello. I have the same problem, but my records are saved one at a time, not in batches (https://stackoverflow.com/questions/59049762/kafka-jdbc-sink-connector-insert-values-in-batches). I tried to use these options:

batch.size=500

# based on 500*3000 byte message size
consumer.fetch.min.bytes=1500000
consumer.fetch.wait.max.ms=1500
consumer.max.poll.records=4000

These options did not give any results for me (records are always saved one at a time). How did you achieve saving in batches?


bmoe24x commented Apr 15, 2020

The Connect worker consumes the messages from the topics, and the consumer's max.poll.records specifies the maximum number of records that will be returned by a single poll. The connector's batch.size can really never be larger than this value, since that's the maximum number of records that will be processed at one time. Try changing that consumer property for your connector.

@rhauch, do you know if this behavior is still correct? If so, would it be possible to update the docs to mention that 'batch.size' is essentially tightly coupled to 'max.poll.records'?

@unfrgivn

@MashaFomina Did you ever find a workaround here? As it stands, we're stuck using a custom consumer to perform this sink job. I still find it hard to believe this would only batch by creating 1000s of individual insert statements as this completely neuters what databases are amazingly efficient at. I don't have much Postgres experience, but for MySQL this would wreak havoc on the binlog and replication.

I suppose Kafka would not be aware of the exact offset in the batch that fails the insert, but there will always be unaccounted-for issues with garbage in. Or maybe some of the database flavors this connector supports have compatibility issues here, or fail multi-row inserts differently? The only thing I know for certain is that for MS SQL, MySQL and Oracle this is a huge performance deal breaker.


MashaFomina commented Apr 19, 2020

@unfrgivn I did not find a proper solution. I finally understood that records really are polled from the topic in batches, but they are inserted into the database in a transaction that contains a batch of single-record inserts. I investigated the write speed to the database. You can try to write your own consumer to insert records into the database.

@unfrgivn

@MashaFomina Yeah, I rewrote this connector with a similar config as a consumer. Just asking because I'd love to use the Connect API the same way I do for my other sources/sinks, but the performance is almost 30x worse with 1 insert per row, plus the massive increase in disk I/O for MySQL binlog transactions (assuming the most common setup using InnoDB with autocommit on).


bmoe24x commented Apr 20, 2020

@jukops Hello. I have the same problem, but my records are saved one at a time, not in batches (https://stackoverflow.com/questions/59049762/kafka-jdbc-sink-connector-insert-values-in-batches). I tried to use these options:

batch.size=500

# based on 500*3000 byte message size
consumer.fetch.min.bytes=1500000
consumer.fetch.wait.max.ms=1500
consumer.max.poll.records=4000

These options did not give any results for me (records are always saved one at a time). How did you achieve saving in batches?

Are you setting these properties from the worker side or from the connector (JSON) side? One thing that I don't think is documented well is that on the connector (JSON) side you need to use the prefix consumer.override. to set consumer properties, not just consumer., and all of this only works AFTER setting connector.client.config.override.policy=All (or similar) on the worker side.

Regardless, I would be quite surprised if your records were actually being inserted one at a time. If you can't get over 500, make sure to follow the above. You can set the root log level to DEBUG and search for "Flushing", as it will tell you how many records it is flushing at a time.
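To spell that out, roughly (a sketch with illustrative values; other required sink settings such as connection.url are omitted):

# worker side, e.g. connect-distributed.properties
connector.client.config.override.policy=All

and then in the connector config (JSON):

{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "postgres_users",
    "batch.size": "4000",
    "consumer.override.max.poll.records": "4000"
  }
}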


dconger commented Jun 5, 2020

for MySQL this would wreak havoc on the binlog and replication

@unfrgivn I'd love to see the tweaks you made to the sink connector to support batch inserts. My team is running into this exact issue, and we seem to have no choice but to fork the repo and write our own custom implementation of the JDBCSinkTask


unfrgivn commented Jun 5, 2020

@dconger we are using a Node.js implementation of a Kafka consumer to replicate the sink functionality, so it's not a fork nor even a Kafka connector.


ugurcancetin commented Jun 30, 2020

@ewencp Does that mean we should create a custom sink connector with an adjusted poll method that runs on bigger batches? Just to clarify whether there is no solution for this yet, or whether it's on the roadmap for the near future.

@cobolbaby

Is there any solution?

@AndreaJulianos

So, do we have any authoritative Kafka Connect source and sink benchmark reports, such as MySQL to Kafka, so we can see what records-per-second rate is normal?

@sergeykranga

I've no experience with JDBC connectors, but assuming that this is generic configuration for all kinds of connectors, and based on this stackoverflow answer, did you try adding consumer.max.poll.records property to the worker configuration?

I was struggling with this while working with S3 sink connectors, where it was always configuring the max.poll.records to 500 by default. The property above fixed the issue for me.


cstmgl commented Jan 11, 2021

I've no experience with JDBC connectors, but assuming that this is generic configuration for all kinds of connectors, and based on this stackoverflow answer, did you try adding consumer.max.poll.records property to the worker configuration?

I was struggling with this while working with S3 sink connectors, where it was always configuring the max.poll.records to 500 by default. The property above fixed the issue for me.

Sorry, I'm also very interested in this topic, but I have a question: is this consumer.max.poll.records a Kafka worker configuration, or would it be defined at each connector level?

I'm asking because I noticed that my connectors wait for days (I'm in a development setup and don't have a massive load on my system) for the data to be pushed from Kafka to the sink systems (both JDBC and S3). I was expecting a delay of a couple of minutes (5, 10, 20?) but not days, so I'm a bit lost as to how I can force the data to be sinked without making the flush size 1.

@marcelrend

I was able to increase the batch size. As indicated above, the Kafka Connect worker needs connector.client.config.override.policy=All, and the connector needs the settings batch.size: 4000 and consumer.override.max.poll.records: 4000.

I was expecting a performance increase, but it stayed around 4k messages per second. When I increased the number of connector tasks, though, I was able to get over 20k messages per second.


ykcai commented May 12, 2021

@MrMarshall what was your setup like, # of topic partitions? # of tasks for the connector?


marcelrend commented May 12, 2021

@MrMarshall what was your setup like, # of topic partitions? # of tasks for the connector?

@ykcai I ran 3 brokers on the same machine; the topic had 6 partitions, I believe. I ran multiple tests with a varying number of connector tasks and batch sizes. I checked the number of records in Postgres every 5 seconds and plotted the number of added records in the graph below. Each line shows numberoftasks_batchsize, e.g. the brown line has 10 tasks and a 10k batch size. I didn't even include running only 1 task because it took about 3x longer and that messed up the graph.

My conclusion was that batch size does not significantly affect the performance, but the number of tasks does. When using 6 partitions it doesn't help much to use 5 tasks because one of the tasks will need to take care of 2 partitions, so in that case it's better to use 2, 3 or 6 tasks.

[Graph: records added to Postgres per 5-second interval for each tasks/batch-size combination]


bmoe24x commented May 12, 2021

@MrMarshall just an FYI: a total # of tasks greater than the # of topic partitions is unnecessary. Did you turn on trace logs and see whether you were actually getting batches of 2000, 3000, 10000, etc.? Very often it is much less than people expect if everything isn't configured just right. There are several parameters that need to be set, such as fetch limits and byte limits.

It is also to be expected that the number of tasks matters more than batch size, up to the number of partitions in the topic.

@marcelrend

Yes, I saw the count go up only by the batch size (or a multiple of the batch size). I didn't check the DB logs though.


bmoe24x commented May 13, 2021

Yes, I saw the count go up only by the batch size (or a multiple of the batch size). I didn't check the DB logs though.

In my experience, I have come to many of the same conclusions as you. If you want to do further testing, I would run everything with 3 tasks or everything with 6 tasks. As you mentioned, getting an even assignment of partitions to tasks is typically ideal. I would skip any testing with more tasks than you have partitions, as those extra tasks will not be assigned any work. Off the top of my head, my best performance was with batch sizes in the low thousands, with diminishing returns on either side of that. If you want to be 100% sure of what is going on, I would turn on trace logs for a small workload and search them to make sure it isn't slowly building up to your batch limit; in my experience it is advantageous, within reason, to have a similar amount of data being read in.

@demobin8

So, the solution is:
First, enable worker configuration overrides by adding the Docker env var:
CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY: All
Then, override the consumer configuration in the connector configuration:
"consumer.override.max.poll.records": 1000
