Skip to content

[MINOR] Prevent timeline server from being reused in ITTestSchemaEvol…#8698

Merged
danny0405 merged 1 commit intoapache:masterfrom
voonhous:minor_fix_failing_it_schema_evolution_tests
May 13, 2023
Merged

[MINOR] Prevent timeline server from being reused in ITTestSchemaEvol…#8698
danny0405 merged 1 commit intoapache:masterfrom
voonhous:minor_fix_failing_it_schema_evolution_tests

Conversation

@voonhous
Copy link
Member

@voonhous voonhous commented May 12, 2023

…ution

Change Logs

Fix org.apache.hudi.table.ITTestSchemaEvolution tests that were failing with the following error:

22793 [main] INFO  org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView [] - Sending request : (http://192.168.0.123:65076/v1/hoodie/view/refresh/?basepath=%2Fvar%2Ffolders%2Fp_%2F09zfm5sx3v14w97hhk4vqrn8s817xt%2FT%2Fjunit3862861754052799897&lastinstantts=20230513014641714&timelinehash=4aa784e6b9f43fade1e8170cbbaa64a2580d2ae4f12562645a92458b7b851f1a)
22797 [main] INFO  org.apache.hudi.client.BaseHoodieClient [] - Stopping Timeline service !!
22797 [main] INFO  org.apache.hudi.client.transaction.TransactionManager [] - Transaction manager closed
22797 [main] INFO  org.apache.hudi.client.transaction.TransactionManager [] - Transaction manager closed

org.apache.hudi.exception.HoodieRemoteException: 192.168.0.123:65076 failed to respond

	at org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.refresh(RemoteHoodieTableFileSystemView.java:471)
	at org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.sync(RemoteHoodieTableFileSystemView.java:547)
	at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.sync(PriorityBasedFileSystemView.java:275)
	at org.apache.hudi.client.BaseHoodieTableServiceClient.runTableServicesInline(BaseHoodieTableServiceClient.java:334)
	at org.apache.hudi.client.BaseHoodieWriteClient.runTableServicesInline(BaseHoodieWriteClient.java:545)
	at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:251)
	at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:211)
	at org.apache.hudi.client.BaseHoodieWriteClient.commitTableChange(BaseHoodieWriteClient.java:1434)
	at org.apache.hudi.client.BaseHoodieWriteClient.addColumn(BaseHoodieWriteClient.java:1319)
	at org.apache.hudi.table.ITTestSchemaEvolution.changeTableSchema(ITTestSchemaEvolution.java:209)
	at org.apache.hudi.table.ITTestSchemaEvolution.testSchemaEvolution(ITTestSchemaEvolution.java:160)
	at org.apache.hudi.table.ITTestSchemaEvolution.testSchemaEvolution(ITTestSchemaEvolution.java:155)
	at org.apache.hudi.table.ITTestSchemaEvolution.testSchemaEvolution(ITTestSchemaEvolution.java:151)
	at org.apache.hudi.table.ITTestSchemaEvolution.testMergeOnReadInputFormatBaseFileOnlyFilteringIterator(ITTestSchemaEvolution.java:100)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
	at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
	at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
	at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)

In ITTestSchemaEvolution, multiple HoodieFlinkWriteClients are used for each stage of a single unit test. The RemoteHoodieTableFileSystemView from the previous WriteClient will be closed after they are done performing their individual tasks of the same test.

When the next stage of the same test is invoked, the new HoodieFlinkWriteClient that is created will attempt to reuse an EmbeddedTimelineServer that has already been stopped, causing the error shown above.

Please refer to org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper#createEmbeddedTimelineService for logic on how EmbeddedTimelineServer is created.

For a clearer flow of what is going on:

8629 [flink-akka.actor.default-dispatcher-10] INFO  org.apache.hudi.client.BaseHoodieClient [] - Stopping Timeline service !!
18629 [flink-akka.actor.default-dispatcher-10] INFO  org.apache.hudi.client.embedded.EmbeddedTimelineService [] - Closing Timeline server
18629 [flink-akka.actor.default-dispatcher-10] INFO  org.apache.hudi.timeline.service.TimelineService [] - Closing Timeline Service
18641 [main] INFO  org.apache.hudi.client.BaseHoodieClient [] - Timeline Server already running. Not restarting the service
18646 [flink-akka.actor.default-dispatcher-10] INFO  org.apache.hudi.timeline.service.TimelineService [] - Closed Timeline Service
18646 [flink-akka.actor.default-dispatcher-10] INFO  org.apache.hudi.client.embedded.EmbeddedTimelineService [] - Closed Timeline server
18646 [flink-akka.actor.default-dispatcher-10] INFO  org.apache.hudi.client.transaction.TransactionManager [] - Transaction manager closed
18646 [flink-akka.actor.default-dispatcher-10] INFO  org.apache.hudi.client.transaction.TransactionManager [] - Transaction manager closed
18647 [flink-akka.actor.default-dispatcher-10] INFO  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - Shutting down
18648 [flink-akka.actor.default-dispatcher-10] INFO  org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - Releasing slot [d57143d51eae5cfd8053dc94dbf74f3c].
  private void testSchemaEvolution(TableOptions tableOptions, boolean shouldCompact, ExpectedResult expectedResult) throws Exception {
    writeTableWithSchema1(tableOptions);
    changeTableSchema(tableOptions, shouldCompact);
    writeTableWithSchema2(tableOptions);
    checkAnswerEvolved(expectedResult.evolvedRows);
    checkAnswerCount(expectedResult.rowCount);
    checkAnswerWithMeta(tableOptions, expectedResult.rowsWithMeta);
  }

Let's refer to the code + stack trace above.

When writeTableWithSchema1 finishes, although await() is called, await will only block until the insert the table is complete. Tearing down of the JM + TM is a non-blocking task.

As such, the next function changeTableSchema is invoked immediately prior to JM + TM's tear down is complete. Referring to the stack trace, one can see that the tear down is running while changeTableSchema is creating a FlinkWriteClient, which in turn creates a timeline service.

Since tear down has not complete, the old embedded timeline server will be re-used.

Once tear down is complete, changeTableSchema will be using a FlinkWriteClient with an embedded timeline server that has already been closed, causing the FLAKY integration tests (If teardown can complete fast enough, changeTableSchema will create a new EmbeddedTimelineServer, else, connection failure + error)

Impact

Describe any public API or user-facing feature change or any performance impact.

None, tests were fixed.

Risk level (write none, low medium or high below)

If medium or high, explain what verification was done to mitigate the risks.

None, tests were fixed.

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change

  • The config description must be updated if new configs are added or the default value of the configs are changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here and follow the instruction to make
    changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@hudi-bot
Copy link
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

Copy link
Contributor

@danny0405 danny0405 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, nice findings.

@danny0405 danny0405 self-assigned this May 13, 2023
@danny0405 danny0405 merged commit 3df303e into apache:master May 13, 2023
@voonhous voonhous deleted the minor_fix_failing_it_schema_evolution_tests branch May 15, 2023 08:49
@voonhous
Copy link
Member Author

voonhous commented Jun 5, 2023

Apologies, there was a JIRA issue that was created for this issue here: HUDI-6175

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

engine:flink Flink integration

Projects

Archived in project

Development

Successfully merging this pull request may close these issues.

3 participants