Support producer only Kafka transactions #1075

Open
seglo opened this issue Mar 5, 2020 · 2 comments
Comments

@seglo
Member

seglo commented Mar 5, 2020

Short description

Support producer only Kafka transactions in Transactional.sink|flow.

Details

Alpakka Kafka currently supports only a consume-transform-produce workflow for Kafka transactions. In some cases it may be useful to enable transactions only for messages produced to Kafka, for example when the source data is not in Kafka. This would offer fewer guarantees than the standard consume-transform-produce workflow, because the user would need to take care not to send duplicate messages between restarts of the workload, but it would still ensure that all messages within a transaction are committed or rolled back together. This could be useful when using MultiMessage to ensure that all or none of the messages are produced.

One drawback could be that users might want more control over exactly when transactions are committed. Currently we only support committing transactions at fixed time intervals, similar to Kafka Streams. I don't recommend giving the user more control over when commits occur, but we can make sure that use cases such as having all messages from a MultiMessage in a single transaction are supported.
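For illustration, the all-or-nothing behaviour described above can be sketched with the plain Kafka client API rather than Alpakka, since Alpakka does not expose a producer-only transactional stage. This is a minimal sketch under assumptions, not a proposed implementation: the object name `ProducerOnlyTransactionSketch`, the helpers `transactionalProps` and `sendAllOrNothing`, and the byte-array serializers are hypothetical choices made for the example. It requires `kafka-clients` on the classpath.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{Producer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.{AuthorizationException, OutOfOrderSequenceException, ProducerFencedException}
import org.apache.kafka.common.serialization.ByteArraySerializer

// Hypothetical helper object, for illustration only.
object ProducerOnlyTransactionSketch {

  // Builds producer properties with a transactional.id, which is what
  // enables the transactional API on the Kafka producer.
  def transactionalProps(bootstrapServers: String, transactionalId: String): Properties = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
    props
  }

  // Sends all records in one transaction: either every record is committed
  // or the transaction is aborted. The caller must have invoked
  // producer.initTransactions() once before the first call.
  def sendAllOrNothing[K, V](producer: Producer[K, V], records: Seq[ProducerRecord[K, V]]): Unit = {
    producer.beginTransaction()
    try {
      // send() is asynchronous; a failed send surfaces when committing.
      records.foreach(record => producer.send(record))
      producer.commitTransaction()
    } catch {
      case e @ (_: ProducerFencedException | _: OutOfOrderSequenceException | _: AuthorizationException) =>
        // Fatal errors: the transaction cannot be aborted, the caller should close the producer.
        throw e
      case e: KafkaException =>
        // Any other Kafka error: roll back so none of the records become visible.
        producer.abortTransaction()
        throw e
    }
  }
}
```

A caller would create a `KafkaProducer` from `transactionalProps(...)`, call `initTransactions()` once, and then call `sendAllOrNothing` per batch. Consumers only get the all-or-nothing view if they read with `isolation.level=read_committed`, and, as noted above, nothing here prevents the same batch from being sent again after a restart.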

References

@mdedetrich
Contributor

mdedetrich commented Feb 10, 2022

This is also something that we are missing. We are currently writing a backup tool for Kafka, and for the restore portion (where we restore from a backup to a Kafka cluster) we would ideally like to use the Alpakka Transactional API to stream messages out of the backup source and guarantee an exactly-once restore.

As a workaround we are currently using:

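def baseProducerConfig
    : Some[ProducerSettings[Array[Byte], Array[Byte]] => ProducerSettings[Array[Byte], Array[Byte]]] =
  Some(
    _.withBootstrapServers(
      container.bootstrapServers
    ).withProperties(
      Map(
        // Idempotent producer: the broker discards duplicates caused by retries
        ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG             -> true.toString,
        // At most one unacknowledged request per connection, so retries cannot reorder records
        ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> 1.toString,
        // A batch size of 0 disables batching
        ProducerConfig.BATCH_SIZE_CONFIG                     -> 0.toString
      )
    ).withParallelism(1) // only one send in flight in the Alpakka producer stage
  )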

but this is less than ideal.

@wim82

wim82 commented Nov 8, 2022

We have the same use case, but I'm also fairly sure I'm not capable of adding support for this :)
