[Bug] Unable to set sink-committer resources in batch jobs #6565

@mao-liu

Description

Search before asking

  • I searched in the issues and found nothing similar.

Paimon version

Paimon 1.2

Compute Engine

Flink 1.20

Minimal reproduce step

package <PACKAGE>

import io.kotest.core.spec.style.StringSpec
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.data.GenericRowData
import org.apache.flink.table.data.RowData
import org.apache.flink.table.data.StringData
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.apache.paimon.flink.FlinkConnectorOptions
import org.apache.paimon.flink.sink.FlinkSinkBuilder
import org.apache.paimon.fs.local.LocalFileIO
import org.apache.paimon.schema.Schema
import org.apache.paimon.schema.SchemaManager
import org.apache.paimon.table.CatalogEnvironment
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.FileStoreTableFactory
import org.apache.paimon.types.DataTypes
import java.io.File
import org.apache.paimon.fs.Path as PaimonPath
import org.apache.paimon.options.Options as PaimonOptions

class PaimonCommitterResourcesTest : StringSpec() {

    private val basePath = "/tmp/paimon_sink_commiter_resources_test"
    private val paimonOptions =
        PaimonOptions(
            mapOf(
                FlinkConnectorOptions.SINK_COMMITTER_CPU.key() to "2",
                FlinkConnectorOptions.SINK_COMMITTER_MEMORY.key() to "2000 mb",
            ),
        )

    private val miniFlinkCluster =
        MiniClusterWithClientResource(
            MiniClusterResourceConfiguration
                .Builder()
                .setNumberSlotsPerTaskManager(1)
                .setNumberTaskManagers(2)
                .build(),
        )

    init {
        beforeSpec {
            miniFlinkCluster.before()
            File(basePath).deleteRecursively()
        }

        afterSpec {
            miniFlinkCluster.after()
            File(basePath).deleteRecursively()
        }

        "streaming mode sets global committer resources" {
            val tablePath = "$basePath/test_streaming"

            val table = createTestPaimonTable(tablePath)
            val env =
                StreamExecutionEnvironment
                    .getExecutionEnvironment()
                    .setParallelism(2)
                    .setRuntimeMode(RuntimeExecutionMode.STREAMING)

            writeRecords(env, table)
        }

        "batch mode sets global committer resources" {
            val tablePath = "$basePath/test_batch"

            val table = createTestPaimonTable(tablePath)

            val batchConfiguration =
                Configuration.fromMap(
                    mapOf(
                        "fine-grained.shuffle-mode.all-blocking" to "true",
                    ),
                )

            val env =
                StreamExecutionEnvironment
                    .getExecutionEnvironment(batchConfiguration)
                    .setParallelism(2)
                    .setRuntimeMode(RuntimeExecutionMode.BATCH)

            writeRecords(env, table)
        }
    }

    private fun createTestPaimonTable(tablePath: String): FileStoreTable {
        val paimonPath = PaimonPath(tablePath)
        val fileIO = LocalFileIO.create()

        val schemaBuilder = Schema.newBuilder()
        schemaBuilder.primaryKey("partition_key", "primary_key")
        schemaBuilder.partitionKeys("partition_key")
        schemaBuilder.column("partition_key", DataTypes.STRING())
        schemaBuilder.column("primary_key", DataTypes.STRING())
        schemaBuilder.column("value", DataTypes.STRING())

        val schemaManager = SchemaManager(fileIO, paimonPath)
        schemaManager.createTable(schemaBuilder.build(), true)

        return FileStoreTableFactory.create(fileIO, paimonPath, schemaManager.latest().get(), paimonOptions, CatalogEnvironment.empty())
    }

    private fun writeRecords(
        env: StreamExecutionEnvironment,
        table: FileStoreTable,
    ) {
        val testRowData =
            listOf(
                createRowData("a", "aa", "aaaa"),
                createRowData("a", "ab", "abab"),
                createRowData("b", "bb", "bbbb"),
            )

        FlinkSinkBuilder(table)
            .forRowData(
                env
                    .fromData(testRowData)
                    .returns(TypeInformation.of(RowData::class.java)),
            ).build()
        env.execute("write")
    }

    private fun createRowData(
        partitionKey: String,
        primaryKey: String,
        value: String,
    ): RowData {
        val rowData = GenericRowData(3)
        rowData.setField(0, StringData.fromString(partitionKey))
        rowData.setField(1, StringData.fromString(primaryKey))
        rowData.setField(2, StringData.fromString(value))
        return rowData
    }
}

test_streaming logs
[flink-pekko.actor.default-dispatcher-6] INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Allocated slot for c56c8f145715dcc5253b375dc2bff016 with resources ResourceProfile{cpuCores=2, taskHeapMemory=1.953gb (2097152000 bytes), taskOffHeapMemory=0 bytes, managedMemory=0 bytes, networkMemory=320.000kb (327680 bytes)}.
[flink-pekko.actor.default-dispatcher-6] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Global Committer : test_streaming -> end: Writer (1/1) (attempt #0) with attempt id f29d6a3854e6e59149aa4d5bb957c66f_d14a00a5530c873d55f22dd652832843_0_0 and vertex id d14a00a5530c873d55f22dd652832843_0 to fa43ad1b-8e10-4190-a2f5-d71a4c6aa939 @ localhost (dataPort=63616) with allocation id c56c8f145715dcc5253b375dc2bff016

test_batch logs
[flink-pekko.actor.default-dispatcher-7] INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Allocated slot for 2cb8bf8ffd5ef76b5f02f0b3f2d3ae35 with resources ResourceProfile{taskHeapMemory=1024.000gb (1099511627776 bytes), taskOffHeapMemory=1024.000gb (1099511627776 bytes), managedMemory=80.000mb (83886080 bytes), networkMemory=64.000mb (67108864 bytes)}.
[flink-pekko.actor.default-dispatcher-4] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Collection Source -> Map (1/1) (attempt #0) with attempt id 5964ede24e0614729c00b730978155d0_cbc357ccb763df2852fee8c4fc7d55f2_0_0 and vertex id cbc357ccb763df2852fee8c4fc7d55f2_0 to 18dd7510-fb44-4a04-a422-3c218344346f @ localhost (dataPort=63615) with allocation id 2cb8bf8ffd5ef76b5f02f0b3f2d3ae35
[flink-pekko.actor.default-dispatcher-8] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying dynamic-bucket-assigner (1/1) (attempt #0) with attempt id 5964ede24e0614729c00b730978155d0_9dd63673dd41ea021b896d5203f3ba7c_0_0 and vertex id 9dd63673dd41ea021b896d5203f3ba7c_0 to 18dd7510-fb44-4a04-a422-3c218344346f @ localhost (dataPort=63615) with allocation id 2cb8bf8ffd5ef76b5f02f0b3f2d3ae35
[flink-pekko.actor.default-dispatcher-6] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Writer : test_batch (1/1) (attempt #0) with attempt id 5964ede24e0614729c00b730978155d0_1a936cb48657826a536f331e9fb33b5e_0_0 and vertex id 1a936cb48657826a536f331e9fb33b5e_0 to 18dd7510-fb44-4a04-a422-3c218344346f @ localhost (dataPort=63615) with allocation id 2cb8bf8ffd5ef76b5f02f0b3f2d3ae35
[flink-pekko.actor.default-dispatcher-6] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Global Committer : test_batch -> end: Writer (1/1) (attempt #0) with attempt id 5964ede24e0614729c00b730978155d0_d14a00a5530c873d55f22dd652832843_0_0 and vertex id d14a00a5530c873d55f22dd652832843_0 to 18dd7510-fb44-4a04-a422-3c218344346f @ localhost (dataPort=63615) with allocation id 2cb8bf8ffd5ef76b5f02f0b3f2d3ae35

test_logs.txt

What doesn't meet your expectations?

The settings sink.committer-cpu and sink.committer-memory are very important for compaction when the table, and therefore the committable, is very large.

When setting these values in a streaming job, the resource configurations are applied correctly, and the global committer gets more resources.

When setting these values in a batch job, the global committer is instead allocated to an existing slot and does not get the specified resources.

This behaviour is reproducible on the Flink local MiniCluster; see the attached logs (also inlined above). Tracing the allocation ID of the Global Committer operator shows that it gets the specified resources in streaming mode, but not in batch mode.
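
For context on what we expect to happen, here is a minimal sketch (not Paimon's actual code) of how per-operator resources are declared with Flink's fine-grained resource management, which is the mechanism we assume sink.committer-cpu / sink.committer-memory translate into for the Global Committer. The group name "committer" and the values are illustrative only.

import org.apache.flink.api.common.operators.SlotSharingGroup
import org.apache.flink.configuration.MemorySize
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

fun main() {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()

    // Declare a slot sharing group with explicit resources.
    // Name and numbers are made up for illustration.
    val committerGroup =
        SlotSharingGroup
            .newBuilder("committer")
            .setCpuCores(2.0)                               // mirrors sink.committer-cpu = 2
            .setTaskHeapMemory(MemorySize.parse("2000 mb")) // mirrors sink.committer-memory = 2000 mb
            .build()

    env
        .fromData(1, 2, 3)
        .slotSharingGroup(committerGroup) // tasks in this group should get a dedicated, sized slot
        .print()

    env.execute("slot-sharing-group resources sketch")
}

In streaming mode the allocated slot for the Global Committer matches such a declared profile (see the test_streaming logs); in batch mode it does not.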

Anything else?

We are handling very large tables in our jobs. Since we have multiple writers, we need to rely on dedicated compaction jobs in batch mode.

We use high writer parallelism (scale wide), and need to allocate more resources for the global committer (scale tall); see the sketch below.
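
A minimal sketch of how the options are split between write-only writer jobs and the dedicated compaction job, assuming the standard Paimon write-only table option; the values are illustrative, not our production settings.

import org.apache.paimon.flink.FlinkConnectorOptions
import org.apache.paimon.options.Options as PaimonOptions

// Writer jobs: write only, leave compaction to the dedicated job.
val writerOptions =
    PaimonOptions(
        mapOf(
            "write-only" to "true",
        ),
    )

// Dedicated (batch) compaction job: this is where the committer needs the
// extra resources that are currently not applied in batch mode.
val compactionOptions =
    PaimonOptions(
        mapOf(
            FlinkConnectorOptions.SINK_COMMITTER_CPU.key() to "2",
            FlinkConnectorOptions.SINK_COMMITTER_MEMORY.key() to "2000 mb",
        ),
    )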

We have found that full compaction can struggle to complete in the Global Committer step if the committable gets too large (e.g. if compaction hasn't run frequently enough). Sometimes the Global Committer takes longer than the compaction writer step, leading to very poor resource efficiency.

Are you willing to submit a PR?

  • I'm willing to submit a PR!

^^ Theoretically open to submitting a PR, but I don't know how/where to fix this problem!
