[question] How can consuming continue after the Kafka topic is recreated or its properties are modified? #7100

Closed
minwoo-jung opened this issue Jun 29, 2021 · 8 comments


minwoo-jung commented Jun 29, 2021

Hello~
I am using Pinot and it is working well for us. Thank you for making a great product. :)
I have a question because something did not behave the way I expected.

How can consuming continue even after I recreate the Kafka topic or modify the topic properties?

I am ingesting data via stream ingestion.
For operational reasons I sometimes need to delete the Kafka topic and recreate it. After doing so, the consuming segment appears to stop; that is, no more data is stored.
I have repeated the same scenario several times, and each time the consuming segment stops and no data is saved.

I tried several ways to work around the problem. Among them, the following sequence sometimes brings the consuming segment back (a rough sketch of how I script these steps is below):
1. Disable the table
2. Delete and recreate the Kafka topic
3. Enable the table
However, it does not always recover.
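
For reference, here is roughly how I script the disable/enable steps against the controller. This is only a minimal sketch: the controller address (localhost:9000) and the GET-with-state form of the table endpoint are assumptions about my local setup and this Pinot version, so please check the controller's Swagger UI before relying on it.

# Minimal sketch of the disable -> recreate topic -> enable sequence.
# Assumes a controller at localhost:9000 and the older table-state endpoint
# GET /tables/{tableName}?state=...&type=realtime (verify in Swagger).
import requests

CONTROLLER = "http://localhost:9000"
TABLE = "systemMetricLong"

def set_table_state(state: str) -> None:
    # Toggle table state via a query parameter on the table resource.
    resp = requests.get(
        f"{CONTROLLER}/tables/{TABLE}",
        params={"state": state, "type": "realtime"},
    )
    resp.raise_for_status()
    print(f"{state}: {resp.text}")

set_table_state("disable")
# ... delete and recreate the Kafka topic here, outside of Pinot ...
set_table_state("enable")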

I also tried reloading segments after the consuming segment stopped, but it still did not work.
I restarted the Docker cluster several times as well, but the consuming segment still did not resume.
I tried various other approaches beyond these, but could not find a solution.
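
In case it helps anyone debugging the same symptom, this is roughly how I check what the consuming segments are doing. It assumes the controller exposes a consumingSegmentsInfo endpoint, which may not exist in older builds, so treat it as a sketch rather than a confirmed API.

# Sketch: ask the controller what the consuming segments are doing.
# Assumes localhost:9000 and a /tables/{tableName}/consumingSegmentsInfo
# endpoint; older builds may not have it.
import requests

CONTROLLER = "http://localhost:9000"
TABLE = "systemMetricLong"

resp = requests.get(f"{CONTROLLER}/tables/{TABLE}/consumingSegmentsInfo")
resp.raise_for_status()
print(resp.json())  # inspect the consumer state reported by the servers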

My guess is that recreating the Kafka topic changes the offsets, and that is what breaks consumption.
If the offsets were reset correctly inside the Pinot consumer, I would expect the consuming segment to keep accumulating data even after the topic is recreated.
So, how can consuming continue even when I recreate the Kafka topic or modify its properties?
I do not know the internal logic well, but I looked at the org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory, KafkaPartitionLevelConsumer, and KafkaStreamLevelConsumer classes and could not find the cause.

Re-creating the table may be one way to recover when the consuming segment stops,
but that loses the previously stored data, so I am looking for a way to keep the consuming segment operating normally, without losing data and without re-creating the table.

For reference, the Docker image version currently in use is as follows.

{
  "pinot-protobuf": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-kafka-2.0": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-avro": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-distribution": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-csv": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-s3": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-yammer": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-segment-uploader-default": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-batch-ingestion-standalone": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-confluent-avro": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-thrift": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-orc": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-batch-ingestion-spark": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-azure": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-gcs": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-batch-ingestion-hadoop": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-hdfs": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-adls": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-kinesis": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-json": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-minion-builtin-tasks": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-parquet": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-segment-writer-file-based": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb"
}

Table config:

{
  "REALTIME": {
    "tableName": "systemMetricLong_REALTIME",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "timeType": "MILLISECONDS",
      "schemaName": "systemMetricLong",
      "retentionTimeUnit": "DAYS",
      "retentionTimeValue": "2",
      "timeColumnName": "timestampInEpoch",
      "replicasPerPartition": "1"
    },
    "tenants": {
      "broker": "DefaultTenant",
      "server": "DefaultTenant"
    },
    "tableIndexConfig": {
      "loadMode": "MMAP",
      "sortedColumn": [
        "applicationName"
      ],
      "autoGeneratedInvertedIndex": false,
      "createInvertedIndexDuringSegmentGeneration": false,
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.topic.name": "system-metric-long",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.broker.list": XXXXXXX
        "realtime.segment.flush.threshold.raws": "0",
        "realtime.segment.flush.threshold.time": "24h",
        "realtime.segment.flush.threshold.segment.size": "50M",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
      },
      "invertedIndexColumns": [
        "tags"
      ],
      "rangeIndexColumns": [
        "timestampInEpoch"
      ],
      "aggregateMetrics": false,
      "nullHandlingEnabled": true,
      "enableDefaultStarTree": false,
      "enableDynamicStarTreeCreation": false
    },
    "metadata": {
      "customConfigs": {}
    },
    "isDimTable": false
  }
}

I know you are busy with development, but I hope you can help. I have been looking for a solution for a week but cannot find one.

@minwoo-jung
Author

It doesn't matter how much data is lost. I only need consuming to be restored and to operate normally. :)

@mcvsubbu
Contributor

If you don't care about data loss, then you can do the following:

  • Drop your table
  • Recreate the table with the correct topic.

Author

minwoo-jung commented Jun 29, 2021

@mcvsubbu
First of all, thanks for the quick reply. :)
To clarify what I meant: the data already stored in the table must be maintained, while messages in Kafka can be lost.
I would like to keep the data previously stored in the table.

Is there any workaround? :)

Author

minwoo-jung commented Jun 29, 2021

@mcvsubbu
Is there any workaround that loses some data in the table but recovers consuming without recreating the table?
For example, the data held in the consuming segment could be lost, but is there a way to re-initialize the consuming segment itself?

@mcvsubbu
Contributor

As of now, there is no workaround.

If you think about it, Pinot consumes from Kafka using offsets. Kafka guarantees a unique offset for each message in the queue, and a unique queue per topic. Across topics, the offsets and queues will not match in general, so the semantics of an identifier of a 'row' are lost.
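
To make the offset point concrete, here is a small illustration (using the kafka-python client, which is not part of Pinot and is only an assumption for the example): once the topic is recreated, its partitions start again from offset 0, so an offset checkpointed against the old topic can point beyond the new end offset, and a consumer resuming from that checkpoint cannot make progress.

# Illustration with kafka-python (not Pinot internals): compare an offset
# checkpointed before the topic was recreated with the recreated topic's
# end offset. The broker address and checkpoint value are made up.
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
partition = TopicPartition("system-metric-long", 0)

checkpointed_offset = 1_500_000          # saved against the old topic (example value)
end_offset = consumer.end_offsets([partition])[partition]

if checkpointed_offset > end_offset:
    print("checkpoint points past the recreated topic; consumption cannot resume from it")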

Author

minwoo-jung commented Jun 30, 2021

@mcvsubbu
Thanks for the reply.
I understand what you are saying, and I know this is not an easy problem to solve.
I'd like to ask you a few more questions.

1

Is there any development in progress to solve this problem?
This issue is important to me because, when the situation above occurs, I want to keep the existing data and minimize changes to the business code.

2

Suppose I follow the approach of creating a new table, but the existing data must be preserved.
Assume the data in the existing table (Table_A) must be kept and queried continuously, while the newly created table (Table_B) has a different name and starts storing new data.
In the business code, I would then have to modify the code to access both tables (Table_A, Table_B).

For example, to continue using the data in both tables, the application code would have to branch like this:

if (searchtime < createTableBtime)
    select from Table_A
else
    select from Table_B

What I want to ask is this:
instead of modifying the code like this every time a table name changes or a table is added,
is there a way, even if it takes time, to push the data from the existing Table_A into the newly created Table_B?
If that is possible, the amount of code modification and conditional logic would be significantly reduced.

If so, it would be great if you could give me some advice. :)
I have looked for a way in the manual, but I have not found one yet.

3

I have seen issue #6555.
I could not understand it exactly, but I assume it means giving up data integrity (either duplicating rows or missing rows altogether).
If the table-recreate issue can be solved by supporting the earliest/latest offsets, that would be a good workaround for us.

For reference,
the accepted offset-reset values in Kafka changed from smallest/largest to earliest/latest/none from Kafka 2.0 or later (the version may not be correct).
However, I confirmed that only smallest and largest are supported in the KafkaStreamMetadataProvider class:
https://github.com/apache/incubator-pinot/blob/47a75e5093129cc280de4c118434ccb337cd3da1/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java#L55

4

We want to store customer data in Pinot for a long time (up to 6 months to 1 year), rather than storing, inspecting, and discarding it on a one-off basis.
Is continuously ingesting user data via streaming and retaining it in Pinot not a recommended use case? I would like to keep the data the way an RDB is used in normal web development; whether there is a TTL or expiry time does not matter.
Does Pinot only recommend storing and analyzing data on a short-lived basis, or is it also reasonable to continuously ingest data by streaming and analyze it over a long period (up to 6 months to 1 year)?
Does Pinot recommend storing a user's raw data directly in Pinot via stream ingestion and retaining it for a long time?

We know you are busy, but we look forward to your answers to these questions.
Thank you. Have a nice day.

Author

minwoo-jung commented Jul 5, 2021

Hi @mcvsubbu

I am following up on the questions above.
I know you are very busy, but could you answer them when you have time?

I am a developer working on the open-source Pinpoint APM.
We are building a feature that collects system metric data and stores and analyzes the raw data in Pinot, so that we can show meaningful information to users.
We reviewed Pinot for a long time and concluded that it could be a good store for this, and we are now developing the metric collection feature.

If you can answer the questions above, I think we can build good features on top of Pinot.
So, have a nice day and thank you. :)

@sajjad-moradi
Contributor

There are two types of properties when it comes to changing the stream configs:

  1. Changes that modify the underlying stream, such as a topic name or cluster name change.
  2. Stream-compatible changes that do not modify the underlying stream, such as the segment threshold parameters.

The problem with the first type is that the offsets of the partitions change completely when the underlying stream changes. The pause/resume feature, recently merged into master (#8986 and #9289), can help here. For these incompatible parameter changes, the resume request has an option to handle a completely new set of offsets. Operators can now follow a three-step process: first, issue a pause request; second, change the consumption parameters; finally, issue the resume request with the appropriate option. These steps preserve the old data and allow the new data to be consumed immediately, and queries continue to be served throughout the operation.
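
For illustration, the three-step flow could look roughly like this against the controller REST API. The endpoint names follow the pause/resume feature (#8986, #9289), and the controller address and table name are placeholders, so verify everything against your version's API docs.

# Sketch of the pause -> change stream config -> resume flow described above.
# Controller address and table name are placeholders; endpoint names follow
# the pause/resume feature (#8986, #9289) and should be verified per version.
import requests

CONTROLLER = "http://localhost:9000"
TABLE = "systemMetricLong"

# 1. Pause consumption for the realtime table.
requests.post(f"{CONTROLLER}/tables/{TABLE}/pauseConsumption").raise_for_status()

# 2. Update the table config (e.g. point streamConfigs at the new topic)
#    through the table-config API or the UI.

# 3. Resume, asking Pinot to start from the new stream's earliest offsets
#    instead of the stale checkpoints.
requests.post(
    f"{CONTROLLER}/tables/{TABLE}/resumeConsumption",
    params={"consumeFrom": "smallest"},
).raise_for_status()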

For the second type, the force-commit endpoint (#9197) can be used. The current consuming segments, which hold the previous stream config values, are immediately completed and new consuming segments are spun off. These new consuming segments pick up the new values in the stream config.
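
A corresponding sketch for the force-commit path (endpoint name per #9197; again, the controller address and table name are placeholders):

# Sketch: force-commit the current consuming segments so that new consuming
# segments pick up the updated stream config (#9197). Placeholders as above.
import requests

CONTROLLER = "http://localhost:9000"
TABLE = "systemMetricLong"

requests.post(f"{CONTROLLER}/tables/{TABLE}/forceCommit").raise_for_status()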
