
The committed block count cannot exceed the maximum limit of 50,000 blocks. #324

Closed
nidhijwt opened this issue Jul 20, 2020 · 19 comments

@nidhijwt

Overview

I am using CamelAzurestorageblobSinkConnector to archive the data on my Kafka topics.

The problem

The data I am sending goes into an append blob in Azure Blob Storage. Every record creates another block at the end and appends it to the append blob, and Azure imposes a maximum limit of 50,000 blocks per append blob. Thus, after 50,000 records the blob fills up, and this happens within a few minutes. An append blob block can hold up to 4 MB, which in my case is barely used, since only one record is saved per block and my messages are really small (say 1 KB each). So after 50,000 records my append blob is full and I get an error saying:

com.azure.storage.blob.models.BlobStorageException: Status code 409, "BlockCountExceedsLimitThe committed block count cannot exceed the maximum limit of 50,000 blocks.\nRequestId:b112f700-301e-0022-476f-5ea523000000\nTime:2020-07-20T08:23:40.1508526Z"\n\tat sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)\n\tat sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)\n\tat

Ask
I do not see a feature that lets me buffer a number of records and then insert them in one operation through the connector. Please suggest whether any such feature exists, as this seems like a basic thing when archiving using connectors.

If such a thing does not exist, what workaround do other people use?

@oscerd
Contributor

oscerd commented Jul 20, 2020

I think this is more related to how the azure-storage-blob component works. @omarsmak, can you add some hints on this?

@oscerd
Contributor

oscerd commented Jul 21, 2020

By the way, you could also use an aggregationStrategy for this. I'm preparing some documentation on how to do that, and it will be available in the next release.

@omarsmak
Member

omarsmak commented Jul 21, 2020

@nidhijwt the issue here, as I understand it, is the number of records being sent: every Kafka record becomes an equivalent Azure append block, which can hit the 50,000 limit pretty fast, especially if you have noise records that are not meant to be inserted (you can filter these out with SMTs). However, the way I see it, you will need to aggregate these records somehow and insert them in as few batches as possible, as you mentioned. Off the top of my head, you have these options:

  1. Write an aggregator, for example with Kafka Streams, that aggregates the messages on a rolling basis into an output Kafka topic containing the aggregated records. You can then sink that output topic using CamelAzurestorageblobSinkConnector (see the sketch after this list).

  2. Use the Camel aggregationStrategy that @oscerd mentioned he will document in the next release. I guess this could be the more practical option, as you don't need to maintain an extra application like Kafka Streams to handle the aggregation.
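
A minimal sketch of option 1, assuming keyed, string-valued records; the output topic name my-topic-aggregated, the 5-minute window, and the application id are hypothetical placeholders, so treat this as an illustration of the rolling aggregation rather than a tested implementation:

import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

public class RollingBlobAggregator {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Read the small records (assumes non-null string keys and values),
        // roll them up per 5-minute window by joining the bodies with newlines,
        // and emit one aggregated record per window to the output topic that
        // the azure-storage-blob sink connector would then consume instead.
        builder.stream("my-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
               .reduce((accumulated, next) -> accumulated + "\n" + next)
               .toStream()
               // Unwrap the windowed key so the output topic keeps plain string keys.
               .map((windowedKey, body) -> KeyValue.pair(windowedKey.key(), body))
               .to("my-topic-aggregated", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "append-blob-aggregator");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        new KafkaStreams(builder.build(), props).start();
    }
}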

@nidhijwt
Author

Thanks for your reply.

Is documentation the only thing pending, or is the code also incomplete? I can see there are already two aggregationStrategy implementations in the code, StringAggregator and SimpleAggregator. I want to use them but couldn't find out how. I am using the Camel connector with Kafka in distributed mode with the following properties:

{ "name": "CamelAzure-storage", "config": { "connector.class": "org.apache.camel.kafkaconnector.azurestorageblob.CamelAzurestorageblobSinkConnector", "tasks.max": "1", "value.converter": "org.apache.kafka.connect.storage.StringConverter", "topics": "my-topic", "camel.sink.url": "azure-storage-blob://myurl&accessKey=mykey&operation=commitAppendBlob" } }

Now I am trying to find which property can be used to specify the aggregator. Even StringAggregator would serve my purpose.

@oscerd
Contributor

oscerd commented Jul 21, 2020

Try adding:

"camel.bean.aggregate": "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator"
"camel.beans.aggregation.size": "1000"
"camel.beans.aggregation.timeout": "5000L"

Where the size is the batch size before sending to Azure, and the timeout covers the case where you don't have enough records to complete the aggregation.

https://github.com/apache/camel-kafka-connector/blob/master/core/src/test/java/org/apache/camel/kafkaconnector/utils/SampleAggregator.java

This will concatenate the records with a space in between.

You'll need to build the master and use the generated connector. This is not released yet.
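
For reference, a strategy of that shape looks roughly like the following, assuming the Camel 3 org.apache.camel.AggregationStrategy interface; the SampleAggregator linked above is the actual class shipped in the project, so prefer that one:

import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;

// Rough sketch of a body-concatenating aggregation strategy, not the exact
// SampleAggregator source; it joins incoming record bodies with a space.
public class SpaceJoiningAggregator implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // First record of the batch: nothing accumulated yet, keep it as-is.
        if (oldExchange == null) {
            return newExchange;
        }
        // Append the new record body to what has been accumulated so far.
        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newExchange.getIn().getBody(String.class);
        oldExchange.getIn().setBody(oldBody + " " + newBody);
        return oldExchange;
    }
}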

@nidhijwt
Author

@oscerd Thanks for your answer,

I built the latest code as suggested, but it is not helping to aggregate the data. Even with this, it writes one record per block. It looks like inserting a collection is not supported with Azure Blob.

@oscerd
Contributor

oscerd commented Jul 22, 2020

You're doing something wrong.

@oscerd
Contributor

oscerd commented Jul 22, 2020

That cannot be possible: if data is sent continuously you should at least see multiple concatenated records in the block, not one record. So I don't know how you built it and what you did, but I don't think you're using the latest version. Also, please use the correct separate properties rather than camel.sink.url with the whole endpoint; you need the approach with the separated options provided to make the camel.beans settings work, I believe.

@omarsmak
Member

@oscerd
Contributor

oscerd commented Jul 22, 2020

Yes, my bad. Anyway, I believe that in this case avoiding camel.sink.url will help. But the correct aggregator is the one reported by @omarsmak, and it's really bad that no errors were shown.

@nidhijwt
Author

nidhijwt commented Jul 23, 2020

Thanks, guys,

Another error I see in the above configuration is "camel.bean.aggregate"; it should be "camel.beans.aggregate".

I did the above as per the suggestions, but it is still not working. Then I checked the logs and found the following:

INFO Using properties from: classpath:application.properties;optional=true (org.apache.camel.main.BaseMainSupport)
[2020-07-23 08:06:33,294] WARN Property not auto-configured: camel.beans.aggregation.timeout=5000L (org.apache.camel.main.BaseMainSupport)
[2020-07-23 08:06:33,294] WARN Property not auto-configured: camel.beans.aggregation.size=1000 (org.apache.camel.main.BaseMainSupport)
[2020-07-23 08:06:33,296] INFO Auto-configuration summary: (org.apache.camel.main.BaseMainSupport)
[2020-07-23 08:06:33,296] INFO  camel.component.azure-storage-blob.operation=commitAppendBlob (org.apache.camel.main.BaseMainSupport)
[2020-07-23 08:06:33,297] INFO No additional Camel XML routes discovered from: classpath:camel/*.xml (org.apache.camel.main.DefaultRoutesCollector)

The configuration I am using is:

{
    "name": "CamelAzure-storage",
    "config": {
        "connector.class": "org.apache.camel.kafkaconnector.azurestorageblob.CamelAzurestorageblobSinkConnector",
        "tasks.max": "1",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "topics": "my-topic",
        "camel.sink.path.containerName": "storageaccount/container-name",
        "camel.sink.endpoint.blobName": "file.txt",
        "camel.sink.endpoint.accessKey": "my-access-key",
        "camel.component.azure-storage-blob.operation": "commitAppendBlob",
        "camel.beans.aggregate": "org.apache.camel.kafkaconnector.aggregator.StringAggregator",
        "camel.beans.aggregation.size": "1000",
        "camel.beans.aggregation.timeout": "5000L"
    }
}

Observation: it does not say "Property not auto-configured" for camel.beans.aggregate.

@oscerd
Contributor

oscerd commented Jul 23, 2020

Can you try adding a dynamic name to the file? That way it will go in append mode. By the way, I still don't get whether this problem is on the Azure Blob side or in camel-kafka-connector.

@nidhijwt
Author

The blob type I am using in Azure is AppendBlob, so it performs the append operation. One thing that Azure does not support is updating blobs.

[screenshot]

That is why I need to send a collection of records, which I believe is implemented below.

[screenshot]

@oscerd
Contributor

oscerd commented Jul 23, 2020

I don't know a way to reproduce this, so I cannot really say anything. The way to aggregate records and send a list of them is through the aggregation; there are no other ways to collect records. I still believe something is wrong anyway. You should see at least 1000 records aggregated with that configuration.

@omarsmak
Member

@nidhijwt to narrow down the problem, can you please try to use a logger sink instead of the Azure sink (or any other sink that could help you troubleshoot the issue further) to see whether the data is indeed aggregated?

@oscerd
Contributor

oscerd commented Jul 23, 2020

Ah, the value you're using for camel.beans.aggregate is wrong. It should be:
"camel.beans.aggregate": "#class:org.apache.camel.kafkaconnector.aggregator.StringAggregator"

@nidhijwt
Author

Yes, the aggregation worked. Thanks.

When is the next release planned?

@oscerd
Contributor

oscerd commented Jul 27, 2020

We'll start the release cut this week.

@nidhijwt
Author

Ok, thanks.
