Skip to content

Commit

Permalink
HIVE-15546: Optimize Utilities.getInputPaths() so each listStatus of …
Browse files Browse the repository at this point in the history
…a partition is done in parallel (Sahil Takiar, reviewed by Sergio Pena)
  • Loading branch information
sahilTakiar authored and Sergio Pena committed Jan 24, 2017
1 parent 3f8656f commit 20210de
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 14 deletions.
94 changes: 80 additions & 14 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
Expand Up @@ -19,9 +19,11 @@
package org.apache.hadoop.hive.ql.exec;

import com.esotericsoftware.kryo.Kryo;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.WordUtils;
Expand Down Expand Up @@ -174,6 +176,7 @@
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -186,8 +189,11 @@
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -2981,39 +2987,55 @@ public static List<Path> getInputPathsTez(JobConf job, MapWork work) throws Exce
/**
 * Computes the list of input paths for the given {@link MapWork}, optionally parallelizing
 * the per-path work (emptiness checks / dummy-file creation) across a thread pool.
 *
 * <p>The pool size is taken from {@code mapred.dfsclient.parallelism.max}; when that value
 * is 0 or 1 the work is done serially on the calling thread.
 *
 * @param job            the job configuration; also supplies the parallelism setting
 * @param work           the map work whose aliases/paths are resolved
 * @param hiveScratchDir scratch directory used when dummy files must be created
 * @param ctx            query context, used to probe path emptiness
 * @param skipDummy      if true, do not substitute dummy files for empty paths
 * @return the resolved input paths
 * @throws Exception if listing a path or creating a dummy file fails
 */
public static List<Path> getInputPaths(JobConf job, MapWork work, Path hiveScratchDir,
    Context ctx, boolean skipDummy) throws Exception {

  int numThreads = job.getInt("mapred.dfsclient.parallelism.max", 0);
  ExecutorService pool = null;
  if (numThreads > 1) {
    // Daemon threads so a forgotten pool can never block JVM exit.
    pool = Executors.newFixedThreadPool(numThreads,
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Get-Input-Paths-%d").build());
  }
  try {
    return getInputPaths(job, work, hiveScratchDir, ctx, skipDummy, pool);
  } finally {
    // The pool is created per invocation and never reused; shut it down so its
    // threads do not linger for the life of the JVM.
    if (pool != null) {
      pool.shutdown();
    }
  }
}

@VisibleForTesting
static List<Path> getInputPaths(JobConf job, MapWork work, Path hiveScratchDir,
Context ctx, boolean skipDummy, ExecutorService pool) throws Exception {

Set<Path> pathsProcessed = new HashSet<Path>();
List<Path> pathsToAdd = new LinkedList<Path>();
// AliasToWork contains all the aliases
for (String alias : work.getAliasToWork().keySet()) {
LOG.info("Processing alias " + alias);

// The alias may not have any path
Path path = null;
boolean isEmptyTable = true;
boolean hasLogged = false;
// Note: this copies the list because createDummyFileForEmptyPartition may modify the map.
for (Path file : new LinkedList<Path>(work.getPathToAliases().keySet())) {
List<String> aliases = work.getPathToAliases().get(file);
if (aliases.contains(alias)) {
path = file;
if (file != null) {
isEmptyTable = false;
} else {
LOG.warn("Found a null path for alias " + alias);
continue;
}

// Multiple aliases can point to the same path - it should be
// processed only once
if (pathsProcessed.contains(path)) {
if (pathsProcessed.contains(file)) {
continue;
}

pathsProcessed.add(path);
pathsProcessed.add(file);

if (LOG.isDebugEnabled()) {
LOG.debug("Adding input file " + path);
LOG.debug("Adding input file " + file);
} else if (!hasLogged) {
hasLogged = true;
LOG.info("Adding " + work.getPathToAliases().size()
+ " inputs; the first input is " + path);
}
if (!skipDummy && isEmptyPath(job, path, ctx)) {
path = createDummyFileForEmptyPartition(path, job, work, hiveScratchDir);
+ " inputs; the first input is " + file);
}
pathsToAdd.add(path);
pathsToAdd.add(file);
}
}

Expand All @@ -3025,12 +3047,56 @@ public static List<Path> getInputPaths(JobConf job, MapWork work, Path hiveScrat
// T2) x;
// If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2
// rows)
if (path == null && !skipDummy) {
path = createDummyFileForEmptyTable(job, work, hiveScratchDir, alias);
pathsToAdd.add(path);
if (isEmptyTable && !skipDummy) {
pathsToAdd.add(createDummyFileForEmptyTable(job, work, hiveScratchDir, alias));
}
}

List<Path> finalPathsToAdd = new LinkedList<>();
List<Future<Path>> futures = new LinkedList<>();
for (final Path path : pathsToAdd) {
if (pool == null) {
finalPathsToAdd.add(new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy).call());
} else {
futures.add(pool.submit(new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy)));
}
}

if (pool != null) {
for (Future<Path> future : futures) {
finalPathsToAdd.add(future.get());
}
}

return finalPathsToAdd;
}

private static class GetInputPathsCallable implements Callable<Path> {

private final Path path;
private final JobConf job;
private final MapWork work;
private final Path hiveScratchDir;
private final Context ctx;
private final boolean skipDummy;

private GetInputPathsCallable(Path path, JobConf job, MapWork work, Path hiveScratchDir,
Context ctx, boolean skipDummy) {
this.path = path;
this.job = job;
this.work = work;
this.hiveScratchDir = hiveScratchDir;
this.ctx = ctx;
this.skipDummy = skipDummy;
}

@Override
public Path call() throws Exception {
if (!this.skipDummy && isEmptyPath(this.job, this.path, this.ctx)) {
return createDummyFileForEmptyPartition(this.path, this.job, this.work, this.hiveScratchDir);
}
return this.path;
}
return pathsToAdd;
}

@SuppressWarnings({"rawtypes", "unchecked"})
Expand Down
64 changes: 64 additions & 0 deletions ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
Expand Up @@ -33,6 +33,8 @@
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -289,4 +291,66 @@ public void testGetInputPathsWithEmptyTables() throws Exception {
}
}
}

/**
 * Test for {@link Utilities#getInputPaths(JobConf, MapWork, Path, Context, boolean, ExecutorService)} with a single
 * threaded {@link ExecutorService}. Covers both an un-partitioned table and a
 * partitioned table, each with a single input file on the local filesystem.
 */
@Test
public void testGetInputPathsWithPool() throws Exception {
  ExecutorService pool = Executors.newSingleThreadExecutor();

  JobConf jobConf = new JobConf();
  MapWork mapWork = new MapWork();
  Path scratchDir = new Path(HiveConf.getVar(jobConf, HiveConf.ConfVars.LOCALSCRATCHDIR));

  String testTableName = "testTable";
  String testPartitionName = "testPartition";

  Path testTablePath = new Path(testTableName);
  Path testPartitionPath = new Path(testTablePath, testPartitionName);
  Path testFileTablePath = new Path(testTablePath, "test.txt");
  Path testFilePartitionPath = new Path(testPartitionPath, "test.txt");

  TableDesc mockTableDesc = mock(TableDesc.class);

  when(mockTableDesc.isNonNative()).thenReturn(false);
  when(mockTableDesc.getProperties()).thenReturn(new Properties());

  LinkedHashMap<Path, ArrayList<String>> pathToAliasTable = new LinkedHashMap<>();
  pathToAliasTable.put(testTablePath, Lists.newArrayList(testTableName));
  mapWork.setPathToAliases(pathToAliasTable);

  mapWork.getAliasToWork().put(testTableName, (Operator<?>) mock(Operator.class));

  FileSystem fs = FileSystem.getLocal(jobConf);
  try {
    fs.mkdirs(testTablePath);
    fs.create(testFileTablePath).close();

    // Run a test with an un-partitioned table with a single file as the input
    List<Path> tableInputPaths = Utilities.getInputPaths(jobConf, mapWork, scratchDir, mock(Context.class), false,
        pool);
    assertEquals(tableInputPaths.size(), 1);
    assertEquals(tableInputPaths.get(0), testTablePath);

    LinkedHashMap<Path, ArrayList<String>> pathToAliasPartition = new LinkedHashMap<>();
    pathToAliasPartition.put(testPartitionPath, Lists.newArrayList(testTableName));
    mapWork.setPathToAliases(pathToAliasPartition);

    fs.delete(testFileTablePath, false);
    fs.mkdirs(testPartitionPath);
    fs.create(testFilePartitionPath).close();

    // Run a test with a partitioned table with a single partition and a single file as the input
    List<Path> tablePartitionInputPaths = Utilities.getInputPaths(jobConf, mapWork, scratchDir, mock(Context.class),
        false, pool);
    assertEquals(tablePartitionInputPaths.size(), 1);
    assertEquals(tablePartitionInputPaths.get(0), testPartitionPath);
  } finally {
    // Stop the single worker thread so it does not outlive the test.
    pool.shutdownNow();
    if (fs.exists(testTablePath)) {
      fs.delete(testTablePath, true);
    }
  }
}
}

0 comments on commit 20210de

Please sign in to comment.