
Conversation

wenxuanguan
Contributor

What changes were proposed in this pull request?

Implement Kafka sink exactly-once semantics with a transactional Kafka producer.

Why are the changes needed?

Does this PR introduce any user-facing change?

No

How was this patch tested?

  1. Added UT
  • write to transactional Kafka
  • restart from a failed send and resend the data
  • recover from a failed commit, resume the producer and commit
  • recover from another task's failed commit, resume the producer and recommit
  2. Failover Test

situation | result
task retry when sending data to Kafka | no data loss
application failure after successfully sending data to Kafka | no data loss when the job attempt or application restarts
job failure after storing producer meta-info to HDFS | no data loss when the job attempt or application restarts
task retry when committing the transaction | job fails; no data loss when the job attempt or application restarts and the transaction resumes
executor crash | job fails; no data loss when the job attempt or application restarts

@gaborgsomogyi
Contributor

By reading the doc without super deep understanding I've found this in the caveats section:

If the job stays failed for more than 60 seconds (the default value of the configuration
transaction.timeout.ms) before ResumeTransaction, data sent to the Kafka cluster will be
discarded and lead to data loss. So we set transaction.timeout.ms to 900000, the default
value of transaction.max.timeout.ms in the Kafka cluster, to reduce the risk of data loss
if the user has not defined it.

The "to reduce the risk of data loss" part disturbs me a bit; is it exactly-once then or not?
@HeartSaVioR AFAIK you've proposed exactly once SPIP before but there were concerns.

@HeartSaVioR
Contributor

Before reviewing the design, I may need to say: you are encouraged to at least mention it if you borrow code from somewhere, so that we are sure there's no license issue; and even if there is no license issue, at least they get credit.

https://github.com/apache/spark/pull/25618/files#diff-c1e1dbc4a986c69ef54e1eebe880d4e9
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java

@HeartSaVioR
Contributor

HeartSaVioR commented Aug 29, 2019

Just skimmed the design doc (need to take a deeper look at fault tolerance) and it's basically the known approach that Flink takes (2PC). Please mention what you were inspired by, for the same reason: credit.

I planned to propose something similar before (last year; I never proposed the design itself). More precisely, I asked to support 2PC at the DSv2 API level, as Spark doesn't support 2PC natively, but the feedback wasn't positive since it would be a very invasive change to the Spark codebase. There have been more cases asking for exactly-once write, and I guess the common answer was leveraging intermediate output. While some storage can leverage that (e.g. RDBMS: writers write to a temp table, the driver copies the rows writers reported into the output table), it doesn't make sense for Kafka, at least for performance reasons, as there's no way to let Kafka copy its records from topic A to topic B (right?), so I gave up.

If the code change implements 2PC correctly, I guess it would work in many cases in general, though as explained, the transaction timeout leads to data loss. I pointed out the issue with the transaction timeout when I designed this, and that was also one of the major concerns. When the producer writes something, it must be committed within the timeout regardless of any kind of failure, otherwise "data loss" happens. Even if we decide to invalidate that batch and rerun it, we are then "at-least-once", as some partitions have already committed successfully. (I wonder whether Flink's Kafka producer with 2PC has a similar issue or whether they have some safeguard.)

  extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] {

  private lazy val producer = {
    val kafkaProducer = CachedKafkaProducer.getOrCreate(producerParams)
@HeartSaVioR (Contributor) Aug 29, 2019

Spark leverages the fact that the Kafka producer is thread-safe. You may need to check whether that still holds for a transactional producer as well. (My instinct says it may not; otherwise you'd be doing 2PC across multiple partitions with the same producer id in the same executor, which sounds weird.)

@wenxuanguan (Contributor, Author)

I have considered a Kafka producer per executor. But there would be data loss when aborting a transaction if multiple tasks share one transaction and some task fails and is retried on another executor.
So, to avoid creating too many producers, a task reuses an already created producer, and the config producer.create.factor limits the total number of producers in abnormal scenarios, such as long-tail tasks.

@HeartSaVioR (Contributor)

What I meant is that Spark shares a Kafka producer across multiple threads in an executor once producerParams is the same. So what you considered is exactly what Spark is doing now. (That's what I meant to point out.) Based on your explanation, the caching logic should be changed to restrict multi-threaded usage.

@wenxuanguan (Contributor, Author)

I think the caching logic is fine: we can control producer creation per task, and also handle failover with transactional.id in producerParams.
A transactional producer is not thread-safe, so what I do is one producer per task in one micro-batch; in the next batch the previously created producer is reused instead of recreated, since the transaction completes within every micro-batch. With producerParams, transactional.id differs between tasks within one micro-batch but is the same across micro-batches. (See the sketch below.)
And if the task number is the same for every executor in every micro-batch, no more producers will be created after the first micro-batch.
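A minimal sketch of that scheme (assumption: the helper name and the queryId/partitionId naming are illustrative, not the PR's actual code), showing a transactional.id that is unique per task within a micro-batch but stable across micro-batches:

```scala
import java.{util => ju}

import org.apache.kafka.clients.producer.ProducerConfig

object TransactionalProducerParams {
  // Derive producer params whose transactional.id is unique per (query, partition)
  // but identical across micro-batches, so a caching layer like CachedKafkaProducer
  // hands the same producer back to the equivalent task in the next batch.
  def withTransactionalId(
      base: ju.Map[String, Object],
      queryId: String,
      partitionId: Int): ju.Map[String, Object] = {
    val params = new ju.HashMap[String, Object](base)
    params.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
      s"spark-kafka-sink-$queryId-$partitionId")
    params
  }
}
```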

@wenxuanguan
Contributor Author

By reading the doc without super deep understanding I've found this in the caveats section:

If the job stays failed for more than 60 seconds (the default value of the configuration
transaction.timeout.ms) before ResumeTransaction, data sent to the Kafka cluster will be
discarded and lead to data loss. So we set transaction.timeout.ms to 900000, the default
value of transaction.max.timeout.ms in the Kafka cluster, to reduce the risk of data loss
if the user has not defined it.

The "to reduce the risk of data loss" part disturbs me a bit; is it exactly-once then or not?
@HeartSaVioR AFAIK you've proposed exactly once SPIP before but there were concerns.

@gaborgsomogyi @HeartSaVioR Thanks for your reply about the config transaction.timeout.ms and the data loss.
The common scenario is that the producer fails to commit a transaction for some reason, such as a Kafka broker going down, and the Spark job fails. After the Kafka broker recovers, the job is restarted and the transaction resumes. So if the time between fixing the commit failure and restarting the job (by job attempt or manually) does not exceed transaction.timeout.ms, no data is lost.
The default producer config transaction.timeout.ms is 60 seconds, so to make sure there is enough time to fix the failure we reset it to 900000, the default value of the Kafka broker config, if the user has not defined it, because the request will fail if the producer config transaction.timeout.ms is larger than the Kafka broker config. I think that is what we can do in code, and we also note it for users in the documentation. There are also ways to avoid this, such as increasing transaction.timeout.ms, and that is up to the user. So if the user defines transaction.timeout.ms, we just check whether it is large enough. (A small sketch of this defaulting follows.)
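For illustration, a sketch of the defaulting behaviour described above (not the PR's code; 900000 ms matches the broker-side default of transaction.max.timeout.ms, and a larger producer-side value is rejected unless that broker config is raised too):

```scala
import java.{util => ju}

import org.apache.kafka.clients.producer.ProducerConfig

object TransactionTimeoutDefaults {
  // Apply transaction.timeout.ms = 900000 only when the user has not set it.
  def withDefaultTransactionTimeout(params: ju.Map[String, Object]): ju.Map[String, Object] = {
    val result = new ju.HashMap[String, Object](params)
    if (!result.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
      result.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "900000")
    }
    result
  }
}
```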

@wenxuanguan
Contributor Author

wenxuanguan commented Aug 30, 2019

Before reviewing the design, I may need to say: you are encouraged to at least mention it if you borrow code from somewhere, so that we are sure there's no license issue; and even if there is no license issue, at least they get credit.

https://github.com/apache/spark/pull/25618/files#diff-c1e1dbc4a986c69ef54e1eebe880d4e9
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java

@HeartSaVioR Thanks for your advice.
Indeed I referred to Flink's FlinkKafkaProducer.java to resume the transaction, since there is no Kafka API supporting this function. I will add this to the annotation and the design sketch.
About the 2PC design, it is a common way to implement exactly-once semantics, and I think the Spark HDFS sink also applies it.

@HeartSaVioR
Contributor

HeartSaVioR commented Aug 30, 2019

Spark doesn't have 2PC semantics natively, as you've seen in the DSv2 API. If I understand correctly, the Spark HDFS sink doesn't leverage 2PC.

Previously it used a temporary directory: all tasks write to that directory, and the driver moves the directory to the final destination only when all tasks have succeeded. It leverages the fact that "rename" is atomic, so it didn't support "exactly-once" if the underlying filesystem doesn't support atomic renaming.

Now it leverages metadata: all tasks write files and pass the list of written file paths to the driver. Once the driver has received the lists of written files from all tasks, it writes the overall list of files to the metadata. So exactly-once for HDFS is only guaranteed when "Spark" reads the output and is aware of the metadata information.
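A rough sketch of that metadata-based commit flow (simplified; not the actual FileStreamSink code, and the names here are illustrative):

```scala
object MetadataCommitSketch {
  // Each task reports the files it wrote; nothing is visible to metadata-aware
  // readers until the driver records the complete list for the batch.
  case class WrittenFiles(paths: Seq[String])

  def commitBatch(taskResults: Seq[WrittenFiles], writeMetadata: Seq[String] => Unit): Unit = {
    // Reaching this point means every task succeeded; on any task failure the job
    // fails before commit and no metadata entry is written for this batch.
    writeMetadata(taskResults.flatMap(_.paths))
  }
}
```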

@wenxuanguan
Contributor Author

wenxuanguan commented Aug 30, 2019

Spark doesn't have 2PC semantics natively, as you've seen in the DSv2 API. If I understand correctly, the Spark HDFS sink doesn't leverage 2PC.

Previously it used a temporary directory: all tasks write to that directory, and the driver moves the directory to the final destination only when all tasks have succeeded. It leverages the fact that "rename" is atomic, so it didn't support "exactly-once" if the underlying filesystem doesn't support atomic renaming.

Now it leverages metadata: all tasks write files and pass the list of written file paths to the driver. Once the driver has received the lists of written files from all tasks, it writes the overall list of files to the metadata. So exactly-once for HDFS is only guaranteed when "Spark" reads the output and is aware of the metadata information.

Sorry for the late reply.
In my understanding that is the procedure of 2PC. Please correct me if I'm missing something.
In the voting phase every task writes its data and returns a commit message to the driver. In the commit phase, if all tasks completed successfully, the driver commits the job with a rename, or aborts the job if any task failed to commit or the job commit failed.

@HeartSaVioR
Contributor

HeartSaVioR commented Aug 30, 2019

Well, someone could call it 2PC since the behavior is similar, but generally 2PC assumes a coordinator and participants. In the second phase the coordinator "asks" participants to commit/abort; participants don't just directly commit/abort what they did in the first phase. Based on that, the driver should request that tasks commit their outputs, but Spark doesn't provide such a flow. So that's a pretty simplified, and also pretty limited, version of 2PC.

I think the point is whether we are OK with having exactly-once with some restrictions end users need to be aware of. Could you please start a discussion on this on the Spark dev mailing list? It would be good to hear others' voices.

@gaborgsomogyi
Contributor

+1 on having a discussion on that. My perspective is clear: having such a limitation is a bit too much in this scenario, so I'm not comfortable with it.

@gaborgsomogyi
Contributor

In the meantime I'm speaking with Gyula from the Flink side to understand things more deeply...

@wenxuanguan
Contributor Author

@HeartSaVioR @gaborgsomogyi Thanks for your advice. I have started a discussion on the mailing list and am looking forward to your input.

@gaborgsomogyi
Contributor

So we sat down together with the Flink guys and had a deeper look at this area.

Basically they have a similar solution. Data is sent to Kafka with a flush, then the producer ID and the transaction ID are stored in the checkpoint (the producer ID is needed to have exactly one producer, the transaction ID is needed to know what to commit/roll back). If any problem arises, Flink can recover from there and retry committing the transaction. The whole flow assumes that if the data could be generated and sent to Kafka, then it's only a matter of time until it is committed. To make that hold, the transaction timeout is increased significantly enough to cover a possible broker restart or so.
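A sketch of the kind of per-producer state such a checkpoint would carry (field names are illustrative; FlinkKafkaProducer is the real implementation and restores the producer's internal transaction state via reflection, since the public Kafka API has no "resume transaction" call):

```scala
// Enough information to find and finish (commit or abort) a pending transaction
// after the job restarts.
case class KafkaTransactionState(
    transactionalId: String, // which transaction to commit or roll back
    producerId: Long,        // PID the broker assigned to this producer
    epoch: Short)            // fences out zombie producers holding an older epoch
```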

I have mainly 2 concerns with this PR:

  • as @HeartSaVioR mentioned, the actual PR is a lightweight version of 2PC (please see the details above)
  • even if we say there is a contract between Spark and Kafka ("it's only a matter of time until the data is committed on the other side"), it may lose data on a transaction timeout (Flink suffers from this as well). From a user perspective I would not be happy if Spark said: we've lost some data, but it's not Spark's fault, Kafka just didn't do the commit in time. I think it would be a good feature for both Spark and Flink if Kafka could somehow turn off the transaction timeout.

@HeartSaVioR
Contributor

HeartSaVioR commented Sep 9, 2019

I'm not an expert on Kafka (specifically how transactions work in Kafka), but given that Kafka still writes "sequentially" to the topic and needs to deliver records "in order", I don't think Kafka will allow multiple transactions to write to a topic concurrently. (Say, I expect a "topic lock", in RDBMS terms.) If I assume correctly, turning off the transaction timeout may leave the topic unwritable forever once you lose the producer/transaction id and are unable to restore the transaction. I feel that no timeout is unrealistic; that's something we have to live with in a distributed system.

So there aren't many options beyond the known two: 1) "data loss" after the transaction timeout, or 2) falling back to "at-least-once" after the transaction timeout. And even that assumes the 2PC logic is properly coupled with SS checkpointing with respect to fault tolerance. Most other options would give up parallelism and let one writer write all outputs, so I can't imagine a better option for now.

@gaborgsomogyi
Contributor

I would like to hear the Kafka guys' opinion before we say these are the only options.

@HeartSaVioR
Contributor

HeartSaVioR commented Sep 10, 2019

You might want to know that the Kafka transactional producer was designed for Kafka Streams (explained below), so I wonder whether they've considered other cases.

https://www.confluent.io/blog/enabling-exactly-once-kafka-streams/

This article describes the fact clearly; search for the sentence "As a result, we can translate all the above three steps into a number of records sent to different topics:" and look at the picture below it.

Streaming frameworks have to guarantee a transactional write across 1. writing outputs, 2. storing state, and 3. storing offsets/commits (checkpoint) to properly support "exactly-once". As the article describes, this is easier for Kafka Streams to achieve, because for Kafka Streams 1-3 are all Kafka topics, and the Kafka producer guarantees atomic writes across multiple Kafka topics. (Assuming the reader reads these topics as "read committed".)
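As an aside, a small illustration of that atomic read-process-write commit using the plain Kafka transactional producer API (topic names and the group id are placeholders; error handling such as aborting, or closing the producer on ProducerFencedException, is omitted):

```scala
import java.{util => ju}

import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition

object TransactionalWriteExample {
  def writeAtomically(
      producer: KafkaProducer[String, String],
      consumedOffsets: ju.Map[TopicPartition, OffsetAndMetadata]): Unit = {
    producer.beginTransaction()
    // Output records (1) ...
    producer.send(new ProducerRecord("output-topic", "key", "value"))
    // ... and the consumed input offsets (3) join the same transaction, so a
    // "read committed" consumer sees either both or neither.
    producer.sendOffsetsToTransaction(consumedOffsets, "processing-group")
    producer.commitTransaction()
  }
}
```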

For other frameworks there's no such guarantee, and each framework has to provide some mechanism to guarantee it, or give up end-to-end exactly-once at some point.

SS provides mechanisms for 2 and 3 to ensure stateful exactly-once, and also for 1, but only if the driver can transactionally commit the outputs from the tasks. (The metadata in FileStreamSink is one such case: the metadata is leveraged to read the outputs as "read committed".) The loosened contract for 1 is "idempotent write", which behaves similarly to exactly-once given that replays must happen, though the output is not transactional. The Kafka transactional producer fits neither case, so it requires Spark itself to handle the difference, or to accept limitations of some kind.

@gaborgsomogyi
Contributor

Had a small chat with @viktorsomogyi from the Kafka team, who mentioned that Flink and maybe Hive are dealing with this issue. There are some ugly hacks around, but since at least three tech areas are struggling with this, it would be good to create a real solution with an API.

@wenxuanguan
Contributor Author

wenxuanguan commented Sep 12, 2019

@gaborgsomogyi @HeartSaVioR Thanks for your reply.
About the transaction timeout, as described above and on the mailing list, I think at present the only ways to address the issue are:

  1. lose data when the transaction times out.
  2. resend the data when we find the transaction has timed out.
  3. set transaction.timeout.ms to a larger value, e.g. Integer.MAX_VALUE.

The first option is rejected since it leads to data loss. The second is rejected since it is not consistent with exactly-once semantics.
The last one seems workable. Since committing a transaction is a lightweight action, if the transaction has not been recovered after Integer.MAX_VALUE ms we can say it was caused by a fatal Kafka cluster error and the transaction can't be recovered. In that situation we can provide a way to resend data if the user wants to.

@gaborgsomogyi About a new Kafka API to handle Kafka transactions in a distributed system: as @HeartSaVioR mentioned above, the Kafka producer transaction is not provided only for Kafka Streams, and a new API for Spark/Flink/Hive may be customized. So I also think we should adapt Spark/Flink/Hive to it.

@gaborgsomogyi
Contributor

I would add this feature to Spark when no hacks like this are needed.

@HeartSaVioR
Contributor

HeartSaVioR commented Sep 14, 2019

About a new Kafka API to handle Kafka transactions in a distributed system: as @HeartSaVioR mentioned above, the Kafka producer transaction is not provided only for Kafka Streams, and a new API for Spark/Flink/Hive may be customized. So I also think we should adapt Spark/Flink/Hive to it.

Sorry, you're understanding my comment in the opposite way. My claim was that the Kafka producer transaction is designed "for" Kafka Streams. Please take another careful look at my comment.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics

According to the design doc, the Kafka community took a "transaction per task" approach:

In this design we take the approach to assign a separate producer per task so that any transaction contains only output messages of a single task.

which means they never need to worry about transactions spanning multiple connections/JVMs, unlike other streaming frameworks. From this I guess Kafka Streams leverages Kafka topics as shuffle storage and runs a user application as multiple connected read-process-write topologies. (So ensuring exactly-once for each of the connected parts yields exactly-once for the overall graph.) That's completely coupled with Kafka, and Spark can't (and shouldn't) do the same.

@AmplabJenkins

Can one of the admins verify this patch?

@github-actions

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!
