
[SPARK-29223][SQL][SS] Enable global timestamp per topic while specifying offset by timestamp in Kafka source #25911

Closed

Conversation

HeartSaVioR
Contributor

What changes were proposed in this pull request?

This patch is a follow-up of SPARK-26848 (#23747). In SPARK-26848, we decided to open the possibility of letting end users set an individual timestamp per partition. But in many cases, specifying a timestamp expresses the intention to go back to a specific point in time and reprocess records, which should apply to all topics and partitions.

This patch proposes a way to set a global timestamp across all partitions of a topic, so that end users can easily set all offsets from a specific timestamp.

The patch doesn't provide a way to set a global timestamp across topics, as that would require changing the format of startingOffsetsByTimestamp/endingOffsetsByTimestamp, which might not be intuitive to understand.
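
For illustration, a minimal usage sketch of the proposed behavior (the `startingOffsetsByTimestamp`/`endingOffsetsByTimestamp` options already exist from SPARK-26848; the `"-1"` global sentinel matches the `GLOBAL_PARTITION_NUM` discussed below, and the broker address and timestamps are placeholders):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-global-ts-sketch").getOrCreate()

// Hypothetical batch query reading topicA between two epoch-millis timestamps,
// without needing to know how many partitions topicA has.
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092") // placeholder broker
  .option("subscribe", "topicA")
  // "-1" applies the timestamp to every partition of the topic
  .option("startingOffsetsByTimestamp", """{"topicA": {"-1": 1000}}""")
  .option("endingOffsetsByTimestamp", """{"topicA": {"-1": 2000}}""")
  .load()
```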

Why are the changes needed?

This helps end users set a timestamp for reprocessing from a specific point in time much more easily. It also removes the requirement of knowing the number of partitions in order to set offsets.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added UTs to verify the new feature.

  </td>
- <td>none (the value of <code>startingOffsets<code/> will apply)</td>
+ <td>none (the value of <code>startingOffsets</code> will apply)</td>
HeartSaVioR (Contributor Author):

Just fixed a nit which IDEA warns about.

  </td>
- <td>latest</td>
+ <td>the value of <code>endingOffsets</code> will apply</td>
HeartSaVioR (Contributor Author):

This actually has the same meaning as NOTE 2, but to avoid misunderstanding I mentioned it again here. Please let me know if it is OK to leave it as latest.

  <td>batch query</td>
- <td>The end point when a batch query is ended, a json string specifying an ending timesamp for each TopicPartition.
+ <td>The end point when a batch query is ended, a json string specifying an ending timestamp for each TopicPartition.
HeartSaVioR (Contributor Author):

Just fixed a nit.

// no real way to check that beginning or end is reasonable
}
}
private def adjustParamsWithPartitionsForOffsets
HeartSaVioR (Contributor Author):

NOTE: the method values are extracted because more code is added to the method definition - it becomes too long, and I didn't feel good keeping them all in one method. I left some of the short method values as they were.

- sendMessages(topic, Array(2).map(_.toString), 2, secondTimestamp)
- sendMessages(topic, Array(12).map(_.toString), 3, secondTimestamp)
- // no data after second timestamp for partition 4
+ setupTestMessagesForTestOnTimestampOffsets(topic, firstTimestamp, secondTimestamp)
HeartSaVioR (Contributor Author):

The sendMessages lines are moved into setupTestMessagesForTestOnTimestampOffsets so they can be reused between the tests for per-partition timestamps and the tests for the global timestamp.

- .as[(String, String)]
- val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+ val mapped = setupDataFrameForTestOnTimestampOffsets(startingTimestamps, failOnDataLoss,
HeartSaVioR (Contributor Author):

The DataFrame setup lines are moved into setupDataFrameForTestOnTimestampOffsets so they can be reused between the tests for per-partition timestamps and the tests for the global timestamp.

@@ -268,6 +268,24 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSparkSession
}, topic, 0 to 19)
}

test("global timestamp provided for starting and ending") {
HeartSaVioR (Contributor Author):

Same as the "timestamp provided for ending, offset provided for starting" test, but using a global timestamp instead.

"Cannot find expected AssertionError in chained exceptions")
}

test("specifying both global timestamp and specific timestamp for partition") {
HeartSaVioR (Contributor Author):

This test verifies the new configuration error.
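
For context, a minimal sketch of the invalid configuration this test guards against (the ScalaTest `intercept` harness and the exact exception type are assumptions, not taken from the patch):

```scala
// Mixing the "-1" global sentinel with a per-partition timestamp for the
// same topic is ambiguous and should fail fast.
val badJson = """{"topicA": {"-1": 1000, "0": 2000}}"""
intercept[IllegalArgumentException] {
  spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092") // placeholder broker
    .option("subscribe", "topicA")
    .option("startingOffsetsByTimestamp", badJson)
    .load()
}
```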

@SparkQA

SparkQA commented Sep 24, 2019

Test build #111266 has finished for PR 25911 at commit a1e98ab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 24, 2019

Test build #111269 has finished for PR 25911 at commit 7045413.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

cc. @jose-torres @koeninger @gaborgsomogyi as active reviewers of #23747


val newParams: Map[TopicPartition, Long] = paramsGroupedByTopic.map {
  case (topic, tpToOffset) =>
    if (tpToOffset.keySet.map(_.partition()).contains(GLOBAL_PARTITION_NUM)) {
HeartSaVioR (Contributor Author):

NOTE: This if statement is the only effective change; the rest is mostly refactoring.
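
To make that concrete, here is a hedged sketch of the expansion (the helper name, the `partitionsByTopic` input, and the mixed-configuration check are assumptions based on this thread, not the exact patch code):

```scala
import org.apache.kafka.common.TopicPartition

// Sentinel partition number meaning "apply this timestamp to all partitions".
val GLOBAL_PARTITION_NUM = -1

def expandGlobalTimestamps(
    params: Map[TopicPartition, Long],
    partitionsByTopic: Map[String, Seq[Int]]): Map[TopicPartition, Long] = {
  params.groupBy { case (tp, _) => tp.topic() }.flatMap { case (topic, tpToOffset) =>
    if (tpToOffset.keySet.map(_.partition()).contains(GLOBAL_PARTITION_NUM)) {
      // Global timestamp given: reject mixed configs, then fan out to real partitions.
      require(tpToOffset.size == 1,
        s"Cannot set both global and per-partition timestamps for topic $topic")
      val ts = tpToOffset(new TopicPartition(topic, GLOBAL_PARTITION_NUM))
      partitionsByTopic(topic).map(p => new TopicPartition(topic, p) -> ts).toMap
    } else {
      tpToOffset
    }
  }
}
```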

@gaborgsomogyi
Contributor

gaborgsomogyi commented Oct 1, 2019

Do I understand correctly that the change is all about this: instead of writing
{"topicA": {"1234": 1000}, "topicB": {"1234": 2000}}
with the PR one can write this?
{"topicA": {"-1": 1000}, "topicB": {"-1": 2000}}

@HeartSaVioR
Contributor Author

HeartSaVioR commented Oct 1, 2019

Yes, but your "as-is" example seems too simplified. If we have 5 partitions for topicA, it would be:

{"topicA":{"0": 1000, "1": 1000, "2": 1000, "3": 1000, "4": 1000}}

and let's say topicA has 100 partitions... no, never mind. :)
(I agree no one would deal with this by hand if the number of partitions gets really big, but that also means we would have to build the JSON programmatically.)
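
For example, a sketch of what "building the JSON programmatically" looks like today (partition count and timestamp are illustrative):

```scala
// Without a global sentinel, the caller must know numPartitions up front
// and enumerate every partition with the same timestamp.
val numPartitions = 100 // hypothetical partition count of topicA
val ts = 1000L
val entries = (0 until numPartitions).map(p => s""""$p": $ts""").mkString(", ")
val startingOffsetsByTimestamp = s"""{"topicA": {$entries}}"""
// vs. with this PR: {"topicA": {"-1": 1000}}
```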

It also brings another advantage: we no longer need to know the number of partitions per topic in advance.

@gaborgsomogyi
Contributor

What I've seen so far is that in cases where the partition count is huge, the list is generated with code.

Where there may be potential is the second use case you mentioned. A common pattern in the Kafka world is to over-allocate the number of partitions initially (since Kafka is not easy to scale when huge data volumes are handled). In such a case maybe 1000 partitions are created initially but only 200 are used; when data volume increases, the additional sleeping partitions can be taken into use without doing a heavy re-partitioning. In this quite common use case I don't see how it could help.
What I can imagine is debugging purposes, where not much is scripted and one doesn't care if the topic is re-created with a different number of partitions. Adding ~200 lines for that reason is questionable.

@HeartSaVioR
Contributor Author

I would say it would help in any case, including when the partition count is small.

The Kafka data source is not used only for streaming applications but also for batch queries, including ad-hoc queries. For ad-hoc queries, the requirement to know the number of partitions is a real burden, given that not only data engineers run queries but also data scientists. Some of them may not even (want to) know the number of partitions of a topic. (We may need to think beyond an engineer's perspective.)

I'm not concerned about the usability of start/end offsets, as I guess that feature isn't used much. (Who would want to memorize/calculate the offset per partition and replay from there? It should only be used for replaying from a specific situation: the query crashed and unfortunately the checkpoint was lost.)

The feature regarding offsets by timestamp is different. It enables end users to run a batch query against a Kafka topic over a range of timestamps, which is exactly the case where they want to forget about partitions (have them abstracted away). To support this we are introducing fewer than 50 lines of added complexity (excluding refactoring - I'm counting only the source side, not the test side), which doesn't seem to matter much. (My 2 cents: we should be concerned about added complexity, not the number of new lines.)

Btw, IMHO, #23749 is the ideal approach for dealing with this case (I think it's still valid), though it seems the community wanted to handle such cases as a "source option". This patch may lose one of its major use cases if #23749 is adopted (though I still think it would also help streaming queries).

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jan 10, 2020
@HeartSaVioR
Contributor Author

IMHO this patch or #23749 (the ideal one) should be helpful for running interactive queries against a Kafka source; the need for such a use case is being proven by KSQL, but some users may not want to stick with a vendor lock-in solution, and Spark can still help them if we manage to reduce the complexity of usage.

No one will want to provide partitions per interactive query at all. They don't want to code, they want to query. That's why I say #23749 is the ideal one, but if we think #23749 adds unacceptable complexity, this patch should be an alternative, even if a lesser one.

@HeartSaVioR
Contributor Author

I see actual customer demand for this: a topic has 100+ partitions, and it's awkward to make users craft JSON that lists 100+ partitions with the same timestamp.

Flink already does this; Flink uses a global value across partitions for earliest/latest/timestamp, while it allows setting an exact offset per partition.

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-start-position-configuration

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record
myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets(); // the default behaviour
Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);

myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

Given this PR is stale, I'll rebase it onto master and raise the PR again.
