Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[build] MongoDBConnectorITCase is timeout on Azure #600

Merged
merged 4 commits into from
Nov 12, 2021

Conversation

Jiabao-Sun
Copy link
Contributor

Append log to console to track MongoDBConnectorITCase is timeout on Azure. #584

@Jiabao-Sun Jiabao-Sun marked this pull request as ready for review November 12, 2021 04:09
@Jiabao-Sun
Copy link
Contributor Author

https://github.com/apache/flink/blob/db5af67aee7fb449a9db8930213eec7dc925e58c/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java#L415-L419

if (expectedSize != -1 && receivedNum == expectedSize) {
    // some sources are infinite (e.g. kafka),
    // we throw a SuccessException to indicate job is finished.
    throw new SuccessException();
}

The main reason of #584 is that the change of TestValuesRuntimeFunctions's localRawResult and rawResultState are not atomic. In particular, the SuccessException will kill the flink job that snapshotState may no longer be called. This will cause the localRawResult and rawResultState ​​to be inconsistent so the test never stops.

A simple solution is to remove the configuration sink-expected-messages-num to prevent SuccessException being thrown. This problem may also occur in other connector tests, I recommend removing all configuration of sink-expected-messages-num.

Here is some stack trace.

22437 [main] INFO  org.mongodb.driver.connection - Opened connection [connectionId{localValue:6, serverValue:12}] to 192.168.145.55:57245
25997 [debezium-engine] INFO  com.mongodb.kafka.connect.source.heartbeat.HeartbeatManager - Generating heartbeat event. {"_data": "82618DF848000000072B022C0100296E5A10047E63E49803684AAABCCCD0540A4E45B646645F696400641000000000000000000001110004"}
26034 [GroupAggregate(groupBy=[name], select=[name, SUM_RETRACT(weight) AS EXPR$1]) -> Calc(select=[name, CAST(EXPR$1) AS weightSum]) -> NotNullEnforcer(fields=[name]) -> Sink: Sink(table=[default_catalog.default_database.sink], fields=[name, weightSum]) (1/1)#0] WARN  org.apache.flink.runtime.taskmanager.Task - GroupAggregate(groupBy=[name], select=[name, SUM_RETRACT(weight) AS EXPR$1]) -> Calc(select=[name, CAST(EXPR$1) AS weightSum]) -> NotNullEnforcer(fields=[name]) -> Sink: Sink(table=[default_catalog.default_database.sink], fields=[name, weightSum]) (1/1)#0 (5fbfa0b72012b025d7318640ea535444) switched from RUNNING to FAILED with failure cause: org.apache.flink.test.util.SuccessException
	at org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions$KeyedUpsertingSinkFunction.invoke(TestValuesRuntimeFunctions.java:460)
	at org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions$KeyedUpsertingSinkFunction.invoke(TestValuesRuntimeFunctions.java:349)
	at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at StreamExecCalc$67.processElement(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
	at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:194)
	at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.lang.Thread.run(Thread.java:748)

26036 [GroupAggregate(groupBy=[name], select=[name, SUM_RETRACT(weight) AS EXPR$1]) -> Calc(select=[name, CAST(EXPR$1) AS weightSum]) -> NotNullEnforcer(fields=[name]) -> Sink: Sink(table=[default_catalog.default_database.sink], fields=[name, weightSum]) (1/1)#0] INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for GroupAggregate(groupBy=[name], select=[name, SUM_RETRACT(weight) AS EXPR$1]) -> Calc(select=[name, CAST(EXPR$1) AS weightSum]) -> NotNullEnforcer(fields=[name]) -> Sink: Sink(table=[default_catalog.default_database.sink], fields=[name, weightSum]) (1/1)#0 (5fbfa0b72012b025d7318640ea535444).
26046 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task GroupAggregate(groupBy=[name], select=[name, SUM_RETRACT(weight) AS EXPR$1]) -> Calc(select=[name, CAST(EXPR$1) AS weightSum]) -> NotNullEnforcer(fields=[name]) -> Sink: Sink(table=[default_catalog.default_database.sink], fields=[name, weightSum]) (1/1)#0 5fbfa0b72012b025d7318640ea535444.
26064 [flink-akka.actor.default-dispatcher-11] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - GroupAggregate(groupBy=[name], select=[name, SUM_RETRACT(weight) AS EXPR$1]) -> Calc(select=[name, CAST(EXPR$1) AS weightSum]) -> NotNullEnforcer(fields=[name]) -> Sink: Sink(table=[default_catalog.default_database.sink], fields=[name, weightSum]) (1/1) (5fbfa0b72012b025d7318640ea535444) switched from RUNNING to FAILED on 9e31c7f7-aeb5-42fb-a3c1-acf78c265e19 @ localhost (dataPort=-1).
org.apache.flink.test.util.SuccessException: null
	at org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions$KeyedUpsertingSinkFunction.invoke(TestValuesRuntimeFunctions.java:460) ~[flink-table-planner-blink_2.11-1.13.1-tests.jar:1.13.1]
	at org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions$KeyedUpsertingSinkFunction.invoke(TestValuesRuntimeFunctions.java:349) ~[flink-table-planner-blink_2.11-1.13.1-tests.jar:1.13.1]
	at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65) ~[flink-table-runtime-blink_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at StreamExecCalc$67.processElement(Unknown Source) ~[?:?]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:194) ~[flink-table-runtime-blink_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43) ~[flink-table-runtime-blink_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_301]
26082 [flink-akka.actor.default-dispatcher-11] INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy - Calculating tasks to restart to recover the failed task b5c8d46f3e7b141acf271f12622e752b_0.
26083 [flink-akka.actor.default-dispatcher-11] INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy - 3 tasks should be restarted to recover the failed task b5c8d46f3e7b141acf271f12622e752b_0. 
26087 [flink-akka.actor.default-dispatcher-11] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job insert-into_default_catalog.default_database.sink (311d818bbf5fd3a7bdedd19ef6dc369f) switched from state RUNNING to FAILING.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_301]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_301]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_301]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_301]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [scala-library-2.11.12.jar:?]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [scala-library-2.11.12.jar:?]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.11.12.jar:?]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.11.12.jar:?]
	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.actor.Actor$class.aroundReceive(Actor.scala) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.actor.ActorCell.invoke(ActorCell.scala:561) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.dispatch.Mailbox.run(Mailbox.scala:225) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [akka-actor_2.11-2.5.21.jar:2.5.21]
Caused by: org.apache.flink.test.util.SuccessException
	at org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions$KeyedUpsertingSinkFunction.invoke(TestValuesRuntimeFunctions.java:460) ~[flink-table-planner-blink_2.11-1.13.1-tests.jar:1.13.1]
	at org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions$KeyedUpsertingSinkFunction.invoke(TestValuesRuntimeFunctions.java:349) ~[flink-table-planner-blink_2.11-1.13.1-tests.jar:1.13.1]
	at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65) ~[flink-table-runtime-blink_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at StreamExecCalc$67.processElement(Unknown Source) ~[?:?]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:194) ~[flink-table-runtime-blink_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43) ~[flink-table-runtime-blink_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_301]
26092 [flink-akka.actor.default-dispatcher-11] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: TableSourceScan(table=[[default_catalog, default_database, mongodb_source]], fields=[_id, name, description, weight]) (1/1) (66988cc7518b1294893b26f1fa6b8168) switched from RUNNING to CANCELING.
26094 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Source: TableSourceScan(table=[[default_catalog, default_database, mongodb_source]], fields=[_id, name, description, weight]) (1/1)#0 (66988cc7518b1294893b26f1fa6b8168).
26094 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(table=[[default_catalog, default_database, mongodb_source]], fields=[_id, name, description, weight]) (1/1)#0 (66988cc7518b1294893b26f1fa6b8168) switched from RUNNING to CANCELING.
26094 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Source: TableSourceScan(table=[[default_catalog, default_database, mongodb_source]], fields=[_id, name, description, weight]) (1/1)#0 (66988cc7518b1294893b26f1fa6b8168).
26096 [Canceler for Source: TableSourceScan(table=[[default_catalog, default_database, mongodb_source]], fields=[_id, name, description, weight]) (1/1)#0 (66988cc7518b1294893b26f1fa6b8168).] INFO  io.debezium.embedded.EmbeddedEngine - Stopping the embedded engine
26096 [Canceler for Source: TableSourceScan(table=[[default_catalog, default_database, mongodb_source]], fields=[_id, name, description, weight]) (1/1)#0 (66988cc7518b1294893b26f1fa6b8168).] INFO  io.debezium.embedded.EmbeddedEngine - Waiting for PT5M for connector to stop
26096 [flink-akka.actor.default-dispatcher-11] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - ChangelogNormalize(key=[_id]) -> Calc(select=[name, weight]) (1/1) (01887e841227be2e55c8de00a5ed2520) switched from RUNNING to CANCELING.
26098 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task ChangelogNormalize(key=[_id]) -> Calc(select=[name, weight]) (1/1)#0 (01887e841227be2e55c8de00a5ed2520).
26098 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.taskmanager.Task - ChangelogNormalize(key=[_id]) -> Calc(select=[name, weight]) (1/1)#0 (01887e841227be2e55c8de00a5ed2520) switched from RUNNING to CANCELING.
26098 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code ChangelogNormalize(key=[_id]) -> Calc(select=[name, weight]) (1/1)#0 (01887e841227be2e55c8de00a5ed2520).
26100 [ChangelogNormalize(key=[_id]) -> Calc(select=[name, weight]) (1/1)#0] INFO  org.apache.flink.runtime.taskmanager.Task - ChangelogNormalize(key=[_id]) -> Calc(select=[name, weight]) (1/1)#0 (01887e841227be2e55c8de00a5ed2520) switched from CANCELING to CANCELED.
26100 [ChangelogNormalize(key=[_id]) -> Calc(select=[name, weight]) (1/1)#0] INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for ChangelogNormalize(key=[_id]) -> Calc(select=[name, weight]) (1/1)#0 (01887e841227be2e55c8de00a5ed2520).
26102 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state CANCELED to JobManager for task ChangelogNormalize(key=[_id]) -> Calc(select=[name, weight]) (1/1)#0 01887e841227be2e55c8de00a5ed2520.
26103 [flink-akka.actor.default-dispatcher-10] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - ChangelogNormalize(key=[_id]) -> Calc(select=[name, weight]) (1/1) (01887e841227be2e55c8de00a5ed2520) switched from CANCELING to CANCELED.
28511 [debezium-engine] INFO  com.mongodb.kafka.connect.source.MongoSourceTask - Stopping MongoDB source task
28528 [docker-java-stream--565476724] INFO  com.ververica.cdc.connectors.mongodb.MongoDBTestBase - STDOUT: {"t":{"$date":"2021-11-12T05:14:54.122+00:00"},"s":"I",  "c":"NETWORK",  "id":22944,   "ctx":"conn11","msg":"Connection ended","attr":{"remote":"172.17.0.1:65146","uuid":"f3b5ab9b-2f85-4a80-9708-646faa5f8e9c","connectionId":11,"connectionCount":5}}
28532 [docker-java-stream--565476724] INFO  com.ververica.cdc.connectors.mongodb.MongoDBTestBase - STDOUT: {"t":{"$date":"2021-11-12T05:14:54.124+00:00"},"s":"I",  "c":"-",        "id":20883,   "ctx":"conn9","msg":"Interrupted operation as its client disconnected","attr":{"opId":427}}
28533 [docker-java-stream--565476724] INFO  com.ververica.cdc.connectors.mongodb.MongoDBTestBase - STDOUT: {"t":{"$date":"2021-11-12T05:14:54.126+00:00"},"s":"I",  "c":"NETWORK",  "id":22944,   "ctx":"conn9","msg":"Connection ended","attr":{"remote":"172.17.0.1:65140","uuid":"e0496a48-ce0d-4338-a755-913b272b2502","connectionId":9,"connectionCount":4}}
28534 [docker-java-stream--565476724] INFO  com.ververica.cdc.connectors.mongodb.MongoDBTestBase - STDOUT: {"t":{"$date":"2021-11-12T05:14:54.128+00:00"},"s":"I",  "c":"NETWORK",  "id":22944,   "ctx":"conn10","msg":"Connection ended","attr":{"remote":"172.17.0.1:65142","uuid":"b2be752a-bab8-4578-895c-8df7932d6424","connectionId":10,"connectionCount":3}}
28538 [Source: TableSourceScan(table=[[default_catalog, default_database, mongodb_source]], fields=[_id, name, description, weight]) (1/1)#0] INFO  io.debezium.embedded.EmbeddedEngine - Stopping the embedded engine
28540 [Source: TableSourceScan(table=[[default_catalog, default_database, mongodb_source]], fields=[_id, name, description, weight]) (1/1)#0] INFO  org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(table=[[default_catalog, default_database, mongodb_source]], fields=[_id, name, description, weight]) (1/1)#0 (66988cc7518b1294893b26f1fa6b8168) switched from CANCELING to CANCELED.
28540 [Source: TableSourceScan(table=[[default_catalog, default_database, mongodb_source]], fields=[_id, name, description, weight]) (1/1)#0] INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: TableSourceScan(table=[[default_catalog, default_database, mongodb_source]], fields=[_id, name, description, weight]) (1/1)#0 (66988cc7518b1294893b26f1fa6b8168).
28540 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state CANCELED to JobManager for task Source: TableSourceScan(table=[[default_catalog, default_database, mongodb_source]], fields=[_id, name, description, weight]) (1/1)#0 66988cc7518b1294893b26f1fa6b8168.
28541 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: TableSourceScan(table=[[default_catalog, default_database, mongodb_source]], fields=[_id, name, description, weight]) (1/1) (66988cc7518b1294893b26f1fa6b8168) switched from CANCELING to CANCELED.
28549 [flink-akka.actor.default-dispatcher-10] INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager - Clearing resource requirements of job 311d818bbf5fd3a7bdedd19ef6dc369f
28549 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job insert-into_default_catalog.default_database.sink (311d818bbf5fd3a7bdedd19ef6dc369f) switched from state FAILING to FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_301]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_301]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_301]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_301]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [scala-library-2.11.12.jar:?]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [scala-library-2.11.12.jar:?]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.11.12.jar:?]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.11.12.jar:?]
	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.actor.Actor$class.aroundReceive(Actor.scala) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.actor.ActorCell.invoke(ActorCell.scala:561) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.dispatch.Mailbox.run(Mailbox.scala:225) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [akka-actor_2.11-2.5.21.jar:2.5.21]
Caused by: org.apache.flink.test.util.SuccessException
	at org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions$KeyedUpsertingSinkFunction.invoke(TestValuesRuntimeFunctions.java:460) ~[flink-table-planner-blink_2.11-1.13.1-tests.jar:1.13.1]
	at org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions$KeyedUpsertingSinkFunction.invoke(TestValuesRuntimeFunctions.java:349) ~[flink-table-planner-blink_2.11-1.13.1-tests.jar:1.13.1]
	at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65) ~[flink-table-runtime-blink_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at StreamExecCalc$67.processElement(Unknown Source) ~[?:?]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:194) ~[flink-table-runtime-blink_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43) ~[flink-table-runtime-blink_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) ~[flink-streaming-java_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-runtime_2.11-1.13.1.jar:1.13.1]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_301]
28553 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job 311d818bbf5fd3a7bdedd19ef6dc369f.
28634 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down
28659 [flink-akka.actor.default-dispatcher-10] INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job 311d818bbf5fd3a7bdedd19ef6dc369f reached terminal state FAILED.
28665 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job insert-into_default_catalog.default_database.sink(311d818bbf5fd3a7bdedd19ef6dc369f).
28670 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool - Releasing slot [b37e5b615897794be18d88ba0bbbd46a].
28671 [flink-akka.actor.default-dispatcher-11] INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{taskHeapMemory=256.000gb (274877906944 bytes), taskOffHeapMemory=256.000gb (274877906944 bytes), managedMemory=20.000mb (20971520 bytes), networkMemory=16.000mb (16777216 bytes)}, allocationId: b37e5b615897794be18d88ba0bbbd46a, jobId: 311d818bbf5fd3a7bdedd19ef6dc369f).
28672 [flink-akka.actor.default-dispatcher-10] INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager - Clearing resource requirements of job 311d818bbf5fd3a7bdedd19ef6dc369f
28672 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection 1923cae111c1370553e076d026eae737: Stopping JobMaster for job insert-into_default_catalog.default_database.sink(311d818bbf5fd3a7bdedd19ef6dc369f)..
28674 [flink-akka.actor.default-dispatcher-10] INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Disconnect job manager b9e6575f9b8172da6333b8c718e24286@akka://flink/user/rpc/jobmanager_3 for job 311d818bbf5fd3a7bdedd19ef6dc369f from the resource manager.
28677 [flink-akka.actor.default-dispatcher-11] INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService - Remove job 311d818bbf5fd3a7bdedd19ef6dc369f from job leader monitoring.
28679 [flink-akka.actor.default-dispatcher-11] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job 311d818bbf5fd3a7bdedd19ef6dc369f.

@Jiabao-Sun
Copy link
Contributor Author

@wuchong @leonardBang
If you are available, please help review it.
By the way, I output the CI log to the console for troubleshooting.

@wuchong
Copy link
Member

wuchong commented Nov 12, 2021

Thanks for the investigation! I think remove the sink-expected-messages-num in the test makes sense, because we already wait for the sink using waitForSinkSize and cancel the job after checking result.

We can also upgrade Flink version to 1.13.3 which includes a bugfix may help it as well https://issues.apache.org/jira/browse/FLINK-22203.

@@ -17,6 +17,7 @@
################################################################################
rootLogger.level=INFO
rootLogger.appenderRef.out.ref=FileAppender
rootLogger.appenderRef.stdout.ref=ConsoleAppender
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, the test output is collected in artifacts, you can see them here:

image

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I'll revert the change.

@wuchong
Copy link
Member

wuchong commented Nov 12, 2021

Could you also upgrade Flink version to 1.13.3?

@Jiabao-Sun
Copy link
Contributor Author

Could you also upgrade Flink version to 1.13.3?

Done.

Copy link
Member

@wuchong wuchong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@wuchong wuchong merged commit 823c14c into apache:master Nov 12, 2021
@wuchong wuchong linked an issue Nov 12, 2021 that may be closed by this pull request
@Jiabao-Sun Jiabao-Sun deleted the tracking-mongo-test-timeout branch February 17, 2022 15:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

MongoDBConnectorITCase is timeout on Azure
2 participants