From 44ab859e7d1041915fd910eda6fced270938fa56 Mon Sep 17 00:00:00 2001 From: Daniel Dai Date: Mon, 28 Aug 2017 10:19:29 -0700 Subject: [PATCH 1/5] HIVE-17366: Constraint replication in bootstrap --- .../ql/parse/TestReplicationScenarios.java | 9 +- .../hive/ql/exec/repl/ReplDumpTask.java | 23 ++++ .../ql/exec/repl/bootstrap/ReplLoadTask.java | 30 ++++- .../ql/exec/repl/bootstrap/ReplLoadWork.java | 7 ++ .../repl/bootstrap/events/BootstrapEvent.java | 2 +- .../bootstrap/events/ConstraintEvent.java | 24 ++++ .../filesystem/ConstraintEventsIterator.java | 87 +++++++++++++ .../events/filesystem/FSConstraintEvent.java | 39 ++++++ .../repl/bootstrap/load/LoadConstraint.java | 119 ++++++++++++++++++ .../apache/hadoop/hive/ql/metadata/Hive.java | 32 +++++ .../apache/hadoop/hive/ql/parse/EximUtil.java | 2 +- .../ql/parse/ReplicationSemanticAnalyzer.java | 1 + .../repl/dump/io/ConstraintsSerializer.java | 71 +++++++++++ .../load/message/AddForeignKeyHandler.java | 6 +- .../message/AddNotNullConstraintHandler.java | 7 +- .../load/message/AddPrimaryKeyHandler.java | 7 +- .../message/AddUniqueConstraintHandler.java | 6 +- 17 files changed, 456 insertions(+), 16 deletions(-) create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/ConstraintEvent.java create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSConstraintEvent.java create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index b19c1aa1cb9c..17dabd9b2cae 100644 --- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -2700,16 +2700,15 @@ public void testConstraints() throws IOException { LOG.info("Dumped to {} with id {}", replDumpLocn, replDumpId); run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); - // bootstrap replication for constraint is not implemented. Will verify it works once done try { List pks = metaStoreClientMirror.getPrimaryKeys(new PrimaryKeysRequest(dbName+ "_dupe" , "tbl1")); - assertTrue(pks.isEmpty()); + assertEquals(pks.size(), 1); List uks = metaStoreClientMirror.getUniqueConstraints(new UniqueConstraintsRequest(dbName+ "_dupe" , "tbl1")); - assertTrue(uks.isEmpty()); + assertEquals(uks.size(), 1); List fks = metaStoreClientMirror.getForeignKeys(new ForeignKeysRequest(null, null, dbName+ "_dupe" , "tbl2")); - assertTrue(fks.isEmpty()); + assertEquals(fks.size(), 1); List nns = metaStoreClientMirror.getNotNullConstraints(new NotNullConstraintsRequest(dbName+ "_dupe" , "tbl3")); - assertTrue(nns.isEmpty()); + assertEquals(nns.size(), 1); } catch (TException te) { assertNull(te); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 67a67fd8e399..8531dcd2a166 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -25,6 +25,10 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.SQLForeignKey; +import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; +import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; 
+import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; import org.apache.hadoop.hive.metastore.messaging.EventUtils; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter; @@ -46,6 +50,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler; import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandlerFactory; +import org.apache.hadoop.hive.ql.parse.repl.dump.io.ConstraintsSerializer; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer; import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; @@ -61,6 +66,7 @@ Licensed to the Apache Software Foundation (ASF) under one public class ReplDumpTask extends Task implements Serializable { private static final String dumpSchema = "dump_dir,last_repl_id#string,string"; private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions"; + private static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints"; private static final String FUNCTION_METADATA_FILE_NAME = "_metadata"; private Logger LOG = LoggerFactory.getLogger(ReplDumpTask.class); @@ -186,6 +192,7 @@ private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws LOG.debug( "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri()); dumpTable(dbName, tblName, dbRoot); + dumpConstraintMetadata(dbName, tblName, dumpRoot); } } Long bootDumpEndReplId = getHive().getMSC().getCurrentNotificationEventId().getEventId(); @@ -299,6 +306,22 @@ private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception } } + private void dumpConstraintMetadata(String dbName, String tblName, Path dumpRoot) throws Exception { + Path constraintsRoot = new Path(new Path(dumpRoot, dbName), CONSTRAINTS_ROOT_DIR_NAME); + 
Path constraintsFile = new Path(constraintsRoot, tblName); + List pks = getHive().getPrimaryKeyList(dbName, tblName); + List fks = getHive().getForeignKeyList(dbName, tblName); + List uks = getHive().getUniqueConstraintList(dbName, tblName); + List nns = getHive().getNotNullConstraintList(dbName, tblName); + if (!pks.isEmpty() || !fks.isEmpty() || !uks.isEmpty() || !nns.isEmpty()) { + try (JsonWriter jsonWriter = + new JsonWriter(constraintsFile.getFileSystem(conf), constraintsFile)) { + ConstraintsSerializer serializer = new ConstraintsSerializer(pks, fks, uks, nns, conf); + serializer.writeTo(jsonWriter, null); + } + } + } + private HiveWrapper.Tuple functionTuple(String functionName, String dbName) { try { HiveWrapper.Tuple tuple = new HiveWrapper(getHive(), dbName).function(functionName); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java index 6ea1754710b8..44b32f25fd54 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java @@ -22,11 +22,14 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.PartitionEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator; 
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadConstraint; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadFunction; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; @@ -72,6 +75,7 @@ protected int execute(DriverContext driverContext) { a database ( directory ) */ BootstrapEventsIterator iterator = work.iterator(); + ConstraintEventsIterator constraintIterator = work.constraintIterator(); /* This is used to get hold of a reference during the current creation of tasks and is initialized with "0" tasks such that it will be non consequential in any operations done with task tracker @@ -80,8 +84,17 @@ a database ( directory ) TaskTracker dbTracker = new TaskTracker(ZERO_TASKS); TaskTracker tableTracker = new TaskTracker(ZERO_TASKS); Scope scope = new Scope(); - while (iterator.hasNext() && loadTaskTracker.canAddMoreTasks()) { - BootstrapEvent next = iterator.next(); + boolean loadingConstraint = false; + if (!iterator.hasNext() && constraintIterator.hasNext()) { + loadingConstraint = true; + } + while ((iterator.hasNext() || loadingConstraint && constraintIterator.hasNext()) && loadTaskTracker.canAddMoreTasks()) { + BootstrapEvent next; + if (!loadingConstraint) { + next = iterator.next(); + } else { + next = constraintIterator.next(); + } switch (next.eventType()) { case Database: DatabaseEvent dbEvent = (DatabaseEvent) next; @@ -168,11 +181,20 @@ a database ( directory ) functionsTracker.debugLog("functions"); break; } + case Constraint: { + LoadConstraint loadConstraint = + new LoadConstraint(context, (ConstraintEvent) next, work.dbNameToLoadIn, dbTracker); + TaskTracker constraintTracker = loadConstraint.tasks(); + scope.rootTasks.addAll(constraintTracker.tasks()); + loadTaskTracker.update(constraintTracker); + constraintTracker.debugLog("constraints"); + } } } - boolean addAnotherLoadTask = iterator.hasNext() || 
loadTaskTracker.hasReplicationState(); + boolean addAnotherLoadTask = iterator.hasNext() || loadTaskTracker.hasReplicationState() + || constraintIterator.hasNext(); createBuilderTask(scope.rootTasks, addAnotherLoadTask); - if (!iterator.hasNext()) { + if (!iterator.hasNext() && !constraintIterator.hasNext()) { loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope)); } this.childTasks = scope.rootTasks; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java index eb18e5f8a828..aceb73c00b76 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java @@ -20,6 +20,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator; import org.apache.hadoop.hive.ql.plan.Explain; import java.io.IOException; @@ -32,6 +33,7 @@ public class ReplLoadWork implements Serializable { final String dbNameToLoadIn; final String tableNameToLoadIn; private final BootstrapEventsIterator iterator; + private final ConstraintEventsIterator constraintsIterator; private int loadTaskRunCount = 0; private DatabaseEvent.State state = null; @@ -39,6 +41,7 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoad String tableNameToLoadIn) throws IOException { this.tableNameToLoadIn = tableNameToLoadIn; this.iterator = new BootstrapEventsIterator(dumpDirectory, hiveConf); + this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf); this.dbNameToLoadIn = dbNameToLoadIn; } @@ -51,6 +54,10 @@ public 
BootstrapEventsIterator iterator() { return iterator; } + public ConstraintEventsIterator constraintIterator() { + return constraintsIterator; + } + int executedLoadTask() { return ++loadTaskRunCount; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java index db2b0ace9566..7b7aac963fcb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java @@ -22,7 +22,7 @@ public interface BootstrapEvent { EventType eventType(); enum EventType { - Database, Table, Function, Partition + Database, Table, Function, Partition, Constraint } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/ConstraintEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/ConstraintEvent.java new file mode 100644 index 000000000000..742928393506 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/ConstraintEvent.java @@ -0,0 +1,24 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events;
+
+import org.apache.hadoop.fs.Path;
+
+public interface ConstraintEvent extends BootstrapEvent {
+  Path rootDir();
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java
new file mode 100644
index 000000000000..bacf158d2da1
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java
@@ -0,0 +1,116 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
+
+/**
+ * Iterates over the files found in the constraints directory
+ * ({@link ReplicationSemanticAnalyzer#CONSTRAINTS_ROOT_DIR_NAME}) of each database directory
+ * of a dump, one database directory after the other.
+ */
+public class ConstraintEventsIterator implements Iterator<FSConstraintEvent> {
+  private FileStatus[] dbDirs;
+  private int currentDbIndex;
+  private FileStatus[] constraintFiles = null;
+  private int currentConstraintIndex;
+  private FileSystem fs;
+
+  /**
+   * Lists the database directories under the dump directory and positions the iterator on the
+   * constraint files of the first one, if there is any.
+   *
+   * @param dumpDirectory root directory of the dump
+   * @param hiveConf configuration used to resolve the file system of the dump directory
+   * @throws IOException if the dump directory cannot be listed
+   */
+  public ConstraintEventsIterator(String dumpDirectory, HiveConf hiveConf) throws IOException {
+    Path path = new Path(dumpDirectory);
+    fs = path.getFileSystem(hiveConf);
+    dbDirs = fs.listStatus(path, EximUtil.getDirectoryFilter(fs));
+    currentDbIndex = 0;
+    if (dbDirs.length != 0) {
+      currentConstraintIndex = 0;
+      constraintFiles = listConstraintFilesInDBDir(fs, dbDirs[0].getPath());
+    }
+  }
+
+  /**
+   * Lists the constraint files of one database directory; a missing constraints directory
+   * yields an empty array.
+   */
+  private FileStatus[] listConstraintFilesInDBDir(FileSystem fs, Path dbDir) {
+    try {
+      // List under the directory that was passed in, not always under the first database.
+      return fs.listStatus(new Path(dbDir, ReplicationSemanticAnalyzer.CONSTRAINTS_ROOT_DIR_NAME));
+    } catch (FileNotFoundException e) {
+      return new FileStatus[]{};
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Returns whether another constraint file is available, moving on to the following database
+   * directories once the current one has been fully consumed.
+   */
+  @Override
+  public boolean hasNext() {
+    if (constraintFiles != null && currentConstraintIndex < constraintFiles.length) {
+      return true;
+    }
+    while (constraintFiles != null && currentConstraintIndex == constraintFiles.length) {
+      currentDbIndex++;
+      if (currentDbIndex < dbDirs.length) {
+        currentConstraintIndex = 0;
+        // Advance to the database at currentDbIndex; re-listing dbDirs[0] would replay the
+        // first database's constraints and skip those of every later database.
+        constraintFiles = listConstraintFilesInDBDir(fs, dbDirs[currentDbIndex].getPath());
+      } else {
+        constraintFiles = null;
+      }
+    }
+    return constraintFiles != null;
+  }
+
+  /**
+   * Returns the event for the next constraint file.
+   *
+   * @throws NoSuchElementException if no constraint file is left
+   */
+  @Override
+  public FSConstraintEvent next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException("No more constraint files to load");
+    }
+    int thisIndex = currentConstraintIndex;
+    currentConstraintIndex++;
+    return new FSConstraintEvent(constraintFiles[thisIndex].getPath());
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSConstraintEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSConstraintEvent.java
new file mode 100644
index 000000000000..a2ad44421dfd
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSConstraintEvent.java
@@ -0,0 +1,39 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent;
+
+public class FSConstraintEvent implements ConstraintEvent {
+  private final Path rootDir;
+
+  FSConstraintEvent(Path rootDir) {
+    this.rootDir = rootDir;
+  }
+
+  @Override
+  public Path rootDir() {
+    return rootDir;
+  }
+
+  @Override
+  public EventType eventType() {
+    return EventType.Constraint;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
new file mode 100644
index 000000000000..fc2aa8d598cb
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
@@ -0,0 +1,131 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.AddForeignKeyHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.AddNotNullConstraintHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.AddPrimaryKeyHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.AddUniqueConstraintHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.stripQuotes;
+
+/**
+ * Builds the tasks that add the primary key, foreign key, unique and not-null constraints
+ * described in the file of one {@link ConstraintEvent} to the database being loaded.
+ */
+public class LoadConstraint {
+  // Log under this class's own name rather than LoadFunction's.
+  private static final Logger LOG = LoggerFactory.getLogger(LoadConstraint.class);
+  private final Context context;
+  private final ConstraintEvent event;
+  private final String dbNameToLoadIn;
+  private final TaskTracker tracker;
+
+  public LoadConstraint(Context context, ConstraintEvent event, String dbNameToLoadIn,
+      TaskTracker existingTracker) {
+    this.context = context;
+    this.event = event;
+    this.dbNameToLoadIn = dbNameToLoadIn;
+    this.tracker = new TaskTracker(existingTracker);
+  }
+
+  /**
+   * Reads the "pks", "fks", "uks" and "nns" payloads from the event's file and passes each of
+   * them, in that order, to the matching add-constraint message handler.
+   *
+   * @return the tracker, holding every task the four handlers produced
+   * @throws SemanticException if reading the file or building the tasks fails for any reason
+   */
+  public TaskTracker tasks() throws IOException, SemanticException {
+    URI fromURI = EximUtil
+        .getValidatedURI(context.hiveConf, stripQuotes(event.rootDir().toUri().toString()));
+    Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
+
+    try {
+      FileSystem fs = FileSystem.get(fromPath.toUri(), context.hiveConf);
+      JSONObject json = new JSONObject(EximUtil.readAsString(fs, fromPath));
+      String pksString = json.getString("pks");
+      String fksString = json.getString("fks");
+      String uksString = json.getString("uks");
+      String nnsString = json.getString("nns");
+      List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
+
+      AddPrimaryKeyHandler pkHandler = new AddPrimaryKeyHandler();
+      DumpMetaData pkDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_PRIMARYKEY, Long.MAX_VALUE, Long.MAX_VALUE, null,
+          context.hiveConf);
+      pkDumpMetaData.setPayload(pksString);
+      tasks.addAll(pkHandler.handle(
+          new MessageHandler.Context(
+              dbNameToLoadIn, null, fromPath.toString(), null, pkDumpMetaData, context.hiveConf,
+              context.hiveDb, null, LOG)));
+
+      AddForeignKeyHandler fkHandler = new AddForeignKeyHandler();
+      DumpMetaData fkDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_FOREIGNKEY, Long.MAX_VALUE, Long.MAX_VALUE, null,
+          context.hiveConf);
+      fkDumpMetaData.setPayload(fksString);
+      tasks.addAll(fkHandler.handle(
+          new MessageHandler.Context(
+              dbNameToLoadIn, null, fromPath.toString(), null, fkDumpMetaData, context.hiveConf,
+              context.hiveDb, null, LOG)));
+
+      AddUniqueConstraintHandler ukHandler = new AddUniqueConstraintHandler();
+      DumpMetaData ukDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_UNIQUECONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null,
+          context.hiveConf);
+      ukDumpMetaData.setPayload(uksString);
+      tasks.addAll(ukHandler.handle(
+          new MessageHandler.Context(
+              dbNameToLoadIn, null, fromPath.toString(), null, ukDumpMetaData, context.hiveConf,
+              context.hiveDb, null, LOG)));
+
+      AddNotNullConstraintHandler nnHandler = new AddNotNullConstraintHandler();
+      DumpMetaData nnDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_NOTNULLCONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null,
+          context.hiveConf);
+      nnDumpMetaData.setPayload(nnsString);
+      tasks.addAll(nnHandler.handle(
+          new MessageHandler.Context(
+              dbNameToLoadIn, null, fromPath.toString(), null, nnDumpMetaData, context.hiveConf,
+              context.hiveDb, null, LOG)));
+
+      tasks.forEach(tracker::addTask);
+      return tracker;
+    } catch (Exception e) {
+      throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+    }
+  }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index d661f10c4077..f4b95a82580f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -4208,6 +4208,38 @@ public void dropConstraint(String dbName, String tableName, String constraintNam
     }
   }
 
+  public List<SQLPrimaryKey> getPrimaryKeyList(String dbName, String tblName) throws HiveException {
+    try {
+      return getMSC().getPrimaryKeys(new PrimaryKeysRequest(dbName, tblName));
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  public List<SQLForeignKey> getForeignKeyList(String dbName, String tblName) throws HiveException {
+    try {
+      return getMSC().getForeignKeys(new ForeignKeysRequest(null, null, dbName, tblName));
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  public List<SQLUniqueConstraint> getUniqueConstraintList(String dbName, String tblName) throws HiveException {
+    try {
+      return getMSC().getUniqueConstraints(new UniqueConstraintsRequest(dbName, tblName));
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  public List<SQLNotNullConstraint> getNotNullConstraintList(String dbName, String tblName) throws HiveException {
+    try {
+      return getMSC().getNotNullConstraints(new NotNullConstraintsRequest(dbName, tblName));
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
   /**
    * Get all primary key columns associated with the table.
* diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index 22094c05632f..373321e48b58 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -272,7 +272,7 @@ public static MetaData readMetaData(FileSystem fs, Path metadataPath) } } - private static String readAsString(final FileSystem fs, final Path fromMetadataPath) + public static String readAsString(final FileSystem fs, final Path fromMetadataPath) throws IOException { try (FSDataInputStream stream = fs.open(fromMetadataPath)) { byte[] buffer = new byte[1024]; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 3e2c51304783..85aa262259f2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -76,6 +76,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { private static final String dumpSchema = "dump_dir,last_repl_id#string,string"; public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions"; + public static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints"; private final static Logger REPL_STATE_LOG = LoggerFactory.getLogger("ReplState"); ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java new file mode 100644 index 000000000000..22de079de5f1 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.repl.dump.io;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+public class ConstraintsSerializer implements JsonWriter.Serializer {
+  private HiveConf hiveConf;
+  private List<SQLPrimaryKey> pks;
+  private List<SQLForeignKey> fks;
+  private List<SQLUniqueConstraint> uks;
+  private List<SQLNotNullConstraint> nns;
+
+  public ConstraintsSerializer(List<SQLPrimaryKey> pks, List<SQLForeignKey> fks,
+      List<SQLUniqueConstraint> uks, List<SQLNotNullConstraint> nns, HiveConf hiveConf) {
+    this.hiveConf = hiveConf;
+    this.pks = pks;
+    this.fks = fks;
+    this.uks = uks;
+    this.nns = nns;
+  }
+
+  @Override
+  public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
+      throws SemanticException, IOException {
+    String pksString, fksString, uksString, nnsString;
+    pksString = fksString = uksString = nnsString = ""; // a null list is written as ""
+    if (pks != null) {
+      pksString = MessageFactory.getInstance().buildAddPrimaryKeyMessage(pks).toString();
+    }
+    if (fks != null) {
+      fksString = MessageFactory.getInstance().buildAddForeignKeyMessage(fks).toString();
+    }
+    if (uks != null) {
+      uksString = MessageFactory.getInstance().buildAddUniqueConstraintMessage(uks).toString();
+    }
+    if (nns != null) { // guard the list serialized here; this check used to test uks
+      nnsString = MessageFactory.getInstance().buildAddNotNullConstraintMessage(nns).toString();
+    }
+    writer.jsonGenerator.writeStringField("pks", pksString);
+    writer.jsonGenerator.writeStringField("fks", fksString);
+    writer.jsonGenerator.writeStringField("uks", uksString);
+    writer.jsonGenerator.writeStringField("nns", nnsString);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java
index 0873c1c1d3b2..23636725ae06 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java
@@ -49,6 +49,11 @@ public List<Task<? extends Serializable>> handle(Context context)
       }
     }
 
+    List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
+    if (fks.isEmpty()) {
+      return tasks;
+    }
+
     String actualDbName = context.isDbNameEmpty() ? fks.get(0).getFktable_db() : context.dbName;
     String actualTblName = context.isTableNameEmpty() ? fks.get(0).getPktable_name() : context.tableName;
 
@@ -61,7 +66,6 @@ public List<Task<? extends Serializable>> handle(Context context)
     AlterTableDesc addConstraintsDesc = new AlterTableDesc(actualDbName + "."
+ actualTblName, new ArrayList(), fks, new ArrayList(), context.eventOnlyReplicationSpec()); Task addConstraintsTask = TaskFactory.get(new DDLWork(readEntitySet, writeEntitySet, addConstraintsDesc), context.hiveConf); - List> tasks = new ArrayList>(); tasks.add(addConstraintsTask); context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName); updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java index 76cbe5ab9fb9..9c12e7e2af17 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage; -import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.parse.SemanticException; @@ -51,6 +50,11 @@ public List> handle(Context context) } } + List> tasks = new ArrayList>(); + if (nns.isEmpty()) { + return tasks; + } + String actualDbName = context.isDbNameEmpty() ? nns.get(0).getTable_db() : context.dbName; String actualTblName = context.isTableNameEmpty() ? nns.get(0).getTable_name() : context.tableName; @@ -62,7 +66,6 @@ public List> handle(Context context) AlterTableDesc addConstraintsDesc = new AlterTableDesc(actualDbName + "." 
+ actualTblName, new ArrayList(), new ArrayList(), new ArrayList(), nns, context.eventOnlyReplicationSpec()); Task addConstraintsTask = TaskFactory.get(new DDLWork(readEntitySet, writeEntitySet, addConstraintsDesc), context.hiveConf); - List> tasks = new ArrayList>(); tasks.add(addConstraintsTask); context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName); updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java index aee46dad9b1b..d7ee2231549b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java @@ -48,6 +48,12 @@ public List> handle(Context context) throw (SemanticException)e; } } + + List> tasks = new ArrayList>(); + if (pks.isEmpty()) { + return tasks; + } + String actualDbName = context.isDbNameEmpty() ? pks.get(0).getTable_db() : context.dbName; String actualTblName = context.isTableNameEmpty() ? pks.get(0).getTable_name() : context.tableName; @@ -59,7 +65,6 @@ public List> handle(Context context) AlterTableDesc addConstraintsDesc = new AlterTableDesc(actualDbName + "." 
+ actualTblName, pks, new ArrayList(), new ArrayList(), context.eventOnlyReplicationSpec()); Task addConstraintsTask = TaskFactory.get(new DDLWork(readEntitySet, writeEntitySet, addConstraintsDesc), context.hiveConf); - List> tasks = new ArrayList>(); tasks.add(addConstraintsTask); context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName); updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java index f0cb11ed75d6..0d9c700251b1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java @@ -49,6 +49,11 @@ public List> handle(Context context) } } + List> tasks = new ArrayList>(); + if (uks.isEmpty()) { + return tasks; + } + String actualDbName = context.isDbNameEmpty() ? uks.get(0).getTable_db() : context.dbName; String actualTblName = context.isTableNameEmpty() ? uks.get(0).getTable_name() : context.tableName; @@ -60,7 +65,6 @@ public List> handle(Context context) AlterTableDesc addConstraintsDesc = new AlterTableDesc(actualDbName + "." 
+ actualTblName, new ArrayList(), new ArrayList(), uks, context.eventOnlyReplicationSpec()); Task addConstraintsTask = TaskFactory.get(new DDLWork(readEntitySet, writeEntitySet, addConstraintsDesc), context.hiveConf); - List> tasks = new ArrayList>(); tasks.add(addConstraintsTask); context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName); updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null); From be0e46233e4283965f3fd60307379329929a5bae Mon Sep 17 00:00:00 2001 From: Daniel Dai Date: Wed, 6 Sep 2017 02:17:42 -0700 Subject: [PATCH 2/5] Address Sankar's review comments --- .../hive/ql/exec/repl/ReplDumpTask.java | 31 ++++++++++++------- .../ql/exec/repl/bootstrap/ReplLoadWork.java | 4 +-- .../filesystem/ConstraintEventsIterator.java | 27 ++++++++++------ .../apache/hadoop/hive/ql/metadata/Hive.java | 18 ++++++++--- .../ql/parse/ReplicationSemanticAnalyzer.java | 1 - .../repl/dump/io/ConstraintsSerializer.java | 2 +- 6 files changed, 54 insertions(+), 29 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 8f749a5e3754..a496afd78961 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -24,6 +24,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.SQLForeignKey; import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; @@ -304,18 +305,26 @@ private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception } 
private void dumpConstraintMetadata(String dbName, String tblName, Path dumpRoot) throws Exception { - Path constraintsRoot = new Path(new Path(dumpRoot, dbName), CONSTRAINTS_ROOT_DIR_NAME); - Path constraintsFile = new Path(constraintsRoot, tblName); - List pks = getHive().getPrimaryKeyList(dbName, tblName); - List fks = getHive().getForeignKeyList(dbName, tblName); - List uks = getHive().getUniqueConstraintList(dbName, tblName); - List nns = getHive().getNotNullConstraintList(dbName, tblName); - if (!pks.isEmpty() || !fks.isEmpty() || !uks.isEmpty() || !nns.isEmpty()) { - try (JsonWriter jsonWriter = - new JsonWriter(constraintsFile.getFileSystem(conf), constraintsFile)) { - ConstraintsSerializer serializer = new ConstraintsSerializer(pks, fks, uks, nns, conf); - serializer.writeTo(jsonWriter, null); + try { + Path constraintsRoot = new Path(new Path(dumpRoot, dbName), CONSTRAINTS_ROOT_DIR_NAME); + Path constraintsFile = new Path(constraintsRoot, tblName); + Hive db = getHive(); + List pks = db.getPrimaryKeyList(dbName, tblName); + List fks = db.getForeignKeyList(dbName, tblName); + List uks = db.getUniqueConstraintList(dbName, tblName); + List nns = db.getNotNullConstraintList(dbName, tblName); + if ((pks != null && !pks.isEmpty()) || (fks != null && !fks.isEmpty()) || (uks != null && !uks.isEmpty()) + || (nns != null && !nns.isEmpty())) { + try (JsonWriter jsonWriter = + new JsonWriter(constraintsFile.getFileSystem(conf), constraintsFile)) { + ConstraintsSerializer serializer = new ConstraintsSerializer(pks, fks, uks, nns, conf); + serializer.writeTo(jsonWriter, null); + } } + } catch (NoSuchObjectException e) { + // Bootstrap constraint dump shouldn't fail if the table is dropped/renamed while dumping it. + // Just log a debug message and skip it. 
+ LOG.debug(e.getMessage()); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java index f3e5c04b4656..a8e906750a8c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java @@ -40,7 +40,7 @@ public class ReplLoadWork implements Serializable { public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn, String tableNameToLoadIn) throws IOException { this.tableNameToLoadIn = tableNameToLoadIn; - this.iterator = new BootstrapEventsIterator(dumpDirectory, hiveConf); + this.iterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf); this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf); this.dbNameToLoadIn = dbNameToLoadIn; } @@ -74,4 +74,4 @@ DatabaseEvent databaseEvent(HiveConf hiveConf) { boolean hasDbState() { return state != null; } -} \ No newline at end of file +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java index bacf158d2da1..63f364b0f192 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java @@ -35,16 +35,11 @@ public class ConstraintEventsIterator implements Iterator { private FileStatus[] constraintFiles = null; private int currentConstraintIndex; private FileSystem fs; + private Path path; public ConstraintEventsIterator(String dumpDirectory, HiveConf hiveConf) throws IOException { - Path path = new Path(dumpDirectory); + path = new Path(dumpDirectory); fs = path.getFileSystem(hiveConf); - dbDirs = 
fs.listStatus(new Path(dumpDirectory), EximUtil.getDirectoryFilter(fs)); - currentDbIndex = 0; - if (dbDirs.length != 0) { - currentConstraintIndex = 0; - constraintFiles = listConstraintFilesInDBDir(fs, dbDirs[0].getPath()); - } } private FileStatus[] listConstraintFilesInDBDir(FileSystem fs, Path dbDir) { @@ -59,14 +54,26 @@ private FileStatus[] listConstraintFilesInDBDir(FileSystem fs, Path dbDir) { @Override public boolean hasNext() { - if (constraintFiles != null && currentConstraintIndex < constraintFiles.length) { + if (dbDirs == null) { + try { + dbDirs = fs.listStatus(path, EximUtil.getDirectoryFilter(fs)); + } catch (IOException e) { + throw new RuntimeException(e); + } + currentDbIndex = 0; + if (dbDirs.length != 0) { + currentConstraintIndex = 0; + constraintFiles = listConstraintFilesInDBDir(fs, dbDirs[0].getPath()); + } + } + if (currentDbIndex < dbDirs.length && currentConstraintIndex < constraintFiles.length) { return true; } - while (constraintFiles != null && currentConstraintIndex == constraintFiles.length) { + while (currentDbIndex < dbDirs.length && currentConstraintIndex == constraintFiles.length) { currentDbIndex ++; if (currentDbIndex < dbDirs.length) { currentConstraintIndex = 0; - constraintFiles = listConstraintFilesInDBDir(fs, dbDirs[0].getPath()); + constraintFiles = listConstraintFilesInDBDir(fs, dbDirs[currentDbIndex].getPath()); } else { constraintFiles = null; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index f4b95a82580f..aa44c62d8896 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -4203,38 +4203,48 @@ public void dropConstraint(String dbName, String tableName, String constraintNam throws HiveException, NoSuchObjectException { try { getMSC().dropConstraint(dbName, tableName, constraintName); + } catch (NoSuchObjectException e) { + throw e; } catch (Exception 
e) { throw new HiveException(e); } } - public List getPrimaryKeyList(String dbName, String tblName) throws HiveException { + public List getPrimaryKeyList(String dbName, String tblName) throws HiveException, NoSuchObjectException { try { return getMSC().getPrimaryKeys(new PrimaryKeysRequest(dbName, tblName)); + } catch (NoSuchObjectException e) { + throw e; } catch (Exception e) { throw new HiveException(e); } } - public List getForeignKeyList(String dbName, String tblName) throws HiveException { + public List getForeignKeyList(String dbName, String tblName) throws HiveException, NoSuchObjectException { try { return getMSC().getForeignKeys(new ForeignKeysRequest(null, null, dbName, tblName)); + } catch (NoSuchObjectException e) { + throw e; } catch (Exception e) { throw new HiveException(e); } } - public List getUniqueConstraintList(String dbName, String tblName) throws HiveException { + public List getUniqueConstraintList(String dbName, String tblName) throws HiveException, NoSuchObjectException { try { return getMSC().getUniqueConstraints(new UniqueConstraintsRequest(dbName, tblName)); + } catch (NoSuchObjectException e) { + throw e; } catch (Exception e) { throw new HiveException(e); } } - public List getNotNullConstraintList(String dbName, String tblName) throws HiveException { + public List getNotNullConstraintList(String dbName, String tblName) throws HiveException, NoSuchObjectException { try { return getMSC().getNotNullConstraints(new NotNullConstraintsRequest(dbName, tblName)); + } catch (NoSuchObjectException e) { + throw e; } catch (Exception e) { throw new HiveException(e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 535fe403c00f..8f6316dacc2f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -79,7 
+79,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions"; public static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints"; - private final static Logger REPL_STATE_LOG = LoggerFactory.getLogger("ReplState"); ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException { super(queryState); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java index 22de079de5f1..2ae9f58d8fd3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java @@ -60,7 +60,7 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi if (uks != null) { uksString = MessageFactory.getInstance().buildAddUniqueConstraintMessage(uks).toString(); } - if (uks != null) { + if (nns != null) { nnsString = MessageFactory.getInstance().buildAddNotNullConstraintMessage(nns).toString(); } writer.jsonGenerator.writeStringField("pks", pksString); From 68e934dbb5b0ee4c9112b05455b2fe0e05f2330c Mon Sep 17 00:00:00 2001 From: Daniel Dai Date: Wed, 6 Sep 2017 15:29:26 -0700 Subject: [PATCH 3/5] Addressing Sankar's review comments --- .../apache/hadoop/hive/ql/exec/DDLTask.java | 22 ++++++++---- .../hive/ql/exec/repl/ReplDumpTask.java | 36 ++++++++----------- .../ql/exec/repl/bootstrap/ReplLoadTask.java | 2 +- .../filesystem/ConstraintEventsIterator.java | 2 +- .../load/message/AddForeignKeyHandler.java | 10 ++++-- 5 files changed, 40 insertions(+), 32 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index acc23901d3f8..57b810ce7661 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -79,6 +79,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; @@ -4073,14 +4074,23 @@ private int addConstraints(Hive db, AlterTableDesc alterTbl) throws SemanticException, HiveException { try { // This is either an alter table add foreign key or add primary key command. - if (alterTbl.getForeignKeyCols() != null - && !alterTbl.getForeignKeyCols().isEmpty()) { - db.addForeignKey(alterTbl.getForeignKeyCols()); - } - if (alterTbl.getPrimaryKeyCols() != null - && !alterTbl.getPrimaryKeyCols().isEmpty()) { + if (alterTbl.getPrimaryKeyCols() != null && !alterTbl.getPrimaryKeyCols().isEmpty()) { db.addPrimaryKey(alterTbl.getPrimaryKeyCols()); } + if (alterTbl.getForeignKeyCols() != null && !alterTbl.getForeignKeyCols().isEmpty()) { + try { + db.addForeignKey(alterTbl.getForeignKeyCols()); + } catch (HiveException e) { + if (e.getCause() instanceof InvalidObjectException + && alterTbl.getReplicationSpec().isInReplicationScope()) { + // During repl load, an InvalidObjectException while adding a foreign key is + // ignored, as the foreign table may not be part of the replication + LOG.debug(e.getMessage()); + } else { + throw e; + } + } + } if (alterTbl.getUniqueConstraintCols() != null && !alterTbl.getUniqueConstraintCols().isEmpty()) { db.addUniqueConstraint(alterTbl.getUniqueConstraintCols()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index a496afd78961..9939a939c360 100644 ---
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -197,7 +197,7 @@ private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws LOG.debug( "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri()); dumpTable(dbName, tblName, dbRoot); - dumpConstraintMetadata(dbName, tblName, dumpRoot); + dumpConstraintMetadata(dbName, tblName, dbRoot); } replLogger.endLog(bootDumpBeginReplId.toString()); } @@ -304,27 +304,21 @@ private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception } } - private void dumpConstraintMetadata(String dbName, String tblName, Path dumpRoot) throws Exception { - try { - Path constraintsRoot = new Path(new Path(dumpRoot, dbName), CONSTRAINTS_ROOT_DIR_NAME); - Path constraintsFile = new Path(constraintsRoot, tblName); - Hive db = getHive(); - List pks = db.getPrimaryKeyList(dbName, tblName); - List fks = db.getForeignKeyList(dbName, tblName); - List uks = db.getUniqueConstraintList(dbName, tblName); - List nns = db.getNotNullConstraintList(dbName, tblName); - if ((pks != null && !pks.isEmpty()) || (fks != null && !fks.isEmpty()) || (uks != null && !uks.isEmpty()) - || (nns != null && !nns.isEmpty())) { - try (JsonWriter jsonWriter = - new JsonWriter(constraintsFile.getFileSystem(conf), constraintsFile)) { - ConstraintsSerializer serializer = new ConstraintsSerializer(pks, fks, uks, nns, conf); - serializer.writeTo(jsonWriter, null); - } + private void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot) throws Exception { + Path constraintsRoot = new Path(dbRoot, CONSTRAINTS_ROOT_DIR_NAME); + Path constraintsFile = new Path(constraintsRoot, tblName); + Hive db = getHive(); + List pks = db.getPrimaryKeyList(dbName, tblName); + List fks = db.getForeignKeyList(dbName, tblName); + List uks = db.getUniqueConstraintList(dbName, tblName); + List nns = db.getNotNullConstraintList(dbName, 
tblName); + if ((pks != null && !pks.isEmpty()) || (fks != null && !fks.isEmpty()) || (uks != null && !uks.isEmpty()) + || (nns != null && !nns.isEmpty())) { + try (JsonWriter jsonWriter = + new JsonWriter(constraintsFile.getFileSystem(conf), constraintsFile)) { + ConstraintsSerializer serializer = new ConstraintsSerializer(pks, fks, uks, nns, conf); + serializer.writeTo(jsonWriter, null); } - } catch (NoSuchObjectException e) { - // Bootstrap constraint dump shouldn't fail if the table is dropped/renamed while dumping it. - // Just log a debug message and skip it. - LOG.debug(e.getMessage()); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java index 45b929e36b27..bcd8784ffb46 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java @@ -196,7 +196,7 @@ a database ( directory ) } } - if (!iterator.currentDbHasNext()) { + if (!loadingConstraint && !iterator.currentDbHasNext()) { createEndReplLogTask(context, scope, iterator.replLogger()); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java index 63f364b0f192..4d3eac4eefcb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java @@ -44,7 +44,7 @@ public ConstraintEventsIterator(String dumpDirectory, HiveConf hiveConf) throws private FileStatus[] listConstraintFilesInDBDir(FileSystem fs, Path dbDir) { try { - return fs.listStatus(new Path(dbDirs[0].getPath(), ReplicationSemanticAnalyzer.CONSTRAINTS_ROOT_DIR_NAME)); + return fs.listStatus(new 
Path(dbDir, ReplicationSemanticAnalyzer.CONSTRAINTS_ROOT_DIR_NAME)); } catch (FileNotFoundException e) { return new FileStatus[]{}; } catch (IOException e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java index 23636725ae06..0fd970ae24af 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java @@ -55,12 +55,16 @@ public List> handle(Context context) } String actualDbName = context.isDbNameEmpty() ? fks.get(0).getFktable_db() : context.dbName; - String actualTblName = context.isTableNameEmpty() ? fks.get(0).getPktable_name() : context.tableName; + String actualTblName = context.isTableNameEmpty() ? fks.get(0).getFktable_name() : context.tableName; for (SQLForeignKey fk : fks) { - fk.setPktable_db(actualDbName); - fk.setPktable_name(actualTblName); + // If parent table is in the same database, change it to the actual db on destination + // Otherwise, keep db name + if (fk.getPktable_db().equals(fk.getFktable_db())) { + fk.setPktable_db(actualDbName); + } fk.setFktable_db(actualDbName); + fk.setFktable_name(actualTblName); } AlterTableDesc addConstraintsDesc = new AlterTableDesc(actualDbName + "." 
+ actualTblName, new ArrayList(), fks, From a0c1a11705e1415edf49ffc63b68f3893367759a Mon Sep 17 00:00:00 2001 From: Daniel Dai Date: Thu, 7 Sep 2017 10:42:30 -0700 Subject: [PATCH 4/5] Addressing Sankar's review comments --- .../hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java | 2 +- .../events/filesystem/ConstraintEventsIterator.java | 6 +----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java index bcd8784ffb46..706d0b68be67 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java @@ -93,7 +93,7 @@ a database ( directory ) if (!iterator.hasNext() && constraintIterator.hasNext()) { loadingConstraint = true; } - while ((iterator.hasNext() || loadingConstraint && constraintIterator.hasNext()) && loadTaskTracker.canAddMoreTasks()) { + while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext())) && loadTaskTracker.canAddMoreTasks()) { BootstrapEvent next; if (!loadingConstraint) { next = iterator.next(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java index 4d3eac4eefcb..5a483318a37e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java @@ -78,11 +78,7 @@ public boolean hasNext() { constraintFiles = null; } } - if (constraintFiles != null) { - return true; - } else { - return false; - } + return constraintFiles != null; } @Override From 5c3f580255dc3b9792756c6723c349cb715c38f8 Mon Sep 17 00:00:00 2001 From: Daniel Dai 
Date: Thu, 7 Sep 2017 22:29:27 -0700 Subject: [PATCH 5/5] Addressing Sankar's review comments --- .../bootstrap/events/filesystem/ConstraintEventsIterator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java index 5a483318a37e..12d4c0d282fc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java @@ -66,10 +66,10 @@ public boolean hasNext() { constraintFiles = listConstraintFilesInDBDir(fs, dbDirs[0].getPath()); } } - if (currentDbIndex < dbDirs.length && currentConstraintIndex < constraintFiles.length) { + if ((currentDbIndex < dbDirs.length) && (currentConstraintIndex < constraintFiles.length)) { return true; } - while (currentDbIndex < dbDirs.length && currentConstraintIndex == constraintFiles.length) { + while ((currentDbIndex < dbDirs.length) && (currentConstraintIndex == constraintFiles.length)) { currentDbIndex ++; if (currentDbIndex < dbDirs.length) { currentConstraintIndex = 0;