@Paramvir109 Paramvir109 commented Jun 4, 2025

What changes were proposed in this pull request?

Exceptions that occur in the drainAndLoadForPartition method while threads load the fast hash tables in parallel need to be propagated properly.
Currently the failing thread dies silently, so fewer hash table (HT) entries are reported than were expected, which produces incorrect query results.
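
To illustrate the failure mode described above, here is a minimal sketch that does not use Hive code. The class `SilentWorkerFailureDemo`, the method `loadPartition`, and the per-partition count of 100 are all hypothetical. It assumes the workers are `CompletableFuture` tasks on an executor, as the unit test later in this thread suggests. When the futures are never joined, a failed worker simply contributes no entries; when they are joined with `get()`, the failure surfaces as an `ExecutionException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class SilentWorkerFailureDemo {

    // Hypothetical stand-in for a per-partition load: partition 1 always fails.
    static void loadPartition(int partition, AtomicLong entries) {
        if (partition == 1) {
            throw new IllegalStateException("simulated failure in partition " + partition);
        }
        entries.addAndGet(100);
    }

    // Loads two partitions in parallel and returns the number of entries loaded.
    // With propagate == false the futures are never inspected, so a failed
    // worker goes unnoticed and the returned count is simply too low.
    static long load(boolean propagate) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicLong entries = new AtomicLong();
        try {
            List<CompletableFuture<Void>> tasks = new ArrayList<>();
            for (int p = 0; p < 2; p++) {
                final int partition = p;
                tasks.add(CompletableFuture.runAsync(() -> loadPartition(partition, entries), pool));
            }
            if (propagate) {
                // get() rethrows a worker failure as ExecutionException.
                CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]))
                    .get(1, TimeUnit.MINUTES);
            }
        } finally {
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
        }
        return entries.get();
    }

    public static void main(String[] args) throws Exception {
        // Only partition 0 succeeds, so 100 entries are reported instead of 200.
        System.out.println("futures ignored: entries = " + load(false));
        try {
            load(true);
        } catch (ExecutionException e) {
            System.out.println("futures joined: load failed with " + e.getCause());
        }
    }
}
```

The second call fails loudly instead of returning a short count, which is the behavior this PR aims for.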

Why are the changes needed?

It fixes a bug introduced by https://issues.apache.org/jira/browse/HIVE-25149 that hides the actual exception and produces incorrect results. An example query is included in the Jira: https://issues.apache.org/jira/browse/HIVE-28735

Is the change a dependency upgrade?

No

Does this PR introduce any user-facing change?

No

How was this patch tested?

Tested using the sample query.

Hive 3

Caused by: java.lang.OutOfMemoryError: Java heap space
    at org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastLongHashTable.expandAndRehash(VectorMapJoinFastLongHashTable.java:166)
    at org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastLongHashTable.add(VectorMapJoinFastLongHashTable.java:100)
    at org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastLongHashTable.adaptPutRow(VectorMapJoinFastLongHashTable.java:91)
    at org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastLongHashMap.putRow(VectorMapJoinFastLongHashMap.java:147)
    at org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastTableContainer.putRow(VectorMapJoinFastTableContainer.java:184)
    at org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastHashTableLoader.load(VectorMapJoinFastHashTableLoader.java:130)
    at org.apache.hadoop.hive.ql.exec.MapJoinOperator.loadHashTableInternal(MapJoinOperator.java:388)
    at org.apache.hadoop.hive.ql.exec.MapJoinOperator.loadHashTable(MapJoinOperator.java:457)
    at org.apache.hadoop.hive.ql.exec.MapJoinOperator.lambda$initializeOp$0(MapJoinOperator.java:241)
    ... 3 more

Hive 4

Before the fix: wrong results on each run
After the fix: the same OOM exception as in Hive 3

A unit test that verifies this was also added.

vectorMapJoinFastHashTableLoader.initHTLoadingService(1048577);
List<CompletableFuture<Void>> loaderTasks = vectorMapJoinFastHashTableLoader.submitQueueDrainThreads(mockTableContainer);
assertEquals(2, loaderTasks.size());
vectorMapJoinFastHashTableLoader.getLoadExecService().shutdown();
Member

@deniskuzZ deniskuzZ Jun 11, 2025

I don't think that is a good way to reproduce the issue:

  • Why prepare a mock drainAndLoadForPartition and not even trigger it?

Instead, you directly execute getLoadExecService().shutdown() and CompletableFuture.allOf.

  • Your test should focus on ExecutionException and not on InterruptedException (based on the PR description).

Author

@deniskuzZ, when we call the submitQueueDrainThreads method, it internally calls drainAndLoadForPartition, and this is what I want to test. If any error occurs in drainAndLoadForPartition, it is caught and a RuntimeException is thrown, which is propagated as an ExecutionException.
In my repro scenario the code flow reached this point, but the exception was not propagated properly.

Member

@deniskuzZ deniskuzZ Jun 13, 2025

here would be wrapped in ExecutionException.
Your test checks for InterruptedException, which was already covered in https://github.com/apache/hive/pull/5845/files#diff-573e1d79d59124631df039ba5fedfd037457f3e3c3b8fb5e349b4086191bc197L330

Author

@Paramvir109 Paramvir109 Jun 14, 2025

ExecutionException thrown = assertThrows(ExecutionException.class, () -> {
    CompletableFuture.allOf(loaderTasks.toArray(new CompletableFuture[0]))
        .get(2, TimeUnit.MINUTES);
});

The code block above checks only for ExecutionException.

assertInstanceOf(InterruptedException.class, cause.getCause());

The assertion above is just an extra check that identifies the underlying cause of the ExecutionException. I can remove it too.
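
The cause chain under discussion can be reproduced outside JUnit. The following is a sketch under assumptions, not the PR's test: `CauseChainDemo`, `drain`, and `interruptWorker` are hypothetical names, and the worker is assumed to wrap an interrupt in a `RuntimeException` as described earlier in this thread. `Future.get` then reports the failure as an `ExecutionException` whose cause is that `RuntimeException`, whose own cause is the `InterruptedException`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class CauseChainDemo {

    // Hypothetical worker: blocks on an empty queue and wraps an interrupt
    // in a RuntimeException.
    static void drain(BlockingQueue<String> queue, CountDownLatch started) {
        started.countDown();
        try {
            queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted while draining", e);
        }
    }

    // Starts one worker, interrupts it, and returns what get() throws.
    static ExecutionException interruptWorker() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        CountDownLatch started = new CountDownLatch(1);
        CompletableFuture<Void> task =
            CompletableFuture.runAsync(() -> drain(queue, started), pool);
        started.await();    // make sure the worker is running before interrupting it
        pool.shutdownNow(); // interrupts the running worker
        try {
            task.get(1, TimeUnit.MINUTES);
            throw new IllegalStateException("worker was expected to fail");
        } catch (ExecutionException e) {
            return e;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionException thrown = interruptWorker();
        Throwable cause = thrown.getCause();
        System.out.println(cause.getClass().getSimpleName());            // RuntimeException
        System.out.println(cause.getCause().getClass().getSimpleName()); // InterruptedException
    }
}
```

Asserting on the inner cause therefore checks which failure was injected, while asserting on `ExecutionException` checks that the failure reached the caller at all.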

Member

You should test the production code in drainAndLoadForPartition: configure drainAndLoadForPartition to throw the exception, and do not test the TEST.

Author

I can't test drainAndLoadForPartition() directly. The original OOM scenario is reproducible only with specific data under the low-memory conditions given in the Jira, and I would have to mimic values for vectorMapJoinFastTableContainer that produce an OOM error, which is not possible in a unit test. Therefore I cause an OOM error whenever drainAndLoadForPartition() is called, and my part of the code checks whether that OOM error is propagated as a RuntimeException in the main thread.
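
The fault-injection idea described above can be sketched without Mockito. Everything here is hypothetical and simplified: `PartitionLoader` is not Hive's loader, and only the method name `drainAndLoadForPartition` is borrowed from the thread. The override throws a simulated `OutOfMemoryError` in the worker, and the check confirms that the error reaches the calling thread as the root cause of a `RuntimeException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FaultInjectionDemo {

    // Hypothetical loader, not Hive's class: one task per partition.
    static class PartitionLoader {
        void drainAndLoadForPartition(int partition) {
            // Real loading work would go here.
        }

        void load(int partitions) {
            ExecutorService pool = Executors.newFixedThreadPool(partitions);
            try {
                List<CompletableFuture<Void>> tasks = new ArrayList<>();
                for (int p = 0; p < partitions; p++) {
                    final int partition = p;
                    tasks.add(CompletableFuture.runAsync(() -> {
                        try {
                            drainAndLoadForPartition(partition);
                        } catch (Throwable t) {
                            // Wrap so the partition number travels with the failure.
                            throw new RuntimeException("Failed to load partition " + partition, t);
                        }
                    }, pool));
                }
                CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]))
                    .get(1, TimeUnit.MINUTES);
            } catch (ExecutionException e) {
                // Surface the worker failure in the calling thread.
                throw new RuntimeException("Hash table load failed", e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Hash table load was interrupted", e);
            } catch (TimeoutException e) {
                throw new RuntimeException("Hash table load timed out", e);
            } finally {
                pool.shutdown();
            }
        }
    }

    // Injects a simulated OOM into every worker and returns the root cause
    // seen by the caller, or null if load() unexpectedly succeeds.
    static Throwable rootCauseOfFailedLoad() {
        PartitionLoader failing = new PartitionLoader() {
            @Override
            void drainAndLoadForPartition(int partition) {
                throw new OutOfMemoryError("simulated: Java heap space");
            }
        };
        try {
            failing.load(2);
            return null;
        } catch (RuntimeException e) {
            Throwable root = e;
            while (root.getCause() != null) {
                root = root.getCause();
            }
            return root;
        }
    }

    public static void main(String[] args) {
        System.out.println("root cause: " + rootCauseOfFailedLoad());
    }
}
```

Because the overridden method runs inside the same submission and join path as the real one would, the check exercises the propagation logic rather than the injected failure itself.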

@@ -83,6 +88,10 @@ public VectorMapJoinFastHashTableLoader() {

}

ExecutorService getLoadExecService() {
Member

remove this

Author

I need this so that I can shut down the service in the tearDown method after the unit test runs. Since loadExecService is a private member of the class, I added a package-private getter so that the unit test can access it.
The other approach would be to change the access modifier of loadExecService to package-private.
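
For context, the getter-plus-tearDown pattern described above might look like the following sketch. `TeardownDemo`, `Loader`, and `initService` are hypothetical names; only `getLoadExecService` and `loadExecService` come from this thread, and the 10-second wait is an arbitrary choice.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class TeardownDemo {

    // Hypothetical class under test, not Hive's loader.
    static class Loader {
        private ExecutorService loadExecService;

        void initService(int threads) {
            loadExecService = Executors.newFixedThreadPool(threads);
        }

        // Package-private on purpose: visible to tests in the same package only.
        ExecutorService getLoadExecService() {
            return loadExecService;
        }
    }

    // What a test's tearDown method could do with the getter.
    static boolean tearDown(Loader loader) throws InterruptedException {
        ExecutorService service = loader.getLoadExecService();
        if (service == null) {
            return true; // the service was never started
        }
        service.shutdownNow();
        return service.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        Loader loader = new Loader();
        loader.initService(2);
        loader.getLoadExecService().execute(() -> { });
        System.out.println("terminated: " + tearDown(loader));
    }
}
```

The trade-off raised in the review still applies: the getter widens the class's surface purely for tests, whereas shutting the service down inside the production code path would keep the field fully private.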

Member

drainAndLoadForPartition should shut down the service in the finally block.

Author

@Paramvir109 Paramvir109 Aug 9, 2025

Hey @deniskuzZ. Thanks for the review, and sorry for the late reply.
The service is shut down here in load(). However, I can't test the load() method via a unit test, because the original OOM scenario is reproducible only with specific data under the low-memory conditions given in the Jira. Therefore I need this package-private getter to actually shut down the service in my unit test.
