From b3188c8afb71bda0bc1d846de1a558560adcfa95 Mon Sep 17 00:00:00 2001 From: Anishek Agarwal Date: Thu, 10 May 2018 16:07:26 +0530 Subject: [PATCH 1/2] HIVE-19485 : dump directory for non native tables should not be created --- ...stReplicationScenariosAcrossInstances.java | 97 ++++++++++++------- .../hadoop/hive/ql/exec/ExportTask.java | 2 +- .../hive/ql/parse/repl/dump/TableExport.java | 53 ++++++---- 3 files changed, 93 insertions(+), 59 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index df9bde059e8d..d704b871d781 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -21,6 +21,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.conf.HiveConf; @@ -52,6 +53,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertFalse; public class TestReplicationScenariosAcrossInstances { @Rule @@ -683,54 +685,75 @@ public void testIncrementalReplWithDropAndCreateTableDifferentPartitionTypeAndIn // Bootstrap load in replica replica.load(replicatedDbName, bootstrapTuple.dumpLocation) - .status(replicatedDbName) - .verifyResult(bootstrapTuple.lastReplicationId); + .status(replicatedDbName) + .verifyResult(bootstrapTuple.lastReplicationId); // First incremental dump WarehouseInstance.Tuple firstIncremental = primary.run("use " + primaryDbName) - .run("create table table1 (id int) partitioned by (country string)") - .run("create table table2 (id int)") - .run("create table table3 (id int) partitioned by (country string)") - .run("insert into table1 partition(country='india') values(1)") - .run("insert into table2 values(2)") - .run("insert into table3 partition(country='india') values(3)") - .dump(primaryDbName, bootstrapTuple.lastReplicationId); + .run("create table table1 (id int) partitioned by (country string)") + .run("create table table2 (id int)") + .run("create table table3 (id int) partitioned by (country string)") + .run("insert into table1 partition(country='india') values(1)") + .run("insert into table2 values(2)") + .run("insert into table3 partition(country='india') values(3)") + .dump(primaryDbName, bootstrapTuple.lastReplicationId); // Second incremental dump WarehouseInstance.Tuple secondIncremental = primary.run("use " + primaryDbName) - .run("drop table table1") - .run("drop table table2") - .run("drop table table3") - .run("create table table1 (id int)") - .run("insert into table1 values (10)") - .run("create table table2 (id int) partitioned by (country string)") - .run("insert into table2 partition(country='india') values(20)") - .run("create table table3 (id int) partitioned by (name string, rank int)") - .run("insert into table3 partition(name='adam', rank=100) values(30)") - .dump(primaryDbName, firstIncremental.lastReplicationId); + .run("drop table table1") + .run("drop table table2") + .run("drop table table3") + .run("create table table1 
(id int)") + .run("insert into table1 values (10)") + .run("create table table2 (id int) partitioned by (country string)") + .run("insert into table2 partition(country='india') values(20)") + .run("create table table3 (id int) partitioned by (name string, rank int)") + .run("insert into table3 partition(name='adam', rank=100) values(30)") + .dump(primaryDbName, firstIncremental.lastReplicationId); // First incremental load replica.load(replicatedDbName, firstIncremental.dumpLocation) - .status(replicatedDbName) - .verifyResult(firstIncremental.lastReplicationId) - .run("use " + replicatedDbName) - .run("select id from table1") - .verifyResults(new String[] {"1"}) - .run("select * from table2") - .verifyResults(new String[] {"2"}) - .run("select id from table3") - .verifyResults(new String[] {"3"}); + .status(replicatedDbName) + .verifyResult(firstIncremental.lastReplicationId) + .run("use " + replicatedDbName) + .run("select id from table1") + .verifyResults(new String[] { "1" }) + .run("select * from table2") + .verifyResults(new String[] { "2" }) + .run("select id from table3") + .verifyResults(new String[] { "3" }); // Second incremental load replica.load(replicatedDbName, secondIncremental.dumpLocation) - .status(replicatedDbName) - .verifyResult(secondIncremental.lastReplicationId) - .run("use " + replicatedDbName) - .run("select * from table1") - .verifyResults(new String[] {"10"}) - .run("select id from table2") - .verifyResults(new String[] {"20"}) - .run("select id from table3") - .verifyResults(new String[] {"30"}); + .status(replicatedDbName) + .verifyResult(secondIncremental.lastReplicationId) + .run("use " + replicatedDbName) + .run("select * from table1") + .verifyResults(new String[] { "10" }) + .run("select id from table2") + .verifyResults(new String[] { "20" }) + .run("select id from table3") + .verifyResults(new String[] {"30"}); + } + + @Test + public void shouldNotCreateDirectoryForNonNativeTableInDumpDirectory() throws Throwable { + String createTableQuery = + "CREATE TABLE custom_serdes( serde_id bigint COMMENT 'from deserializer', name string " + + "COMMENT 'from deserializer', slib string COMMENT 'from deserializer') " + + "ROW FORMAT SERDE 'org.apache.hive.storage.jdbc.JdbcSerDe' " + + "STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' " + + "WITH SERDEPROPERTIES ('serialization.format'='1') " + + "TBLPROPERTIES ( " + + "'hive.sql.database.type'='METASTORE', " + + "'hive.sql.query'='SELECT \"SERDE_ID\", \"NAME\", \"SLIB\" FROM \"SERDES\"')"; + + WarehouseInstance.Tuple bootstrapTuple = primary + .run("use " + primaryDbName) + .run(createTableQuery).dump(primaryDbName, null); + Path cSerdesTableDumpLocation = new Path(bootstrapTuple.dumpLocation + + Path.SEPARATOR + primaryDbName + Path.SEPARATOR + "custom_serdes"); + FileSystem fs = cSerdesTableDumpLocation.getFileSystem(primary.hiveConf); + assertFalse(fs.exists(cSerdesTableDumpLocation)); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java index 119d7923a5b4..3c6a606b0148 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java @@ -50,7 +50,7 @@ protected int execute(DriverContext driverContext) { TableExport.Paths exportPaths = new TableExport.Paths( work.getAstRepresentationForErrorMsg(), work.getExportRootDir(), conf, false); Hive db = getHive(); - LOG.debug("Exporting data to: {}", exportPaths.getExportRootDir()); + LOG.debug("Exporting 
data to: {}", exportPaths.exportRootDir()); work.acidPostProcess(db); TableExport tableExport = new TableExport(exportPaths, work.getTableSpec(), work.getReplicationSpec(), db, null, conf, work.getMmContext()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index 2a3986a317ca..20ff23a46b2c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -177,9 +177,10 @@ private boolean shouldExport() { public static class Paths { private final String astRepresentationForErrorMsg; private final HiveConf conf; - private final Path exportRootDir; + //variable access should not be done and use exportRootDir() instead. + private final Path _exportRootDir; private final FileSystem exportFileSystem; - private boolean writeData; + private boolean writeData, exportRootDirCreated = false; public Paths(String astRepresentationForErrorMsg, Path dbRoot, String tblName, HiveConf conf, boolean shouldWriteData) throws SemanticException { @@ -189,12 +190,9 @@ public Paths(String astRepresentationForErrorMsg, Path dbRoot, String tblName, H Path tableRoot = new Path(dbRoot, tblName); URI exportRootDir = EximUtil.getValidatedURI(conf, tableRoot.toUri().toString()); validateTargetDir(exportRootDir); - this.exportRootDir = new Path(exportRootDir); + this._exportRootDir = new Path(exportRootDir); try { - this.exportFileSystem = this.exportRootDir.getFileSystem(conf); - if (!exportFileSystem.exists(this.exportRootDir) && writeData) { - exportFileSystem.mkdirs(this.exportRootDir); - } + this.exportFileSystem = this._exportRootDir.getFileSystem(conf); } catch (IOException e) { throw new SemanticException(e); } @@ -204,20 +202,37 @@ public Paths(String astRepresentationForErrorMsg, String path, HiveConf conf, boolean shouldWriteData) throws SemanticException { this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; this.conf = conf; - this.exportRootDir = new Path(EximUtil.getValidatedURI(conf, path)); + this._exportRootDir = new Path(EximUtil.getValidatedURI(conf, path)); this.writeData = shouldWriteData; try { - this.exportFileSystem = exportRootDir.getFileSystem(conf); - if (!exportFileSystem.exists(this.exportRootDir) && writeData) { - exportFileSystem.mkdirs(this.exportRootDir); - } + this.exportFileSystem = _exportRootDir.getFileSystem(conf); } catch (IOException e) { throw new SemanticException(e); } } Path partitionExportDir(String partitionName) throws SemanticException { - return exportDir(new Path(exportRootDir, partitionName)); + return exportDir(new Path(exportRootDir(), partitionName)); + } + + /** + * Access to the {@link #_exportRootDir} should only be done via this method + * since the creation of the directory is delayed until we figure out if we want + * to write something or not. This is specifically important to prevent empty non-native + * directories being created in repl dump. 
+ */ + public Path exportRootDir() throws SemanticException { + if (!exportRootDirCreated) { + try { + if (!exportFileSystem.exists(this._exportRootDir) && writeData) { + exportFileSystem.mkdirs(this._exportRootDir); + } + exportRootDirCreated = true; + } catch (IOException e) { + throw new SemanticException(e); + } + } + return _exportRootDir; } private Path exportDir(Path exportDir) throws SemanticException { @@ -232,8 +247,8 @@ private Path exportDir(Path exportDir) throws SemanticException { } } - private Path metaDataExportFile() { - return new Path(exportRootDir, EximUtil.METADATA_NAME); + private Path metaDataExportFile() throws SemanticException { + return new Path(exportRootDir(), EximUtil.METADATA_NAME); } /** @@ -241,7 +256,7 @@ private Path metaDataExportFile() { * Partition's data export directory is created within the export semantics of partition. */ private Path dataExportDir() throws SemanticException { - return exportDir(new Path(getExportRootDir(), EximUtil.DATA_PATH_NAME)); + return exportDir(new Path(exportRootDir(), EximUtil.DATA_PATH_NAME)); } /** @@ -274,10 +289,6 @@ private void validateTargetDir(URI rootDirExportFile) throws SemanticException { throw new SemanticException(astRepresentationForErrorMsg, e); } } - - public Path getExportRootDir() { - return exportRootDir; - } } public static class AuthEntities { @@ -311,7 +322,7 @@ public AuthEntities getAuthEntities() throws SemanticException { authEntities.inputs.add(new ReadEntity(tableSpec.tableHandle)); } } - authEntities.outputs.add(toWriteEntity(paths.getExportRootDir(), conf)); + authEntities.outputs.add(toWriteEntity(paths.exportRootDir(), conf)); } catch (Exception e) { throw new SemanticException(e); } From 8770ceb16e957dd68200c0d710bcd3080da91efb Mon Sep 17 00:00:00 2001 From: Anishek Agarwal Date: Thu, 17 May 2018 10:49:23 +0530 Subject: [PATCH 2/2] HIVE-19485 : addressing comments in Test --- .../ql/parse/TestReplicationScenariosAcrossInstances.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index d704b871d781..8caa55c5d518 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -751,8 +751,9 @@ public void shouldNotCreateDirectoryForNonNativeTableInDumpDirectory() throws Th WarehouseInstance.Tuple bootstrapTuple = primary .run("use " + primaryDbName) .run(createTableQuery).dump(primaryDbName, null); - Path cSerdesTableDumpLocation = new Path(bootstrapTuple.dumpLocation - + Path.SEPARATOR + primaryDbName + Path.SEPARATOR + "custom_serdes"); + Path cSerdesTableDumpLocation = new Path( + new Path(bootstrapTuple.dumpLocation, primaryDbName), + "custom_serdes"); FileSystem fs = cSerdesTableDumpLocation.getFileSystem(primary.hiveConf); assertFalse(fs.exists(cSerdesTableDumpLocation)); }
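
For reference, the behavior change in TableExport.Paths above can be illustrated with a small standalone sketch: the export root is resolved eagerly in the constructor, but the directory is only materialized on the first exportRootDir() call, and only when data will actually be written, so a table that is skipped during a repl dump (such as a non-native table) leaves no empty directory behind. The class and field names below are simplified stand-ins rather than the actual Hive types; only the Hadoop FileSystem/Path API is assumed.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Simplified sketch of the deferred export-root creation pattern.
    public class LazyExportRoot {
      private final Path exportRootDir;
      private final FileSystem fs;
      private final boolean writeData;
      private boolean created = false;

      public LazyExportRoot(Path exportRootDir, Configuration conf, boolean writeData)
          throws IOException {
        this.exportRootDir = exportRootDir;
        // Resolve the file system eagerly, but do not create the directory yet.
        this.fs = exportRootDir.getFileSystem(conf);
        this.writeData = writeData;
      }

      // The directory is created lazily, on first access, and only if data will be written.
      public Path exportRootDir() throws IOException {
        if (!created) {
          if (writeData && !fs.exists(exportRootDir)) {
            fs.mkdirs(exportRootDir);
          }
          created = true;
        }
        return exportRootDir;
      }
    }

Callers that only read metadata (writeData == false), or that never touch the path at all because the table is not exported, never trigger the mkdirs call.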