
HIVE-1871 Bug in merging dynamic partitions introduced by HIVE-1806

(He Yongqiang via namit)



git-svn-id: https://svn.apache.org/repos/asf/hive/trunk@1054856 13f79535-47bb-0310-9956-ffa450edef68
Namit Jain committed Jan 4, 2011
1 parent ceafb22 commit a804a3d7f0726253c94af66cef260086307b4360
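
What the patch does, in short: after HIVE-1806, the move step for merged dynamic partitions used a single target directory for all partitions, so files could land outside their own partition subdirectories (e.g. ds=.../hr=...). This patch pairs every source directory with its own destination: ConditionalResolverMergeFiles derives one target path per partition by appending the trailing dynamic-partition path components to the base target directory, and MoveTask then moves each source to its paired destination.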
@@ -659,6 +659,9 @@ Trunk - Unreleased
    HIVE-1874 fix HBase filter pushdown broken by HIVE-1638
    (John Sichi via namit)

+   HIVE-1871 Bug in merging dynamic partitions introduced by HIVE-1806
+   (He Yongqiang via namit)
+
  TESTS

    HIVE-1464. improve test query performance
@@ -126,16 +126,17 @@ public int execute(DriverContext driverContext) {
      // need to merge and they can simply be moved to the target directory.
      LoadMultiFilesDesc lmfd = work.getLoadMultiFilesWork();
      if (lmfd != null) {
-       Path destPath = new Path(lmfd.getTargetDir());
-       FileSystem fs = destPath.getFileSystem(conf);
-       if (!fs.exists(destPath)) {
-         fs.mkdirs(destPath);
-       }
        boolean isDfsDir = lmfd.getIsDfsDir();
-       for (String s: lmfd.getSourceDirs()) {
-         Path srcPath = new Path(s);
-         Path dstPath = new Path(destPath, srcPath.getName());
-         moveFile(srcPath, dstPath, isDfsDir);
+       int i = 0;
+       while (i < lmfd.getSourceDirs().size()) {
+         Path srcPath = new Path(lmfd.getSourceDirs().get(i));
+         Path destPath = new Path(lmfd.getTargetDirs().get(i));
+         FileSystem fs = destPath.getFileSystem(conf);
+         if (!fs.exists(destPath.getParent())) {
+           fs.mkdirs(destPath.getParent());
+         }
+         moveFile(srcPath, destPath, isDfsDir);
+         i++;
        }
      }
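
Outside the Hive codebase, the effect of the new loop can be sketched with plain Hadoop FileSystem calls. This is a minimal illustration, not the Hive code itself; fs.rename stands in for Hive's moveFile, which also handles local-to-DFS moves:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PairedMoveSketch {
  // Move each source directory to its index-aligned destination,
  // creating the destination's parent (the partition directory) first.
  static void moveAll(List<String> srcDirs, List<String> dstDirs,
      Configuration conf) throws IOException {
    for (int i = 0; i < srcDirs.size(); i++) {
      Path src = new Path(srcDirs.get(i));
      Path dst = new Path(dstDirs.get(i));
      FileSystem fs = dst.getFileSystem(conf);
      if (!fs.exists(dst.getParent())) {
        fs.mkdirs(dst.getParent());
      }
      fs.rename(src, dst); // stand-in for MoveTask.moveFile(src, dst, isDfsDir)
    }
  }
}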
@@ -25,8 +25,6 @@
import java.util.List;
import java.util.Map;

- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -42,7 +40,6 @@
public class ConditionalResolverMergeFiles implements ConditionalResolver,
    Serializable {
  private static final long serialVersionUID = 1L;
- static final private Log LOG = LogFactory.getLog(ConditionalResolverMergeFiles.class.getName());

  public ConditionalResolverMergeFiles() {
  }
@@ -180,14 +177,35 @@ public void setDPCtx(DynamicPartitionCtx dp) {
        // add the merge MR job
        setupMapRedWork(conf, work, trgtSize, totalSz);
        resTsks.add(mrTask);
-
+
        // add the move task for those partitions that do not need merging
        if (toMove.size() > 0) { //
          // modify the existing move task as it is already in the candidate running tasks
          MoveWork mvWork = (MoveWork) mvTask.getWork();
          LoadFileDesc lfd = mvWork.getLoadFileWork();
+
+         String targetDir = lfd.getTargetDir();
+         List<String> targetDirs = new ArrayList<String>(toMove.size());
+         int numDPCols = dpCtx.getNumDPCols();
+
+         for (int i = 0; i < toMove.size(); i++) {
+           String toMoveStr = toMove.get(i);
+           if (toMoveStr.endsWith(Path.SEPARATOR)) {
+             toMoveStr = toMoveStr.substring(0, toMoveStr.length() - 1);
+           }
+           String[] moveStrSplits = toMoveStr.split(Path.SEPARATOR);
+           int dpIndex = moveStrSplits.length - numDPCols;
+           String target = targetDir;
+           while (dpIndex < moveStrSplits.length) {
+             target = target + Path.SEPARATOR + moveStrSplits[dpIndex];
+             dpIndex++;
+           }
+
+           targetDirs.add(target);
+         }
+
          LoadMultiFilesDesc lmfd = new LoadMultiFilesDesc(toMove,
-           lfd.getTargetDir(), lfd.getIsDfsDir(), lfd.getColumns(), lfd.getColumnTypes());
+           targetDirs, lfd.getIsDfsDir(), lfd.getColumns(), lfd.getColumnTypes());
          mvWork.setLoadFileWork(null);
          mvWork.setLoadTableWork(null);
          mvWork.setMultiFilesDesc(lmfd);
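
The target computation above reads in isolation as: strip a trailing separator, split the source path, and append its last numDPCols components (the ds=.../hr=... directories) to the base target directory. A self-contained sketch with a hypothetical helper name, using "/" in place of Path.SEPARATOR:

public class DynamicPartitionTargets {
  // Hypothetical helper mirroring the loop above: append the trailing
  // numDPCols path components of srcDir (the dynamic-partition dirs)
  // to targetDir.
  static String perPartitionTarget(String targetDir, String srcDir, int numDPCols) {
    if (srcDir.endsWith("/")) {
      srcDir = srcDir.substring(0, srcDir.length() - 1);
    }
    String[] parts = srcDir.split("/");
    StringBuilder target = new StringBuilder(targetDir);
    for (int i = parts.length - numDPCols; i < parts.length; i++) {
      target.append('/').append(parts[i]);
    }
    return target.toString();
  }

  public static void main(String[] args) {
    // Prints /warehouse/merge_dynamic_part/ds=2008-04-08/hr=11
    System.out.println(perPartitionTarget("/warehouse/merge_dynamic_part",
        "/tmp/hive-staging/ds=2008-04-08/hr=11/", 2));
  }
}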
@@ -245,6 +263,7 @@ private long getMergeSize(FileSystem inpFs, Path dirPath, long avgSize) {
        for (FileStatus fStat : fStats) {
          totalSz += fStat.getLen();
        }
+
        if (totalSz < avgSize * fStats.length) {
          return totalSz;
        } else {
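
This hunk only adds a blank line, but the surrounding heuristic is what triggers the conditional merge: a directory qualifies only if its files are smaller than hive.merge.smallfiles.avgsize on average. A simplified sketch; the -1 "no merge" return value is an assumption, since the else branch is cut off in this hunk:

public class MergeSizeSketch {
  // Returns the total size when the files are smaller than avgSize on
  // average (directory qualifies for merging); otherwise -1, assumed
  // here to signal "no merge needed".
  static long mergeSize(long[] fileLens, long avgSize) {
    long totalSz = 0;
    for (long len : fileLens) {
      totalSz += len;
    }
    return (totalSz < avgSize * fileLens.length) ? totalSz : -1;
  }

  public static void main(String[] args) {
    // Four 2048-byte files, avgSize = 3000: 8192 < 12000, so merge (8192).
    System.out.println(mergeSize(new long[] {2048, 2048, 2048, 2048}, 3000));
  }
}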
@@ -27,7 +27,7 @@
 */
public class LoadMultiFilesDesc implements Serializable {
  private static final long serialVersionUID = 1L;
- private String targetDir;
+ private List<String> targetDirs;
  private boolean isDfsDir;
  // list of columns, comma separated
  private String columns;
@@ -37,19 +37,19 @@
  public LoadMultiFilesDesc() {
  }

- public LoadMultiFilesDesc(final List<String> sourceDirs, final String targetDir,
+ public LoadMultiFilesDesc(final List<String> sourceDirs, final List<String> targetDir,
      final boolean isDfsDir, final String columns, final String columnTypes) {
    this.srcDirs = sourceDirs;
-   this.targetDir = targetDir;
+   this.targetDirs = targetDir;
    this.isDfsDir = isDfsDir;
    this.columns = columns;
    this.columnTypes = columnTypes;
  }

- @Explain(displayName = "destination")
- public String getTargetDir() {
-   return targetDir;
+ @Explain(displayName = "destinations")
+ public List<String> getTargetDirs() {
+   return targetDirs;
  }

  @Explain(displayName = "sources")
@@ -61,8 +61,8 @@ public void setSourceDirs(List<String> srcs) {
    this.srcDirs = srcs;
  }

- public void setTargetDir(final String targetDir) {
-   this.targetDir = targetDir;
+ public void setTargetDirs(final List<String> targetDir) {
+   this.targetDirs = targetDir;
  }

  @Explain(displayName = "hdfs directory")
@@ -0,0 +1,38 @@
+set hive.exec.dynamic.partition=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+create table srcpart_merge_dp like srcpart;
+
+create table merge_dynamic_part like srcpart;
+
+load data local inpath '../data/files/srcbucket20.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11);
+load data local inpath '../data/files/srcbucket21.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11);
+load data local inpath '../data/files/srcbucket22.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11);
+load data local inpath '../data/files/srcbucket23.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11);
+
+load data local inpath '../data/files/srcbucket20.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=12);
+load data local inpath '../data/files/srcbucket21.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=12);
+load data local inpath '../data/files/srcbucket22.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=12);
+load data local inpath '../data/files/srcbucket23.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=12);
+
+load data local inpath '../data/files/kv1.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-09', hr=11);
+load data local inpath '../data/files/kv2.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-09', hr=11);
+load data local inpath '../data/files/kv1.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-09', hr=12);
+load data local inpath '../data/files/kv2.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-09', hr=12);
+
+show partitions srcpart_merge_dp;
+
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+set hive.merge.mapfiles=true;
+set hive.merge.mapredfiles=true;
+set hive.merge.smallfiles.avgsize=3000;
+set hive.exec.compress.output=false;
+
+explain
+insert overwrite table merge_dynamic_part partition (ds, hr) select key, value, ds, hr from srcpart_merge_dp where ds>='2008-04-08';
+
+insert overwrite table merge_dynamic_part partition (ds, hr) select key, value, ds, hr from srcpart_merge_dp where ds>='2008-04-08';
+
+select ds, hr, count(1) from merge_dynamic_part where ds>='2008-04-08' group by ds, hr order by ds, hr;
+
+show table extended like `merge_dynamic_part`;
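
The new test loads four small bucket files into each 2008-04-08 partition and two larger kv files into the 2008-04-09 partitions, sets hive.merge.smallfiles.avgsize low enough that the merge path is exercised, and then verifies per-partition row counts plus the on-disk layout via show table extended; misrouted per-partition move targets would surface as wrong counts here.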