From 96db845cf17552ebf6bed664114fa4f7cd0a708e Mon Sep 17 00:00:00 2001
From: bvellanki
Date: Fri, 1 Apr 2016 15:10:29 -0700
Subject: [PATCH 1/3] FALCON-1880 To support TDE encryption : Add --skipcrccheck to distcp options for HiveDR

---
 addons/hivedr/pom.xml                              |  6 +++
 .../org/apache/falcon/hive/HiveDRArgs.java         |  3 ++
 .../org/apache/falcon/hive/HiveDROptions.java      |  7 ++-
 .../org/apache/falcon/hive/HiveDRTool.java         |  4 +-
 .../apache/falcon/hive/util/EventUtils.java        | 51 +++++++++----------
 ...hive-disaster-recovery-secure-workflow.xml      |  6 +++
 .../hive-disaster-recovery-secure.properties       |  2 +
 .../recipe/HiveReplicationRecipeTool.java          |  4 ++
 .../HiveReplicationRecipeToolOptions.java          |  1 +
 9 files changed, 55 insertions(+), 29 deletions(-)

diff --git a/addons/hivedr/pom.xml b/addons/hivedr/pom.xml
index 37dc5c94c..adf0459f3 100644
--- a/addons/hivedr/pom.xml
+++ b/addons/hivedr/pom.xml
@@ -172,6 +172,12 @@
             <artifactId>hadoop-distcp</artifactId>
             <scope>compile</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.derby</groupId>
+            <artifactId>derby</artifactId>
+            <version>${derby.version}</version>
+        </dependency>

diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
index 549023201..c9ad47e61 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
@@ -64,6 +64,9 @@ public enum HiveDRArgs {
     REPLICATION_MAX_MAPS("replicationMaxMaps", "number of maps", false),
     DISTCP_MAX_MAPS("distcpMaxMaps", "number of maps", false),
 
+    // Set to true if TDE encryption is enabled
+    TDE_ENCRYPTION_ENABLED("tdeEncryptionEnabled", "Set to true if TDE encryption is enabled", false),
+
     // Map Bandwidth
     DISTCP_MAP_BANDWIDTH("distcpMapBandwidth", "map bandwidth in mb", false),

diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
index 28515e477..868ec8d87 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
@@ -120,7 +120,7 @@ public String getJobName() {
     }
 
     public int getMaxEvents() {
-        return Integer.valueOf(context.get(HiveDRArgs.MAX_EVENTS));
+        return Integer.parseInt(context.get(HiveDRArgs.MAX_EVENTS));
     }
 
     public boolean shouldKeepHistory() {
@@ -147,6 +147,11 @@ public String getExecutionStage() {
         return context.get(HiveDRArgs.EXECUTION_STAGE);
     }
 
+    public boolean isTDEEncryptionEnabled() {
+        return StringUtils.isEmpty(context.get(HiveDRArgs.TDE_ENCRYPTION_ENABLED))
+                ? false : Boolean.valueOf(context.get(HiveDRArgs.TDE_ENCRYPTION_ENABLED));
+    }
+
     public boolean shouldBlock() {
         return true;
     }
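
A note on the new getter above: both Boolean.valueOf(String) and Boolean.parseBoolean(String) already map null (and anything other than a case-insensitive "true") to false, so the StringUtils.isEmpty guard is defensive rather than strictly required. A minimal sketch of the JDK semantics the getter relies on, not part of the patch:

    // Sketch only: null-safety of the JDK boolean parsers.
    String unset = null;
    Boolean.valueOf(unset);        // Boolean.FALSE, no NullPointerException
    Boolean.parseBoolean("TRUE");  // true (case-insensitive match on "true")
    Boolean.parseBoolean("yes");   // false (anything other than "true" is false)
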
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
index e14180009..22826b85e 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
@@ -271,8 +271,8 @@ private void cleanStagingDirectory() throws HiveReplicationException {
     private String sourceEvents() throws Exception {
         MetaStoreEventSourcer defaultSourcer = null;
         String inputFilename = null;
-        String lastEventsIdFile = FileUtils.DEFAULT_EVENT_STORE_PATH +File.separator+inputOptions.getJobName()+"/"
-                +inputOptions.getJobName()+".id";
+        String lastEventsIdFile = FileUtils.DEFAULT_EVENT_STORE_PATH + File.separator
+                + inputOptions.getJobName() + "/" + inputOptions.getJobName() + ".id";
         Map lastEventsIdMap = getLastDBTableEvents(new Path(lastEventsIdFile));
         try {
             HCatClient sourceMetastoreClient = HiveMetastoreUtils.initializeHiveMetaStoreClient(

diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
index d075bfb7d..3b088f7aa 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
@@ -67,10 +67,12 @@ public class EventUtils {
     private String jobNN = null;
     private String jobNNKerberosPrincipal = null;
     private String targetHiveServer2Uri = null;
+    private String sourceStagingPath = null;
     private String targetStagingPath = null;
     private String targetNN = null;
     private String targetNNKerberosPrincipal = null;
-    private String fullyQualifiedTargetStagingPath = null;
+    private String sourceStagingUri = null;
+    private String targetStagingUri = null;
     private List<Path> sourceCleanUpList = null;
     private List<Path> targetCleanUpList = null;
     private static final Logger LOG = LoggerFactory.getLogger(EventUtils.class);
@@ -93,6 +95,8 @@ public EventUtils(Configuration conf) {
         sourceDatabase = conf.get(HiveDRArgs.SOURCE_DATABASE.getName());
         sourceNN = conf.get(HiveDRArgs.SOURCE_NN.getName());
         sourceNNKerberosPrincipal = conf.get(HiveDRArgs.SOURCE_NN_KERBEROS_PRINCIPAL.getName());
+        sourceStagingPath = conf.get(HiveDRArgs.SOURCE_STAGING_PATH.getName())
+                + File.separator + conf.get(HiveDRArgs.JOB_NAME.getName());
         jobNN = conf.get(HiveDRArgs.JOB_CLUSTER_NN.getName());
         jobNNKerberosPrincipal = conf.get(HiveDRArgs.JOB_CLUSTER_NN_KERBEROS_PRINCIPAL.getName());
         targetHiveServer2Uri = conf.get(HiveDRArgs.TARGET_HS2_URI.getName());
@@ -139,7 +143,8 @@ public void setupConnection() throws Exception {
 
     public void initializeFS() throws IOException {
         LOG.info("Initializing staging directory");
-        fullyQualifiedTargetStagingPath = new Path(targetNN, targetStagingPath).toString();
+        sourceStagingUri = new Path(sourceNN, sourceStagingPath).toString();
+        targetStagingUri = new Path(targetNN, targetStagingPath).toString();
         sourceFileSystem = FileSystem.get(FileUtils.getConfiguration(sourceNN, sourceNNKerberosPrincipal));
         jobFileSystem = FileSystem.get(FileUtils.getConfiguration(jobNN, jobNNKerberosPrincipal));
         targetFileSystem = FileSystem.get(FileUtils.getConfiguration(targetNN, targetNNKerberosPrincipal));
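
Two asides on the changes above. First, the lastEventsIdFile concatenation in HiveDRTool mixes File.separator with a literal "/" (patch 2 below makes the two consistent); for HDFS locations, Hadoop's Path composition sidesteps separator handling entirely. A minimal alternative sketch reusing the same names, not what the patch does:

    // Sketch only: composing the events-id location with Path instead of string
    // concatenation; the Path API normalizes separators on any platform.
    Path jobDir = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, inputOptions.getJobName());
    Path lastEventsIdFile = new Path(jobDir, inputOptions.getJobName() + ".id");

Second, initializeFS() now builds fully qualified staging URIs for both sides, so the DistCp job can resolve the source even when it runs on the target cluster. A small sketch of the Path(parent, child) behavior it relies on, with illustrative namenode and staging values:

    // Sketch only (illustrative values): Path(String parent, String child)
    // resolves the child against the parent, so prefixing the namenode URI
    // yields a fully qualified path usable from either cluster.
    Path staging = new Path("hdfs://source-nn:8020", "/apps/falcon/staging/hiveDRJob");
    staging.toString();  // "hdfs://source-nn:8020/apps/falcon/staging/hiveDRJob"
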
@@ -177,7 +182,7 @@ public void processEvents(String event) throws Exception {
             LOG.info("Process the export statements for db {} table {}", dbName, tableName);
             processCommands(exportEventStr, dbName, tableName, sourceStatement, sourceCleanUpList, false);
             if (!sourceCleanUpList.isEmpty()) {
-                invokeCopy(sourceCleanUpList);
+                invokeCopy();
             }
         }
     } else if (conf.get(HiveDRArgs.EXECUTION_STAGE.getName())
@@ -310,11 +315,11 @@ private void addReplicationStatus(ReplicationStatus.Status status, String dbName
         }
     }
 
-    public void invokeCopy(List<Path> srcStagingPaths) throws Exception {
-        DistCpOptions options = getDistCpOptions(srcStagingPaths);
+    public void invokeCopy() throws Exception {
+        DistCpOptions options = getDistCpOptions();
         DistCp distCp = new DistCp(conf, options);
-        LOG.info("Started DistCp with source Path: {} \ttarget path: {}", StringUtils.join(srcStagingPaths.toArray()),
-                fullyQualifiedTargetStagingPath);
+        LOG.info("Started DistCp with source path: {} \ttarget path: {}", sourceStagingUri, targetStagingUri);
+
         Job distcpJob = distCp.execute();
         LOG.info("Distp Hadoop job: {}", distcpJob.getJobID().toString());
         LOG.info("Completed DistCp");
@@ -323,27 +328,21 @@ public void invokeCopy(List<Path> srcStagingPaths) throws Exception {
         }
     }
 
-    public DistCpOptions getDistCpOptions(List<Path> srcStagingPaths) {
-        /*
-         * Add the fully qualified sourceNameNode to srcStagingPath uris. This will
-         * ensure DistCp will succeed when the job is run on target cluster.
-         */
-        List<Path> fullyQualifiedSrcStagingPaths = new ArrayList<Path>();
-        for (Path srcPath : srcStagingPaths) {
-            fullyQualifiedSrcStagingPaths.add(new Path(sourceNN, srcPath.toString()));
+    public DistCpOptions getDistCpOptions() {
+        // DistCpOptions expects the first argument to be a file or a list of Paths
+        List<Path> sourceUris = new ArrayList<Path>();
+        sourceUris.add(new Path(sourceStagingUri));
+        DistCpOptions distcpOptions = new DistCpOptions(sourceUris, new Path(targetStagingUri));
+
+        // setSyncFolder(true) ensures the directory structure is maintained when source is copied to target
+        distcpOptions.setSyncFolder(true);
+        // Skip the CRC check if TDE is enabled
+        if (Boolean.parseBoolean(conf.get(HiveDRArgs.TDE_ENCRYPTION_ENABLED.getName()))) {
+            distcpOptions.setSkipCRC(true);
         }
-        fullyQualifiedSrcStagingPaths.toArray(new Path[fullyQualifiedSrcStagingPaths.size()]);
-
-        DistCpOptions distcpOptions = new DistCpOptions(fullyQualifiedSrcStagingPaths,
-                new Path(fullyQualifiedTargetStagingPath));
-        /* setSyncFolder to false to retain dir structure as in source at the target. If set to true all files will be
-         copied to the same staging sir at target resulting in DuplicateFileException in DistCp.
-         */
-        distcpOptions.setSyncFolder(false);
         distcpOptions.setBlocking(true);
-        distcpOptions.setMaxMaps(Integer.valueOf(conf.get(HiveDRArgs.DISTCP_MAX_MAPS.getName())));
-        distcpOptions.setMapBandwidth(Integer.valueOf(conf.get(HiveDRArgs.DISTCP_MAP_BANDWIDTH.getName())));
+        distcpOptions.setMaxMaps(Integer.parseInt(conf.get(HiveDRArgs.DISTCP_MAX_MAPS.getName())));
+        distcpOptions.setMapBandwidth(Integer.parseInt(conf.get(HiveDRArgs.DISTCP_MAP_BANDWIDTH.getName())));
         return distcpOptions;
     }
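
Context on why the patch flips these options: with TDE, HDFS stores the encrypted bytes and computes block checksums over them, so identical logical data in two encryption zones yields different file checksums, and DistCp's post-copy CRC comparison would fail every copy. setSkipCRC(true) disables that comparison, and DistCp only honors it together with setSyncFolder(true), its -update mode. A sketch of the options this method produces for a TDE-enabled run, with illustrative staging URIs and the distcp CLI flag each setter maps to (assumes org.apache.hadoop.tools.DistCpOptions from hadoop-distcp 2.x and java.util.Collections); not part of the patch:

    // Sketch only (illustrative URIs): DistCpOptions for a TDE-enabled run.
    List<Path> sources = Collections.singletonList(
            new Path("hdfs://source-nn:8020/apps/falcon/staging/hiveDRJob"));
    DistCpOptions options = new DistCpOptions(
            sources, new Path("hdfs://target-nn:8020/apps/falcon/staging/hiveDRJob"));
    options.setSyncFolder(true);    // -update: copy contents into the target dir in place
    options.setSkipCRC(true);       // -skipcrccheck: skip source/target checksum comparison
    options.setBlocking(true);
    options.setMaxMaps(5);          // -m 5
    options.setMapBandwidth(100);   // -bandwidth 100
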
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml
index 0494cf6a1..2d6b8bee2 100644
--- a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml
@@ -142,6 +142,8 @@
             <arg>${clusterForJobRunWriteEP}</arg>
             <arg>-clusterForJobNNKerberosPrincipal</arg>
             <arg>${clusterForJobNNKerberosPrincipal}</arg>
+            <arg>-tdeEncryptionEnabled</arg>
+            <arg>${tdeEncryptionEnabled}</arg>
             <arg>-drJobName</arg>
             <arg>${drJobName}-${nominalTime}</arg>
             <arg>-executionStage</arg>
@@ -240,6 +242,8 @@
             <arg>${clusterForJobRunWriteEP}</arg>
             <arg>-clusterForJobNNKerberosPrincipal</arg>
             <arg>${clusterForJobNNKerberosPrincipal}</arg>
+            <arg>-tdeEncryptionEnabled</arg>
+            <arg>${tdeEncryptionEnabled}</arg>
             <arg>-drJobName</arg>
             <arg>${drJobName}-${nominalTime}</arg>
             <arg>-executionStage</arg>
@@ -340,6 +344,8 @@
             <arg>${clusterForJobRunWriteEP}</arg>
             <arg>-clusterForJobNNKerberosPrincipal</arg>
             <arg>${clusterForJobNNKerberosPrincipal}</arg>
+            <arg>-tdeEncryptionEnabled</arg>
+            <arg>${tdeEncryptionEnabled}</arg>
             <arg>-drJobName</arg>
             <arg>${drJobName}-${nominalTime}</arg>
             <arg>-executionStage</arg>

diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties
index 8d00bb582..331d57e90 100644
--- a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties
@@ -102,6 +102,8 @@ replicationMaxMaps=5
 distcpMaxMaps=1
 # Change it to specify the bandwidth in MB for each mapper in DistCP
 distcpMapBandwidth=100
+# Set this flag to true if TDE encryption is enabled on source and target
+tdeEncryptionEnabled=false
 
 ##### Email Notification for Falcon instance completion
 falcon.recipe.notification.type=email

diff --git a/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java
index 8b396739e..3df89d3c4 100644
--- a/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java
+++ b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java
@@ -129,6 +129,10 @@ public Properties getAdditionalSystemProperties(final Properties recipePropertie
             additionalProperties.put(HiveReplicationRecipeToolOptions.CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL.getName(),
                     recipeProperties.getProperty(RecipeToolOptions.RECIPE_NN_PRINCIPAL.getName()));
         }
+        if (StringUtils.isEmpty(
+                recipeProperties.getProperty(HiveReplicationRecipeToolOptions.TDE_ENCRYPTION_ENABLED.getName()))) {
+            additionalProperties.put(HiveReplicationRecipeToolOptions.TDE_ENCRYPTION_ENABLED.getName(), "false");
+        }
         return additionalProperties;
     }
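
The defaulting just above matters because the workflow passes -tdeEncryptionEnabled ${tdeEncryptionEnabled} in every action: if a user's recipe properties omitted the key, the ${tdeEncryptionEnabled} reference would presumably be left unresolved at submission time, so the recipe tool injects "false". A standalone sketch of that behavior with the enum lookups replaced by a literal key, not part of the patch:

    // Sketch only: inject a "false" default when the user's recipe properties
    // omit tdeEncryptionEnabled, so the workflow variable always resolves.
    Properties recipeProperties = new Properties();   // user-supplied recipe file
    Properties additionalProperties = new Properties();
    String key = "tdeEncryptionEnabled";              // illustrative literal for the enum lookup
    if (StringUtils.isEmpty(recipeProperties.getProperty(key))) {
        additionalProperties.put(key, "false");
    }
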
diff --git a/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java
index ec0465d02..3d69d6e51 100644
--- a/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java
+++ b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java
@@ -54,6 +54,7 @@ public enum HiveReplicationRecipeToolOptions {
     CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL("clusterForJobNNKerberosPrincipal",
             "Write EP of cluster on which replication job runs", false),
     CLUSTER_FOR_JOB_RUN_WRITE_EP("clusterForJobRunWriteEP", "Write EP of cluster on which replication job runs", false),
+    TDE_ENCRYPTION_ENABLED("tdeEncryptionEnabled", "Set to true if TDE encryption is enabled", false),
     HIVE_DR_JOB_NAME("drJobName", "Unique hive DR job name", false);
 
     private final String name;

From ac3893a167e3e6a50bf4888f61d38ac156b02a65 Mon Sep 17 00:00:00 2001
From: bvellanki
Date: Fri, 1 Apr 2016 16:56:15 -0700
Subject: [PATCH 2/3] FALCON-1880 To support TDE encryption : Add --skipcrccheck to distcp options for HiveDR

---
 .../hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
index 22826b85e..17eec22ec 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
@@ -272,7 +272,7 @@ private String sourceEvents() throws Exception {
         MetaStoreEventSourcer defaultSourcer = null;
         String inputFilename = null;
         String lastEventsIdFile = FileUtils.DEFAULT_EVENT_STORE_PATH + File.separator
-                + inputOptions.getJobName() + "/" + inputOptions.getJobName() + ".id";
+                + inputOptions.getJobName() + File.separator + inputOptions.getJobName() + ".id";
         Map lastEventsIdMap = getLastDBTableEvents(new Path(lastEventsIdFile));
         try {
             HCatClient sourceMetastoreClient = HiveMetastoreUtils.initializeHiveMetaStoreClient(
From 374c5573cd3a209be3e4a73745f53f420ee9a9ad Mon Sep 17 00:00:00 2001
From: bvellanki
Date: Wed, 6 Apr 2016 14:54:03 -0700
Subject: [PATCH 3/3] FALCON-1880 To support TDE encryption : Add --skipcrccheck to distcp options for HiveDR, add documentation.

---
 docs/src/site/twiki/HiveDR.twiki | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/docs/src/site/twiki/HiveDR.twiki b/docs/src/site/twiki/HiveDR.twiki
index a8f6aeef0..cf35694ad 100644
--- a/docs/src/site/twiki/HiveDR.twiki
+++ b/docs/src/site/twiki/HiveDR.twiki
@@ -53,6 +53,11 @@ Following is the prerequisites to use Hive DR
    conf client.properties.
    Now update the copied recipe properties file with required attributes to replicate metadata and data from source
    cluster to destination cluster for Hive DR.
+   * *Note : HiveDR on TDE encrypted clusters*
+   When submitting a HiveDR recipe on a Kerberos-secured setup, the source and target staging directories may be
+   encrypted using Transparent Data Encryption (TDE). If the cluster staging directories are TDE encrypted, set
+   "tdeEncryptionEnabled=true" in the recipe properties file. The default value for this property is "false".
+
 ---+++ Submit Hive DR recipe
 After updating the recipe properties file with required attributes in directory path or in falcon.recipe.path,
 there are two ways of submitting the Hive DR recipe:
@@ -72,3 +77,4 @@ Following is the prerequisites to use Hive DR
 *Note:*
    * Recipe properties file, workflow file and template file name must match to the recipe name, it must be unique and in the same directory.
    * If kerberos security is enabled on cluster, use the secure templates for Hive DR from $FALCON_HOME/data-mirroring/hive-disaster-recovery .
+
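
To make the documented setting concrete, a recipe properties excerpt for a TDE-encrypted deployment might look as follows; keys come from the hive-disaster-recovery-secure.properties file touched in patch 1, values are illustrative:

    # Excerpt from a copied hive-disaster-recovery-secure.properties (illustrative values)
    replicationMaxMaps=5
    distcpMaxMaps=1
    # Change it to specify the bandwidth in MB for each mapper in DistCP
    distcpMapBandwidth=100
    # Set this flag to true if TDE encryption is enabled on source and target
    tdeEncryptionEnabled=true
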