Move CheckpointOffsetManager logic to Reader and Writer #130

@kevinwallimann

The StreamManager has two methods with the following signatures:

  def configure(streamReader: DataStreamReader, configuration: Configuration): DataStreamReader

  def configure(streamWriter: DataStreamWriter[Row], configuration: Configuration): DataStreamWriter[Row]

It's hard to imagine any use case other than the checkpoint location that would justify a stream manager concept which configures only the reader and the writer (but not, for example, the transformer). In fact, the implementation of the CheckpointOffsetManager can hardly be reused for a different data source, since the concept of starting offsets is tightly coupled to Kafka. Moreover, for the reader config, the checkpoint location is only needed to determine the starting offsets, which is an implementation detail. Arguably, it may also surprise developers that the starting offsets are set in the CheckpointOffsetManager rather than in the KafkaStreamReader.
For all these reasons, I believe the extra indirection of a StreamManager concept is not justified.

manager.checkpoint.base.location can be replaced by reader.kafka.checkpoint.base.location, writer.kafka.base.location and writer.parquet.base.location. Another possibility is to replace it with a "top-level" property spark.ingestor.checkpoint.base.location, since the checkpoint location is mandatory for every structured streaming query (see https://github.com/apache/spark/blob/695cb617d42507eded9c7e50bc7cd5333bbe6f83/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L259).

Decision
Replace manager.checkpoint.base.location with writer.common.checkpoint.base.location, because the checkpoint location is a config property of the DataStreamWriter according to the Spark documentation, so adding it as a writer property here as well is the least surprising choice. Unfortunately, the reader will have to depend on this property too.
Also, rename checkpoint.base.location to checkpoint.location, since the concept of a base location was given up in #85.
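
To illustrate the decision, here is a minimal sketch of how the reader and the writer could both resolve the checkpoint location from the single writer property. The object name `CheckpointLocation` and the method `resolve` are hypothetical, and a plain `Map[String, String]` stands in for the project's `Configuration` type to keep the example self-contained. It is not the actual implementation.

```scala
object CheckpointLocation {
  // Property name resulting from the decision above.
  val Key = "writer.common.checkpoint.location"

  // Hypothetical helper that both the reader and the writer could call.
  // Returns Left with an error message if the property is absent or blank,
  // since a checkpoint location is mandatory for every streaming query.
  def resolve(config: Map[String, String]): Either[String, String] =
    config.get(Key)
      .map(_.trim)
      .filter(_.nonEmpty)
      .toRight(s"Mandatory property '$Key' is missing or empty")
}
```

Under this assumption, the writer would pass the resolved value to Spark via `DataStreamWriter.option("checkpointLocation", location)`, while the Kafka reader would use the same value only to decide on its starting offsets.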

Migration for Trigger
Replace

manager.checkpoint.base.location

with

writer.common.checkpoint.location

Replace

"component.manager=za.co.absa.hyperdrive.ingestor.implementation.manager.checkpoint.CheckpointOffsetManager",

with
(empty string)
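
The two replacements above could be automated along these lines. This is a sketch only: it assumes the trigger's properties are available as a `Map[String, String]`, and it interprets "replace with empty string" as dropping the `component.manager` entry altogether. The object name `TriggerConfigMigration` and the method `migrate` are hypothetical.

```scala
object TriggerConfigMigration {
  val OldCheckpointKey = "manager.checkpoint.base.location"
  val NewCheckpointKey = "writer.common.checkpoint.location"
  val ManagerComponentKey = "component.manager"

  // Renames the checkpoint property (keeping its value) and removes the
  // manager component entry. All other properties are left untouched.
  def migrate(props: Map[String, String]): Map[String, String] = {
    val renamed = props.get(OldCheckpointKey) match {
      case Some(location) => props - OldCheckpointKey + (NewCheckpointKey -> location)
      case None           => props
    }
    renamed - ManagerComponentKey
  }
}
```

Properties that contain neither key pass through unchanged, so running the migration twice gives the same result as running it once.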
