HIVE-23324: Parallelise compaction directory cleaning process #1275

Merged
merged 13 commits into from Jul 31, 2020

Conversation

adesh-rao
Contributor

NOTICE

Please create an issue in ASF JIRA before opening a pull request,
and you need to set the title of the pull request which starts with
the corresponding JIRA issue number. (e.g. HIVE-XXXXX: Fix a typo in YYY)
For more details, please see https://cwiki.apache.org/confluence/display/Hive/HowToContribute

@adesh-rao
Contributor Author

@pvary Can you please take a look at this PR?


private ReplChangeManager replChangeManager;

@Override
public void init(AtomicBoolean stop) throws Exception {
  super.init(stop);
  replChangeManager = ReplChangeManager.getInstance(conf);
  cleanerExecutor = Executors.newFixedThreadPool(conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_REQUEST_QUEUE));
Contributor

I think it would be a good idea to shut down the executor when the run loop finishes.
What do you think?

Contributor Author

Yeah, makes sense. Added it.

Contributor

@pvary pvary Jul 21, 2020

I prefer the way it was done in Worker.java:

  • Named threads
  • Priority down
  • Set daemon on/off
  • Stopped in finally

Shall we do it here, or should we create a follow-up jira for creating and cleaning up the executor threads for Cleaner and Initiator as well?

Thanks,
Peter
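
For readers who have not seen Worker.java, the four points above could look roughly like the following. This is only a sketch using plain JDK classes; the class name, the helper names, and the thread-name prefix are made up for illustration and are not taken from the Hive code base.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch class, not part of Hive.
class CleanerExecutorSketch {

  // Builds threads that are named, run at minimum priority, and are daemons.
  static ThreadFactory namedDaemonFactory(String namePrefix) {
    AtomicInteger counter = new AtomicInteger();
    return runnable -> {
      Thread t = new Thread(runnable, namePrefix + "-" + counter.getAndIncrement());
      t.setPriority(Thread.MIN_PRIORITY);
      t.setDaemon(true);
      return t;
    };
  }

  // Hypothetical helper that creates the fixed-size pool from the factory above.
  static ExecutorService newCleanerPool(String namePrefix, int size) {
    return Executors.newFixedThreadPool(size, namedDaemonFactory(namePrefix));
  }

  public static void main(String[] args) throws Exception {
    // The prefix below is an assumed name, chosen only for this example.
    ExecutorService pool = newCleanerPool("Cleaner-executor-thread", 2);
    try {
      String name = pool.submit(() -> Thread.currentThread().getName()).get();
      System.out.println(name);
    } finally {
      // Stopped in finally, so the pool is released even if the loop throws.
      pool.shutdownNow();
    }
  }
}
```

Creating the pool and shutting it down in the same method keeps the whole executor lifecycle visible in one place.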

Contributor Author

Just saw the approach in Worker.java, and it looks much better/cleaner. I will make the changes in this PR itself (for both Cleaner and Initiator) and update once it is ready.

Contributor Author

Done.

@belugabehr
Contributor

Let us not re-invent the wheel here:

  public static void main(String[] args) throws IOException
  {

    ExecutorService service = Executors.newFixedThreadPool(2);
    CompletionService<String> completionService = new ExecutorCompletionService<>(service);

    for (int i = 0; i < 10; i++)
    {
      final int t = i;
      completionService.submit(() -> {
        return clean("File" + t);
      });
    }

    for (int i = 0; i < 10; ++i) {
      try {
        String path = completionService.take().get();
        System.out.println(path);
      } catch (Exception ignore) {}
    }
  }

  public static String clean(String path)
  {
    return "Cleaned: " + path;
  }

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorCompletionService.html

@adesh-rao
Contributor Author

@belugabehr

  1. The clean method does not return anything (void). I couldn't find a way to create a CompletionService on a void return type, which means that using CompletionService would require changing the clean method's definition.

  2. We don't care which executor threads complete when, and all the logging related to directory/file deletion is already done inside the clean method.

Therefore, the retrieval loop below

  for (int i = 0; i < 10; ++i) {
    try {
      String path = completionService.take().get();
      System.out.println(path);
    } catch (Exception ignore) {}
  }

would just be extra lines of code when we can simply use join() instead (I think that might have been the initial intention too).

So, if your suggestion is necessary, we can take it up in a separate jira (it will require similar changes in the Initiator thread too).
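
To make the join() alternative concrete, here is a rough sketch. It assumes that join() refers to CompletableFuture.join(); the class name, the clean stand-in, and the CLEANED queue are hypothetical and exist only so the example can run on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch class, not part of Hive.
class JoinSketch {

  // Records cleaned paths so the effect of each task is observable.
  static final ConcurrentLinkedQueue<String> CLEANED = new ConcurrentLinkedQueue<>();

  // Hypothetical stand-in for the real clean method, which returns void.
  static void clean(String path) {
    CLEANED.add(path);
  }

  static void cleanAll(List<String> paths, int threads) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<CompletableFuture<Void>> futures = new ArrayList<>();
      for (String path : paths) {
        futures.add(CompletableFuture.runAsync(() -> clean(path), pool));
      }
      // Block until every task is done; completion order does not matter here.
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    cleanAll(Arrays.asList("File0", "File1", "File2"), 2);
    System.out.println(CLEANED.size());
  }
}
```

Note that join() rethrows a task failure wrapped in a CompletionException, so a caller that wants to continue after one failed directory would still need some error handling around it.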

@belugabehr
Contributor

belugabehr commented Jul 21, 2020

Void type:

  public static void main(String[] args) throws IOException
  {

    ExecutorService service = Executors.newFixedThreadPool(2);
    CompletionService<Void> completionService = new ExecutorCompletionService<>(service);

    for (int i = 0; i < 10; i++)
    {
      final int t = i;
      completionService.submit(() -> {
        clean("File" + t);
        return null;
      });
    }

    for (int i = 0; i < 10; ++i) {
      try {
        completionService.take().get();
      } catch (Exception ignore) {}
    }
  }

  // clean returns nothing here, matching the Void completion service above
  public static void clean(String path)
  {
    System.out.println("Cleaned: " + path);
  }

@adesh-rao
Contributor Author

@belugabehr Thanks for the suggestion. I have implemented this.

try {
  completionService.take().get();
} catch (InterruptedException| ExecutionException ignore) {
  // What should we do here?
Contributor

At a minimum, log the error at info level?
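
One possible shape for that catch block is sketched below. It uses java.util.logging as a stand-in for whatever logger the class already has, and the class name, drain, and demo are hypothetical names introduced for the example. The two exception types are handled separately so the interrupt flag can be restored.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Logger;

// Hypothetical sketch class, not part of Hive.
class DrainSketch {
  private static final Logger LOG = Logger.getLogger(DrainSketch.class.getName());

  // Waits for `count` submitted tasks and returns how many of them failed.
  static int drain(CompletionService<Void> service, int count) {
    int failures = 0;
    for (int i = 0; i < count; i++) {
      try {
        service.take().get();
      } catch (InterruptedException e) {
        // Restore the interrupt flag and stop waiting for the remaining tasks.
        Thread.currentThread().interrupt();
        LOG.info("Interrupted while waiting for cleaning tasks");
        break;
      } catch (ExecutionException e) {
        failures++;
        LOG.info("Cleaning task failed: " + e.getCause());
      }
    }
    return failures;
  }

  // Hypothetical driver: one task succeeds and one throws.
  static int demo() {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      CompletionService<Void> service = new ExecutorCompletionService<>(pool);
      service.submit(() -> null);
      service.submit(() -> { throw new IllegalStateException("boom"); });
      return drain(service, 2);
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```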

  count++;
}

for(int i=0; i<count; i++) {
Contributor

nit of the nit: formatting:
for (int i = 0; i < count; i++) {


private ReplChangeManager replChangeManager;

@Override
public void init(AtomicBoolean stop) throws Exception {
  super.init(stop);
  replChangeManager = ReplChangeManager.getInstance(conf);
  ThreadFactory threadFactory = new ThreadFactoryBuilder()
Contributor

Shall we move this to the run method too? It would make it easier to understand the code IMHO

Contributor

@pvary pvary Jul 22, 2020

Or maybe extract this into a utility method like the one below, and call it from the run method to create the executor:
WhateverUtil.createExecutor(String name, int size) {}

Contributor Author

Done.

@adesh-rao
Contributor Author

@pvary Can you please take one more look at this? There were a few modifications after your last comment.

import java.util.concurrent.ThreadFactory;

public class CompactorUtil {
public interface ThrowingRunnable<E extends Exception> {
Contributor

nit: Use 2-space indentation.
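
The diff excerpt shows only the declaration line of ThrowingRunnable. As a guess at how such an interface is typically used, the sketch below adds a run method and an unchecked adapter; both are assumptions rather than the PR's actual code, and the clean stand-in and outer class name are hypothetical.

```java
import java.io.IOException;

// Hypothetical sketch class, not part of Hive.
class ThrowingRunnableSketch {

  // Only the declaration line appears in the diff; the members below are assumed.
  interface ThrowingRunnable<E extends Exception> {
    void run() throws E;

    // Assumed adapter: wraps checked exceptions so the task fits a plain Runnable.
    static Runnable unchecked(ThrowingRunnable<?> task) {
      return () -> {
        try {
          task.run();
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      };
    }
  }

  // Hypothetical stand-in for a clean method that declares a checked exception.
  static void clean(String path) throws IOException {
    if (path.isEmpty()) {
      throw new IOException("empty path");
    }
  }

  public static void main(String[] args) {
    // A successful task runs without any wrapping being visible.
    ThrowingRunnable.unchecked(() -> clean("base_20")).run();
    try {
      ThrowingRunnable.unchecked(() -> clean("")).run();
    } catch (RuntimeException e) {
      // The original IOException is available as the cause.
      System.out.println(e.getCause().getMessage());
    }
  }
}
```

An adapter of this kind lets a method that throws checked exceptions be handed to APIs that accept only Runnable, at the cost of unwrapping the cause on the caller's side.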

@@ -3028,6 +3028,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal

HIVE_COMPACTOR_CLEANER_RUN_INTERVAL("hive.compactor.cleaner.run.interval", "5000ms",
    new TimeValidator(TimeUnit.MILLISECONDS), "Time between runs of the cleaner thread"),
HIVE_COMPACTOR_CLEANER_REQUEST_QUEUE("hive.compactor.cleaner.request.queue", 1,
Contributor

The config name should reflect what it does. It actually represents the number of threads used to run the cleaner in parallel, but the name sounds like a queue name. Can we change it to "HIVE_COMPACTOR_CLEANER_THREADS_NUM"?

Table t = newTable("default", "camipc", true);
List<Partition> partitions = new ArrayList<>();
Partition p = null;
for(int i=0; i<10; i++) {
Contributor

nit: Make it "for (int i = 0; i < 10; i++)". Check other places in this patch.

  addDeltaFile(t, p, 21L, 24L, 4);
  partitions.add(p);
}
burnThroughTransactions("default", "camipc", 25);
Contributor

nit: Add a blank line after the closing brace.

Contributor Author

Done.

Assert.assertEquals(2, paths.size());
boolean sawBase = false, sawDelta = false;
for (Path path : paths) {
  if (path.getName().equals("base_20")) sawBase = true;
Contributor

nit: Put the bodies of "if", "else if" and "else" on a new line, indented one level.

@deniskuzZ deniskuzZ merged commit a206833 into apache:master Jul 31, 2020
abstractdog pushed a commit to abstractdog/hive that referenced this pull request Apr 9, 2021
…cess (Adesh Kumar Rao, reviewed by Peter Vary, Beluga Behr, Sankar Hariappan, Denys Kuzmenko)

Closes (apache#1275)

(cherry picked from commit a206833)
Change-Id: Iffb7d6bcfda0bfa64c7d907d0a5d01cb7b1f9c8e