NPE for http sink when restore from checkpoint #195

@DonkeyHu

Description

I use this repo as a sink connector for Flink SQL, but the job cannot restore from a checkpoint: it throws a NullPointerException when it recovers after a crash. The same NPE is thrown when I simply start the Flink application from a checkpoint.

Used Version

flink version 1.16.3
flink-http-connector 0.16.0

Expected Behavior

The Flink SQL job should restore from a checkpoint without errors.

Flink SQL Example

My example is below:

    // http sink
    public static String httpSinkDDL = "CREATE TEMPORARY TABLE http_sink_table (\n" +
            "  `region` STRING,\n" +
            "  `region_uv` BIGINT\n" +
            ") WITH (\n" +
            "  'connector' = 'http-sink',\n" +
            "  'url' = 'url',\n" +
            "  'insert-method' = 'POST',\n" +
            "  'gid.connector.http.sink.header.Content-Type' = 'application/json',\n" +
            "  'gid.connector.http.sink.writer.request.mode' = 'batch',\n" +
            "  'gid.connector.http.sink.request.batch.size' = '100',\n" +
            "  'format' = 'json'\n" +
            ");";

    public static String transformDMLPT = "INSERT INTO http_sink_table \n" +
            "SELECT\n" +
            "  region,\n" +
            "  COUNT(DISTINCT account_id) AS region_uv\n" +
            "FROM (\n" +
            "  SELECT\n" +
            "    s.account_id,\n" +
            "    s.proc_time,\n" +
            "    COALESCE(LangToRegionFuc(h.f.LANG), 'en') AS region \n" +
            "  FROM (\n" +
            "    SELECT\n" +
            "      CONCAT(SUBSTRING(MD5(t2.`#account_id`), 1, 4), '|', t2.`#account_id`) AS account_id,\n" +
            "      source.proc_time AS proc_time \n" +
            "    FROM\n" +
            "      kafka_source_table AS source,\n" +
            "      UNNEST(source.data_object.data) AS t2 (`#account_id`, `#event_name`, `properties`)\n" +
            "    WHERE\n" +
            "      t2.`#event_name` = 'Match' and t2.properties.action = 'join_bellmatch' \n" +
            "  ) AS s\n" +
            "  JOIN hbase_region_table FOR SYSTEM_TIME AS OF s.proc_time AS h\n" +
            "  ON s.account_id = h.rk\n" +
            ")\n" +
            "GROUP BY\n" +
            "  HOP(proc_time, INTERVAL '1' MINUTE, INTERVAL '3' HOUR),\n" +
            "  region;";

Exception Logs

2025-11-24 19:58:20,954 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Attempting to fail task externally GroupWindowAggregate[8] -> http_sink_table[9]: Writer (2/2)#5 (a93410a57893a814791e5f83ced20de0_5e4132aa89d205854be108b7fd747a9c_1_5).
2025-11-24 19:58:20,955 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - GroupWindowAggregate[8] -> http_sink_table[9]: Writer (2/2)#5 (a93410a57893a814791e5f83ced20de0_5e4132aa89d205854be108b7fd747a9c_1_5) switched from INITIALIZING to FAILED with failure cause: org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception while processing timer.
	at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1573)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1548)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1688)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1677)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator}
	... 15 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at org.apache.flink.table.runtime.operators.window.AggregateWindowOperator.collect(AggregateWindowOperator.java:218)
	at org.apache.flink.table.runtime.operators.window.AggregateWindowOperator.emitWindowResult(AggregateWindowOperator.java:208)
	at org.apache.flink.table.runtime.operators.window.WindowOperator.onProcessingTime(WindowOperator.java:405)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1686)
	... 14 more
Caused by: java.lang.NullPointerException
	at org.apache.flink.formats.json.JsonRowDataSerializationSchema.serialize(JsonRowDataSerializationSchema.java:99)
	at org.apache.flink.formats.json.JsonRowDataSerializationSchema.serialize(JsonRowDataSerializationSchema.java:43)
	at com.getindata.connectors.http.internal.table.SerializationSchemaElementConverter.apply(SerializationSchemaElementConverter.java:51)
	at com.getindata.connectors.http.internal.table.SerializationSchemaElementConverter.apply(SerializationSchemaElementConverter.java:14)
	at org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.write(AsyncSinkWriter.java:327)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:160)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
	... 24 more

Reproduce

This issue can always be reproduced as long as the job starts from a checkpoint.
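
A possible explanation, not verified against the connector source: the NPE is raised inside JsonRowDataSerializationSchema.serialize, which would fit a schema whose internal helper is only created in open(), with open() being skipped on the restore path. The sketch below only illustrates that failure pattern and one way to guard against it. ToySchema, GuardedConverter and LazyOpenSketch are hypothetical names, not classes from Flink or from this connector.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class LazyOpenSketch {

    /** Hypothetical stand-in for a schema whose helper object is created in open(). */
    static class ToySchema {
        private StringBuilder buffer; // stays null until open() runs

        void open() {
            buffer = new StringBuilder();
        }

        byte[] serialize(String value) {
            buffer.setLength(0); // throws NullPointerException if open() was skipped
            buffer.append('"').append(value).append('"');
            return buffer.toString().getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Hypothetical converter that opens the schema on first use instead of relying on the caller. */
    static class GuardedConverter implements Function<String, byte[]> {
        private final ToySchema schema;
        private boolean opened;

        GuardedConverter(ToySchema schema) {
            this.schema = schema;
        }

        @Override
        public byte[] apply(String value) {
            if (!opened) {
                schema.open();
                opened = true;
            }
            return schema.serialize(value);
        }
    }

    /** Returns true if serialize() fails with an NPE when open() was never called. */
    static boolean failsWithoutOpen() {
        try {
            new ToySchema().serialize("en");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    /** Serializes through the guarded converter and returns the result as text. */
    static String serializeGuarded(String value) {
        byte[] bytes = new GuardedConverter(new ToySchema()).apply(value);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println("NPE without open(): " + failsWithoutOpen());
        System.out.println("guarded result: " + serializeGuarded("en"));
    }
}
```

If this reading is right, the fresh-start path and the restore-from-checkpoint path would differ only in whether the schema gets opened before the first write, which would match the job failing only when it starts from a checkpoint.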
