From 542237f0ba20f4698860461fe42df282cd2be27d Mon Sep 17 00:00:00 2001 From: Kumar Siddhartha Date: Fri, 8 Nov 2019 17:50:15 -0500 Subject: [PATCH] Develop (#96) * metastor changes * 1.5.0 (#95) * updated to newer version * specify only two subnets for elasticsearch domain as AWS doesn't choose first two by default now. (#88) * 1.0.5 Release - Removed duplicate parameters in CFT templates - Fixed bug with ns-auth sync config applying to only Herd installations - Fixed incremental DB script installation logic - Upgraded Herd and Shepherd (Herd-UI) versions - Upgraded Herd DB engine version - Fixed issue where certain DB instance types were not available - Update herd-ui to the latest available version: 0.78.0 * Updated OSS version --- mdl/src/main/cft/installMDL.yml | 6 +- mdl/src/main/cft/mdl.yml | 6 +- mdl/src/main/cft/mdlBdsql.yml | 1 - mdl/src/main/cft/mdlCreateNsAuthSyncUtil.yml | 69 +--- mdl/src/main/cft/mdlCreateS3Buckets.yml | 16 - mdl/src/main/cft/mdlHerd.yml | 10 +- mdl/src/main/cft/mdlHerdRds.yml | 3 +- mdl/src/main/cft/mdlMetastorRds.yml | 1 - mdl/src/main/cft/mdlShepherd.yml | 5 - mdl/src/main/herd/scripts/installDBForHerd.sh | 35 +- metastor/managedObjectLoader/pom.xml | 2 +- .../metastore/managed/ClusterManager.java | 69 ++-- .../herd/metastore/managed/JobDefinition.java | 271 +++++++------ .../herd/metastore/managed/JobPicker.java | 61 ++- .../metastore/managed/NotificationSender.java | 12 +- .../metastore/managed/ObjectDefinition.java | 1 + .../metastore/managed/ObjectProcessor.java | 2 +- .../managed/conf/HerdMetastoreConfig.java | 200 +++++----- .../managed/datamgmt/DataMgmtSvc.java | 375 ++++++++++-------- .../metastore/managed/hive/HivePartition.java | 29 +- .../jobProcessor/BackLoadObjectProcessor.java | 10 +- .../jobProcessor/HiveHqlGenerator.java | 80 +++- .../ManagedObjStatsProcessor.java | 39 +- .../managed/util/JobProcessorConstants.java | 6 +- .../metastore/managed/util/MetastoreUtil.java | 18 + metastor/metastoreOperations/pom.xml | 2 +- metastor/pom.xml | 2 +- 27 files changed, 737 insertions(+), 594 deletions(-) diff --git a/mdl/src/main/cft/installMDL.yml b/mdl/src/main/cft/installMDL.yml index c9fef7c0..9c119ea6 100644 --- a/mdl/src/main/cft/installMDL.yml +++ b/mdl/src/main/cft/installMDL.yml @@ -20,7 +20,7 @@ Parameters: Description: > MDL release version to use. [REQUIRED] Type: String - Default: '1.4.0' + Default: '1.5.0' DeployComponents: Type: String Default: All @@ -44,11 +44,11 @@ Parameters: - 'false' Type: String HerdDBClass: - Default: db.t2.medium + Default: db.m4.large Description: Database instance class for Herd Type: String MetastorDBClass: - Default: db.m3.medium + Default: db.m4.large Description: Database instance class for Metastor Type: String HerdDBSize: diff --git a/mdl/src/main/cft/mdl.yml b/mdl/src/main/cft/mdl.yml index 40a43bc1..f3ba6469 100644 --- a/mdl/src/main/cft/mdl.yml +++ b/mdl/src/main/cft/mdl.yml @@ -36,11 +36,11 @@ Parameters: AllowedValues: [true, false] Type: String HerdVersion: - Default: '0.81.0' + Default: '0.107.0' Description: Herd release version to use. Type: String HerdUIVersion: - Default: '0.41.0' + Default: '0.78.0' Description: Herd-UI release version to use. 
Type: String HerdDBClass: @@ -299,7 +299,6 @@ Resources: HerdSecurityGroup: !Join ['', [/app/MDL/, !Ref MDLInstanceName, /, !Ref Environment, /SecurityGroup/Herd]] MDLServerDeploymentRole: !Join ['', [/app/MDL/, !Ref MDLInstanceName, /, !Ref Environment, /IAM/MDLServerDeploymentRole]] CertificateArn: !Ref CertificateArn - ShepherdWebSiteBucketUrl: !Join ['', [/app/MDL/, !Ref MDLInstanceName, /, !Ref Environment, /S3/URL/Shepherd]] ShepherdS3BucketName: !Join ['', [/app/MDL/, !Ref MDLInstanceName, /, !Ref Environment, /S3/Shepherd]] HerdBucketName: !Join ['', [/app/MDL/, !Ref MDLInstanceName, /, !Ref Environment, /S3/Herd]] DeploymentBucketName: !Ref DeploymentBucketName @@ -321,6 +320,7 @@ Resources: HostedZoneName: !Ref HostedZoneName MDLInstanceName: !Ref MDLInstanceName Environment: !Ref Environment + DeployComponents: !Ref DeployComponents VpcIdParameterKey: !Join ['/', ['/global', !Ref MDLInstanceName, !Ref Environment, 'VPC/ID']] PrivateSubnetsParameterKey: !Join ['/', ['/global', !Ref MDLInstanceName, !Ref Environment, 'VPC/SubnetIDs/private']] PublicSubnetsParameterKey: !Join ['/', ['/global', !Ref MDLInstanceName, !Ref Environment, 'VPC/SubnetIDs/public']] diff --git a/mdl/src/main/cft/mdlBdsql.yml b/mdl/src/main/cft/mdlBdsql.yml index ba57921c..21de283f 100644 --- a/mdl/src/main/cft/mdlBdsql.yml +++ b/mdl/src/main/cft/mdlBdsql.yml @@ -233,7 +233,6 @@ Resources: hive.stats.autogather: 'false' javax.jdo.option.ConnectionUserName: 'MS_Hive_0_13' javax.jdo.option.ConnectionPassword: '{{HIVE_PASSWORD}}' - hive.metastore.schema.verification: false javax.jdo.option.ConnectionURL: !Sub 'jdbc:mysql://${MetastorDBHostName}:3306/metastor?trustServerCertificate=true&createDatabaseIfNotExist=false&useSSL=true&requireSSL=true' hive.metastore.warehouse.dir: !Join - '' diff --git a/mdl/src/main/cft/mdlCreateNsAuthSyncUtil.yml b/mdl/src/main/cft/mdlCreateNsAuthSyncUtil.yml index f67cae83..89ec0177 100644 --- a/mdl/src/main/cft/mdlCreateNsAuthSyncUtil.yml +++ b/mdl/src/main/cft/mdlCreateNsAuthSyncUtil.yml @@ -97,6 +97,16 @@ Resources: ManagedPolicyArns: - 'arn:aws:iam::aws:policy/AmazonSSMReadOnlyAccess' - 'arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole' + - 'arn:aws:iam::aws:policy/service-role/AWSLambdaENIManagementAccess' + Policies: + - PolicyName: root + PolicyDocument: + Version: 2012-10-17 + Statement: + - Action: + - 'ec2:DetachNetworkInterface' + Effect: Allow + Resource: '*' NsAuthUtilityArnParameter: Type: AWS::SSM::Parameter Properties: @@ -127,65 +137,6 @@ Resources: Principal: sns.amazonaws.com SourceArn: !Ref NsAuthChangeNotificationSnsTopic FunctionName: !GetAtt 'NsAuthSyncLambdaFunction.Arn' - ManageEniLifecycleFunction: - Type: AWS::Lambda::Function - DependsOn: NsAuthSyncLambdaFunction - Properties: - Handler: index.handler - Role: !GetAtt LambdaExecutionRole.Arn - Code: - ZipFile: !Sub | - var response = require('cfn-response'); - var AWS = require('aws-sdk'); - - exports.handler = function(event, context) { - if (event.RequestType != 'Delete') { - response.send(event, context, response.SUCCESS, {}); - return; - } - var ec2 = new AWS.EC2(); - var params = { - Filters: [ - { - Name: 'group-id', - Values: event.ResourceProperties.SecurityGroups - }, - { - Name: 'description', - Values: ['AWS Lambda VPC ENI: *'] - } - ] - }; - - console.log("Deleting attachments!"); - // Detach, then delete ENIs which were spinned up by AWS for our VPC Lambda - ec2.describeNetworkInterfaces(params).promise().then(function(data) { - console.log("Got Interfaces:\n", 
JSON.stringify(data)); - return Promise.all(data.NetworkInterfaces.map(function(networkInterface) { - var networkInterfaceId = networkInterface.NetworkInterfaceId; - var attachmentId = networkInterface.Attachment.AttachmentId; - return ec2.detachNetworkInterface({AttachmentId: attachmentId}).promise().then(function(data) { - return ec2.waitFor('networkInterfaceAvailable', {NetworkInterfaceIds: [networkInterfaceId]}).promise(); - }).then(function(data) { - console.log("Detached Interface, deleting:\n", networkInterfaceId); - return ec2.deleteNetworkInterface({NetworkInterfaceId: networkInterfaceId}).promise(); - }); - })); - }).then(function(data) { - console.log("Success!"); - response.send(event, context, response.SUCCESS, {}); - }).catch(function(err) { - console.log("Failure:\n", JSON.stringify(err)); - response.send(event, context, response.FAILED, {}); - }); - }; - Timeout: 300 - Runtime: nodejs4.3 - VPCDestroyENI: - Type: Custom::VPCDestroyENI - Properties: - ServiceToken: !GetAtt 'ManageEniLifecycleFunction.Arn' - SecurityGroups: [!Ref NsAuthSyncUtilitySecurityGroupParameter] Outputs: LambdaFunctionName: Description: Function name of the ns-auth sync utility lambda. diff --git a/mdl/src/main/cft/mdlCreateS3Buckets.yml b/mdl/src/main/cft/mdlCreateS3Buckets.yml index e2cc4930..d24866b3 100644 --- a/mdl/src/main/cft/mdlCreateS3Buckets.yml +++ b/mdl/src/main/cft/mdlCreateS3Buckets.yml @@ -208,22 +208,6 @@ Resources: Type: String Value: !If [EnableSSLAndAuth, !Ref CloudfrontS3Bucket, !Ref S3BucketForStaticPages] Description: Name of Shepherd S3 bucket - ShepherdS3BucketDomainParameter: - Type: 'AWS::SSM::Parameter' - Condition: EnableSSLAndAuth - Properties: - Name: !Join - - '' - - - /app/MDL/ - - !Ref MDLInstanceName - - / - - !Ref Environment - - /S3/Domain/Shepherd - Type: String - Value: !GetAtt - - CloudfrontS3Bucket - - DomainName - Description: Domain name of Shepherd S3 bucket ShepherdS3BucketURLParameter: Type: 'AWS::SSM::Parameter' Condition: EnableSSLAndAuth diff --git a/mdl/src/main/cft/mdlHerd.yml b/mdl/src/main/cft/mdlHerd.yml index c6831388..2e41a5ec 100644 --- a/mdl/src/main/cft/mdlHerd.yml +++ b/mdl/src/main/cft/mdlHerd.yml @@ -34,9 +34,9 @@ Parameters: CreateSQS: Description: Create SQS - true || false Type: AWS::SSM::Parameter::Value - ShepherdWebSiteBucketUrl: - Description: Shepherd website URL - Type: AWS::SSM::Parameter::Value + DeployComponents: + Description: Components requested for deployment + Type: String ShepherdS3BucketName: Description: 'The bucket name of Shepherd ' Type: AWS::SSM::Parameter::Value @@ -484,6 +484,10 @@ Resources: environment= - !Ref Environment + - |- + + deployComponents= + - !Ref DeployComponents - |- herdVersion= diff --git a/mdl/src/main/cft/mdlHerdRds.yml b/mdl/src/main/cft/mdlHerdRds.yml index c7ae5f93..0dfc334e 100644 --- a/mdl/src/main/cft/mdlHerdRds.yml +++ b/mdl/src/main/cft/mdlHerdRds.yml @@ -26,7 +26,6 @@ Parameters: AllowedPattern: '[a-zA-Z_][a-zA-Z0-9_]*' ConstraintDescription: must begin with a letter and contain alphanumeric characters and _. 
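+  # No Default on purpose: the duplicate default was removed and the instance
+  # class is now supplied by the caller (installMDL.yml defaults HerdDBClass
+  # to db.m4.large).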
HerdDBClass: - Default: db.m4.large Description: Database instance class Type: String HerdDBEngine: @@ -34,7 +33,7 @@ Parameters: Description: Postgres RDS database Engine Type: String HerdDBEngineVersion: - Default: 9.5.4 + Default: 9.5.15 Description: Postgres database version Type: String HerdDBSize: diff --git a/mdl/src/main/cft/mdlMetastorRds.yml b/mdl/src/main/cft/mdlMetastorRds.yml index 85214bae..b4a342e9 100644 --- a/mdl/src/main/cft/mdlMetastorRds.yml +++ b/mdl/src/main/cft/mdlMetastorRds.yml @@ -29,7 +29,6 @@ Parameters: Description: VPC Parameter key name in system store Type: String MetastorDBClass: - Default: db.m4.large Description: Database instance class Type: String MetastorDBEngine: diff --git a/mdl/src/main/cft/mdlShepherd.yml b/mdl/src/main/cft/mdlShepherd.yml index dacce93a..50129411 100644 --- a/mdl/src/main/cft/mdlShepherd.yml +++ b/mdl/src/main/cft/mdlShepherd.yml @@ -28,11 +28,6 @@ Parameters: CertificateArn: Description: Certificate Arn for MDL Type: String - EnableSSLAndAuth: - Default: 'false' - Description: Whether to enable SSL and Auth - ConstraintDescription: Must specify true or false - Type: String EnableSSLAndAuth: Default: 'true' Description: Whether to enable Authentication/SSL diff --git a/mdl/src/main/herd/scripts/installDBForHerd.sh b/mdl/src/main/herd/scripts/installDBForHerd.sh index 349ddb0e..d5ac7fac 100644 --- a/mdl/src/main/herd/scripts/installDBForHerd.sh +++ b/mdl/src/main/herd/scripts/installDBForHerd.sh @@ -200,22 +200,14 @@ if [ "${herdRollingDeployment}" = "true" ] ; then echo "Requested version: ${newVersion}" fi -while [ ${initialVersion} -lt ${newVersion} ] +boundary=`expr ${newVersion} - 1` + +for i in $(seq ${initialVersion} ${boundary}) do - if [ ${initialVersion} -lt 10 ]; then - next=$((10#${initialVersion}+1)) - pre="0${initialVersion}" - if [ ${initialVersion} -eq 9 ]; then - post="${next}" - else - post="0${next}" - fi - else - pre="${initialVersion}" - next=$((10#${initialVersion}+1)) - post="${next}" - fi - ((initialVersion++)) + + printf -v pre "%03d" $i + var=`expr $i + 1` + printf -v post "%03d" $var # Apply incremental upgrade scripts to the Herd DB echo "Running incremental DB script: ${deployLocation}/sql/herd.postgres.0.${pre}.0-to-0.${post}.0.upgrade.sql" @@ -261,13 +253,14 @@ if [ "${herdRollingDeployment}" = "false" ] ; then # Add namespace authorization admin permissions for the app-admin user execute_cmd "psql --set ON_ERROR_STOP=on --host ${herdDatabaseHost} --port 5432 -c \"INSERT INTO dmrowner.user_tbl VALUES ('${admin_user_url}', 'USER', 'ADMIN', current_timestamp, current_timestamp, 'SYSTEM', 'SYSTEM', '${PGUSER}', 'Y', 'Y');\"" - # Add Herd namespace-auth SNS topic configuration - sns_arn=$(aws ssm get-parameter --name /app/MDL/${mdlInstanceName}/${environment}/Resources/SNS/UserNamespaceChgTopicArn --region ${region} --output text --query Parameter.Value) - echo "Inserting SNS config for user-namespace authorization status changes. 
SNS Arn: ${sns_arn}" - execute_cmd "sed -i \"s/{{SNS_TOPIC_ARN}}/${sns_arn}/g\" ${deployLocation}/sql/nsAuthSnsConfig.sql" - execute_cmd "sed -i \"s/{{ENVIRONMENT}}/${environment}/g\" ${deployLocation}/sql/nsAuthSnsConfig.sql" - execute_cmd "psql --set ON_ERROR_STOP=on --host ${herdDatabaseHost} --port 5432 -f ${deployLocation}/sql/nsAuthSnsConfig.sql" + if [ ${deployComponents} = "All" ] || [ ${deployComponents} == "BDSQL" ] ; then + sns_arn=$(aws ssm get-parameter --name /app/MDL/${mdlInstanceName}/${environment}/Resources/SNS/UserNamespaceChgTopicArn --region ${region} --output text --query Parameter.Value) + echo "Inserting SNS config for user-namespace authorization status changes. SNS Arn: ${sns_arn}" + execute_cmd "sed -i \"s/{{SNS_TOPIC_ARN}}/${sns_arn}/g\" ${deployLocation}/sql/nsAuthSnsConfig.sql" + execute_cmd "sed -i \"s/{{ENVIRONMENT}}/${environment}/g\" ${deployLocation}/sql/nsAuthSnsConfig.sql" + execute_cmd "psql --set ON_ERROR_STOP=on --host ${herdDatabaseHost} --port 5432 -f ${deployLocation}/sql/nsAuthSnsConfig.sql" + fi # ensure that jms publishing is enabled execute_cmd "psql --set ON_ERROR_STOP=on --host ${herdDatabaseHost} --port 5432 -c \"DELETE FROM cnfgn WHERE cnfgn_key_nm = 'jms.listener.enabled';\"" diff --git a/metastor/managedObjectLoader/pom.xml b/metastor/managedObjectLoader/pom.xml index fdfc73d5..cf269001 100644 --- a/metastor/managedObjectLoader/pom.xml +++ b/metastor/managedObjectLoader/pom.xml @@ -19,7 +19,7 @@ metastore org.finra.herd-mdl.metastore - 1.2.25 + 1.2.38 4.0.0 diff --git a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/ClusterManager.java b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/ClusterManager.java index e520fb14..e19129cd 100644 --- a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/ClusterManager.java +++ b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/ClusterManager.java @@ -24,21 +24,15 @@ import com.amazonaws.services.elasticmapreduce.model.ClusterState; import com.amazonaws.services.elasticmapreduce.model.DescribeClusterRequest; import com.amazonaws.services.elasticmapreduce.model.DescribeClusterResult; +import org.finra.herd.metastore.managed.datamgmt.DataMgmtSvc; +import org.finra.herd.metastore.managed.util.JobProcessorConstants; import org.finra.herd.sdk.api.EmrApi; import org.finra.herd.sdk.invoker.ApiClient; -import org.finra.herd.sdk.invoker.ApiException; -import org.finra.herd.sdk.model.EmrCluster; -import org.finra.herd.sdk.model.EmrClusterCreateRequest; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Component; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.transaction.TransactionDefinition; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.DefaultTransactionDefinition; import javax.annotation.PreDestroy; import javax.json.Json; @@ -81,7 +75,7 @@ public class ClusterManager implements InitializingBean { int singleObjPartitionThreshold = 50; @Value("${PARTITION_AGE_THRESHOLD_IN_HOURS}") - int ageThreshold = 10; //Hours + int ageThreshold = 10; //Minutes @Value("${MAX_CLUSTER}") int maxCluster = 10; @@ -102,9 
+96,17 @@ public class ClusterManager implements InitializingBean { @Value("${AGS}") private String ags; + @Autowired + boolean analyzeStats; + @Autowired NotificationSender notificationSender; + @Autowired + protected DataMgmtSvc dataMgmtSvc; + + + int createClusterRetryCounter = 0; Set errors = new HashSet( 5 ); @@ -114,11 +116,19 @@ public class ClusterManager implements InitializingBean { public static final String FIND_JOB_QUERY = "SELECT n.*, CASE WHEN l.count is null THEN 0 ELSE l.count END as c FROM (DM_NOTIFICATION n left outer join " + "(SELECT NOTIFICATION_ID, group_concat(success) as success, count(*) as count from METASTOR_PROCESSING_LOG " + "group by NOTIFICATION_ID) l on l.NOTIFICATION_ID=n.ID) " + - "where WF_TYPE != 3 and ( l.success is null or (l.success like '%N' and l.count> result = template.queryForList(JOB_GROUP_QUERY, maxRetry); + List> result ; + if(analyzeStats) { + result=template.queryForList(JOB_GROUP_QUERY_STATS, maxRetry); + } + else { + result=template.queryForList(JOB_GROUP_QUERY, maxRetry); + } + int clusterNum = 0; if(result.size() > 1) { @@ -326,11 +343,10 @@ int calculateNumberOfClustersNeeded() { int partCount = ((Long) record.get("count")).intValue(); Date timeCreated = (Date) record.get("oldest"); - long age = (current - timeCreated.getTime()/1000) / 3600; if (partCount > singleObjPartitionThreshold) { clusterNum++; - } else if ((age > ageThreshold) && (!ageIncrement)) { + } else if (ageCheck(current, timeCreated)&& (!ageIncrement)) { clusterNum++; ageIncrement = true; } @@ -351,6 +367,11 @@ int calculateNumberOfClustersNeeded() { } + protected boolean ageCheck( long current, Date timeCreated ){ + long age = (current - timeCreated.getTime()/1000) / 3600; + return (age > ageThreshold); + } + public void clusterAutoScale() { logger.info("Auto Scale Job started"); @@ -437,7 +458,7 @@ private void createAdditionalCluster( String proposedName, EmrApi emrApi, List partitionValues; - Map partitionsKeyValue; - - ObjectDefinition objectDefinition; - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - JobDefinition that = (JobDefinition) o; - return id == that.id && - wfType == that.wfType && - numOfRetry == that.numOfRetry && - Objects.equals(objectDefinition, that.objectDefinition) && - Objects.equals(partitionValue, that.partitionValue) && - Objects.equals(correlation, that.correlation) && - Objects.equals(executionID, that.executionID) && - Objects.equals(clusterName, that.clusterName) && - Objects.equals(partitionKey, that.partitionKey); - } - - @Override - public int hashCode() { - return Objects.hash(id, objectDefinition, partitionValue, correlation, executionID, clusterName, wfType, numOfRetry, partitionKey); - } - - - public JobDefinition(long id, String nameSpace, String objectName, String usageCode, - String fileType, String partitionValue, String correlation, - String executionID, String partitionKey) { - this.id = id; - objectDefinition=new ObjectDefinition(); - objectDefinition.nameSpace = nameSpace; - objectDefinition.objectName = objectName; - objectDefinition.usageCode = usageCode; - objectDefinition.fileType = fileType; - this.partitionValue = partitionValue; - this.correlation = correlation; - this.executionID = executionID; - this.partitionKey = partitionKey; - } - - public JobDefinition() { - } - - public String getPartitionsSpecForStats(){ - StringJoiner partitionStats = new StringJoiner( "," , "(", ")"); - partitionsKeyValue.forEach( (k, v) -> { - 
partitionStats.add( String.format( "`%s`='%s'", k, v ) ); - } ); - - return partitionStats.toString(); + String partitionValue; + List partitionValues = Lists.newArrayList(); + Map partitionsKeyValue = Maps.newLinkedHashMap(); + + ObjectDefinition objectDefinition; + + @Override + public boolean equals( Object o ) { + if ( this == o ) return true; + if ( o == null || getClass() != o.getClass() ) return false; + JobDefinition that = (JobDefinition) o; + return id == that.id && + wfType == that.wfType && + numOfRetry == that.numOfRetry && + Objects.equals( objectDefinition, that.objectDefinition ) && + Objects.equals( partitionValue, that.partitionValue ) && + Objects.equals( correlation, that.correlation ) && + Objects.equals( executionID, that.executionID ) && + Objects.equals( clusterName, that.clusterName ) && + Objects.equals( partitionKey, that.partitionKey ); } - @Slf4j - public static class ObjectDefinitionMapper implements RowMapper { + @Override + public int hashCode() { + return Objects.hash( id, objectDefinition, partitionValue, correlation, executionID, clusterName, wfType, numOfRetry, partitionKey ); + } + + + public JobDefinition( long id, String nameSpace, String objectName, String usageCode, + String fileType, String partitionValue, String correlation, + String executionID, String partitionKey ) { + this.id = id; + objectDefinition = new ObjectDefinition(); + objectDefinition.nameSpace = nameSpace; + objectDefinition.objectName = objectName; + objectDefinition.usageCode = usageCode; + objectDefinition.fileType = fileType; + this.partitionValue = partitionValue; + this.correlation = correlation; + this.executionID = executionID; + this.partitionKey = partitionKey; + } + + public JobDefinition() { + } + + public String partitionSpecForStats() { + if ( Objects.isNull( partitionValue ) || partitionValue.isEmpty() ) { //Singleton + return String.format( "`%s`", partitionKey ); + } else if ( MetastoreUtil.isNonPartitionedSingleton( partitionKey ) ) { + return JobProcessorConstants.NON_PARTITIONED_SINGLETON_VALUE; + } else if ( partitionKey.contains( COMMA ) && partitionValue.contains( COMMA ) ) { + StringJoiner partitionSpec = new StringJoiner( COMMA ); + List statsPartitionKeys = Lists.newArrayList( partitionKey.split( COMMA ) ); + List statsValues = Lists.newArrayList( partitionValue.split( COMMA ) ); + IntStream.range( 0, statsPartitionKeys.size() ) + .forEach( i -> { + partitionSpec.add( String.format( "`%s`='%s'", statsPartitionKeys.get( i ), statsValues.get( i ) ) ); + } ); + return partitionSpec.toString(); + } + return String.format( "`%s`='%s'", partitionKey, partitionValue ); + + } - @Override - public JobDefinition mapRow(ResultSet resultSet, int i) throws SQLException { - JobDefinition jobDefinition = new JobDefinition(); + public String partitionKeysForStats() { + if ( partitionsKeyValue.isEmpty() ) { + return String.format( "%s", partitionKey ); + } - jobDefinition.id = resultSet.getLong("ID"); - jobDefinition.objectDefinition=new ObjectDefinition(); - jobDefinition.objectDefinition.nameSpace=resultSet.getString("NAMESPACE"); - jobDefinition.objectDefinition.objectName=resultSet.getString(("OBJECT_DEF_NAME")); - jobDefinition.objectDefinition.usageCode=resultSet.getString("USAGE_CODE"); - jobDefinition.objectDefinition.fileType=resultSet.getString("FILE_TYPE"); + return partitionsKeyValue.keySet().stream().collect( Collectors.joining( COMMA ) ); + } + + public String partitionValuesForStats( String value ) { + if ( !partitionsKeyValue.isEmpty() ) { + return 
partitionsKeyValue.values().stream().collect( Collectors.joining( COMMA ) ); + } else if ( MetastoreUtil.isPartitionedSingleton( wfType, partitionKey ) ) { + return ""; + } - jobDefinition.partitionValue=resultSet.getString("PARTITION_VALUES"); + return value; + } + + @Slf4j + public static class ObjectDefinitionMapper implements RowMapper { + + @Override + public JobDefinition mapRow( ResultSet resultSet, int i ) throws SQLException { + JobDefinition jobDefinition = new JobDefinition(); + + jobDefinition.id = resultSet.getLong( "ID" ); + jobDefinition.objectDefinition = new ObjectDefinition(); + jobDefinition.objectDefinition.nameSpace = resultSet.getString( "NAMESPACE" ); + jobDefinition.objectDefinition.objectName = resultSet.getString( ("OBJECT_DEF_NAME") ); + jobDefinition.objectDefinition.usageCode = resultSet.getString( "USAGE_CODE" ); + jobDefinition.objectDefinition.fileType = resultSet.getString( "FILE_TYPE" ); + jobDefinition.objectDefinition.wfType = resultSet.getInt( "WF_TYPE" ); + + jobDefinition.partitionValue = resultSet.getString( "PARTITION_VALUES" ); if ( jobDefinition.getPartitionValue().contains( SUB_PARTITION_VAL_SEPARATOR ) ) { jobDefinition.subPartitionLevelProcessing = true; - jobDefinition.partitionValues = Lists.newArrayList( jobDefinition.getPartitionValue().split( SUB_PARTITION_VAL_SEPARATOR )); + jobDefinition.partitionValues = Lists.newArrayList( jobDefinition.getPartitionValue().split( SUB_PARTITION_VAL_SEPARATOR ) ); } - // Execution ID not being used, other than filenaming .hql files it just have to unique - jobDefinition.executionID=resultSet.getString("ID"); - jobDefinition.partitionKey=resultSet.getString("PARTITION_KEY"); - jobDefinition.wfType=resultSet.getInt("WF_TYPE"); - jobDefinition.numOfRetry=resultSet.getInt("c"); - jobDefinition.clusterName=resultSet.getString("CLUSTER_NAME"); - - jobDefinition.correlation = resultSet.getString( "CORRELATION_DATA" ); - jobDefinition.actualObjectName = getActualObjectName( jobDefinition ); - jobDefinition.tableName = identifyTableName( jobDefinition ); - - return jobDefinition; - } - - private String identifyTableName(JobDefinition jobDef){ - return new StringJoiner( "_" ) - .add( identifyObjectName(jobDef)) - .add( jobDef.getObjectDefinition().usageCode ) - .add( jobDef.getObjectDefinition().fileType ) + // Execution ID not being used, other than filenaming .hql files it just have to unique + jobDefinition.executionID = resultSet.getString( "ID" ); + jobDefinition.partitionKey = resultSet.getString( "PARTITION_KEY" ); + jobDefinition.wfType = resultSet.getInt( "WF_TYPE" ); + jobDefinition.numOfRetry = resultSet.getInt( "c" ); + jobDefinition.clusterName = resultSet.getString( "CLUSTER_NAME" ); + + jobDefinition.correlation = resultSet.getString( "CORRELATION_DATA" ); + jobDefinition.actualObjectName = getActualObjectName( jobDefinition ); + jobDefinition.tableName = identifyTableName( jobDefinition ); + + return jobDefinition; + } + + private String identifyTableName( JobDefinition jobDef ) { + return new StringJoiner( UNDERSCORE ) + .add( identifyObjectName( jobDef ) ) + .add( jobDef.getObjectDefinition().getUsageCode() ) + .add( jobDef.getObjectDefinition().getFileType() ) .toString() - .replaceAll("\\.", "_") - .replaceAll(" ","_") - .replaceAll("-","_"); + .replaceAll( "\\.", UNDERSCORE ) + .replaceAll( " ", UNDERSCORE ) + .replaceAll( "-", UNDERSCORE ); } private String identifyObjectName( JobDefinition jobDef ) { String originalObjectName = "original_object_name"; - if ( 
!StringUtils.isEmpty(jobDef.getCorrelation()) && jobDef.getCorrelation().contains( originalObjectName ) ) { + if ( !StringUtils.isEmpty( jobDef.getCorrelation() ) && jobDef.getCorrelation().contains( originalObjectName ) ) { return Json.createReader( new StringReader( jobDef.getCorrelation() ) ) .readObject().getJsonObject( "businessObject" ) .getString( originalObjectName ); } - return getActualObjectName(jobDef); + return getActualObjectName( jobDef ); + } + + public List getDatesBetweenRanges( + LocalDate startDate, LocalDate endDate ) { + + long numOfDaysBetween = ChronoUnit.DAYS.between( startDate, endDate ); + return IntStream.iterate( 0, i -> i + 1 ) + .limit( numOfDaysBetween ) + .mapToObj( i -> startDate.plusDays( i ) ) + .collect( Collectors.toList() ); } private String getActualObjectName( JobDefinition jobDef ) { try { - if (!StringUtils.isEmpty(jobDef.getCorrelation()) && !jobDef.getCorrelation().equals("null")) { - JsonReader reader = Json.createReader(new StringReader(jobDef.getCorrelation())); + if ( !StringUtils.isEmpty( jobDef.getCorrelation() ) && !jobDef.getCorrelation().equals( "null" ) ) { + JsonReader reader = Json.createReader( new StringReader( jobDef.getCorrelation() ) ); JsonObject object = reader.readObject(); - if (object.containsKey("businessObject")) { - JsonObject bo = object.getJsonObject("businessObject"); - if (bo.containsKey("late_reporting_for")) { - String objName = bo.getString("late_reporting_for"); + if ( object.containsKey( "businessObject" ) ) { + JsonObject bo = object.getJsonObject( "businessObject" ); + if ( bo.containsKey( "late_reporting_for" ) ) { + String objName = bo.getString( "late_reporting_for" ); return objName; } } } - } catch (Exception ex) { - log.warn("Error parsing correlation:"+jobDef.getCorrelation()); + } catch ( Exception ex ) { + log.warn( "Error parsing correlation:" + jobDef.getCorrelation() ); } return jobDef.getObjectDefinition().getObjectName(); } - } + } } diff --git a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/JobPicker.java b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/JobPicker.java index 8b9511f1..1c06e1e5 100644 --- a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/JobPicker.java +++ b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/JobPicker.java @@ -31,28 +31,41 @@ public class JobPicker { " FROM (DM_NOTIFICATION n left outer join " + "(SELECT NOTIFICATION_ID, group_concat(success) as success, count(*) as count, max(DATE_PROCESSED) as last_process from METASTOR_PROCESSING_LOG " + "group by NOTIFICATION_ID) l on l.NOTIFICATION_ID=n.ID) left outer join METASTOR_WORKFLOW m on WF_TYPE=m.WORKFLOW_ID " + - "where WF_TYPE != 3 and ( l.success is null or (l.success not like '%Y' and l.count? )) and NOT EXISTS (select * from " + + "where WF_TYPE NOT IN (3,5) and ( l.success is null or (l.success not like '%Y' and l.count? )) and NOT EXISTS (select * from " + "METASTOR_OBJECT_LOCKS lc where lc.NAMESPACE=n.NAMESPACE and lc.OBJ_NAME=n.OBJECT_DEF_NAME and lc.USAGE_CODE=n.USAGE_CODE and" + - " lc.FILE_TYPE=n.FILE_TYPE and CLUSTER_ID !=?) ORDER BY PRIORITY ASC, PARTITION_VALUES DESC"; + " lc.FILE_TYPE=n.FILE_TYPE and CLUSTER_ID !=? 
and lc.WF_TYPE NOT IN(3,5)) ORDER BY PRIORITY ASC, PARTITION_VALUES DESC"; - static final String DELETE_EXPIRED_LOCKS = "delete from METASTOR_OBJECT_LOCKS where EXPIRATION_DT < now()"; + public static final String FIND_UNLOCKED_STATS_JOB_QUERY = "SELECT n.*, CASE WHEN l.count is null THEN 0 ELSE l.count END as c, PRIORITY" + + " FROM (DM_NOTIFICATION n left outer join " + + "(SELECT NOTIFICATION_ID, group_concat(success) as success, count(*) as count, max(DATE_PROCESSED) as last_process from METASTOR_PROCESSING_LOG " + + "group by NOTIFICATION_ID) l on l.NOTIFICATION_ID=n.ID) left outer join METASTOR_WORKFLOW m on WF_TYPE=m.WORKFLOW_ID " + + "where WF_TYPE = 5 and ( l.success is null or (l.success not like '%Y' and l.count? )) and NOT EXISTS (select * from " + + "METASTOR_OBJECT_LOCKS lc where lc.NAMESPACE=n.NAMESPACE and lc.OBJ_NAME=n.OBJECT_DEF_NAME and lc.USAGE_CODE=n.USAGE_CODE and" + + " lc.FILE_TYPE=n.FILE_TYPE and CLUSTER_ID !=? and lc.WF_TYPE = 5 ) ORDER BY PRIORITY ASC, PARTITION_VALUES DESC"; + + + + static final String DELETE_EXPIRED_LOCKS = "delete from METASTOR_OBJECT_LOCKS where EXPIRATION_DT < now()"; static final String LOCK_QUERY = "insert ignore into METASTOR_OBJECT_LOCKS (NAMESPACE,\n" + "OBJ_NAME, USAGE_CODE,\n" + "FILE_TYPE,\n" + "CLUSTER_ID,\n" + "WORKER_ID,\n" + - "EXPIRATION_DT) VALUES (?,?,?,?,?,?, TIMESTAMPADD(MINUTE, 5, now()));"; + "WF_TYPE,\n" + + "EXPIRATION_DT) VALUES (?,?,?,?,?,?,?, TIMESTAMPADD(MINUTE, 5, now()));"; static final String FIND_LOCK = "SELECT * FROM METASTOR_OBJECT_LOCKS WHERE NAMESPACE=? and OBJ_NAME=? and USAGE_CODE=? and " + - "FILE_TYPE=? and CLUSTER_ID=? and WORKER_ID=?"; + "FILE_TYPE=? and CLUSTER_ID=? and WORKER_ID=? and WF_TYPE=?"; static final String UPDATE_LOCK_EXPIRATION = "UPDATE METASTOR_OBJECT_LOCKS SET EXPIRATION_DT=TIMESTAMPADD(MINUTE, 5, now()) " + - "WHERE NAMESPACE=? and OBJ_NAME=? and USAGE_CODE=? and FILE_TYPE=? and CLUSTER_ID=? and WORKER_ID=?"; + "WHERE NAMESPACE=? and OBJ_NAME=? and USAGE_CODE=? and FILE_TYPE=? and CLUSTER_ID=? and WORKER_ID=? and WF_TYPE=?"; static final String UNLOCK = "delete from METASTOR_OBJECT_LOCKS where CLUSTER_ID=? and WORKER_ID=?"; + static final String DELETE_NOT_PROCESSING_NOTIFICATIONS = "DELETE FROM DM_NOTIFICATION WHERE WF_TYPE IN (3, 33)"; + @Autowired JdbcTemplate template; @@ -63,14 +76,28 @@ public class JobPicker { @Value( "${RETRY_INTERVAL}" ) int jobRetryIntervalInSecs; + + @Autowired + boolean analyzeStats; + List findJob( String clusterID, String workerID ) { List jobs = new ArrayList(); try { + deleteNotProcessingNotifications(); deleteExpiredLocks(); - List result = template.query( FIND_UNLOCKED_JOB_QUERY, new Object[]{maxRetry, - jobRetryIntervalInSecs, clusterID}, - new JobDefinition.ObjectDefinitionMapper() ); + List result ; + logger.info("Get Stats: " + analyzeStats); + if ( analyzeStats ) { + logger.info( "Running for stats" ); + result = template.query( + FIND_UNLOCKED_STATS_JOB_QUERY, new Object[]{maxRetry, jobRetryIntervalInSecs, clusterID}, + new JobDefinition.ObjectDefinitionMapper() ); + } else { + result = template.query( + FIND_UNLOCKED_JOB_QUERY, new Object[]{maxRetry, jobRetryIntervalInSecs, clusterID}, + new JobDefinition.ObjectDefinitionMapper() ); + } //Locking //1. 
Delete expired locks

 		ObjectDefinition lockedJd = null;
@@ -100,6 +127,16 @@ List<JobDefinition> findJob( String clusterID, String workerID ) {
 		return jobs;
 	}

+	/**
+	 * Deletes notifications that are marked as:
+	 * 3 - object notification disabled
+	 * 33 - business object data status-marking notifications, which are excluded from Metastore processing due to their intermediate processing status
+	 */
+	private void deleteNotProcessingNotifications() {
+		int numberOfRowsDeleted = template.update( DELETE_NOT_PROCESSING_NOTIFICATIONS );
+		logger.info( "Number of Not processing Notifications Deleted = " + numberOfRowsDeleted );
+	}
+
 	void deleteExpiredLocks() {
 		int numberOfRowsDeleted = template.update( DELETE_EXPIRED_LOCKS );
 		logger.info( "Number of Locks Deleted = " + numberOfRowsDeleted );
@@ -110,12 +147,12 @@ public boolean lockObj( JobDefinition jd, String clusterID, String threadID ) {
 		ObjectDefinition od = jd.getObjectDefinition();

 		if ( template.queryForList( FIND_LOCK, od.getNameSpace(), objectName,
-				od.getUsageCode(), od.getFileType(), clusterID, threadID ).size() > 0 ) {
+				od.getUsageCode(), od.getFileType(), clusterID, threadID, jd.getWfType() ).size() > 0 ) {
 			return extendLock( jd, clusterID, threadID );
 		} else {
 			unlockWorker( clusterID, threadID );
 			int updated = template.update( LOCK_QUERY, od.getNameSpace(), objectName,
-					od.getUsageCode(), od.getFileType(), clusterID, threadID );
+					od.getUsageCode(), od.getFileType(), clusterID, threadID, jd.getWfType() );
 			return updated == 1;
 		}
 	}
@@ -130,7 +167,7 @@ public boolean extendLock( JobDefinition jd, String clusterID, String workerID )
 		String objectName = jd.getActualObjectName();
 		int updated = template.update( UPDATE_LOCK_EXPIRATION, od.getNameSpace(), objectName, od.getUsageCode(),
-				od.getFileType(), clusterID, workerID );
+				od.getFileType(), clusterID, workerID, jd.getWfType() );
 		return updated > 0;
 	}
diff --git a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/NotificationSender.java b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/NotificationSender.java
index b9eeab7f..1653ca70 100644
--- a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/NotificationSender.java
+++ b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/NotificationSender.java
@@ -56,8 +56,8 @@ public class NotificationSender {
 	@Value( "${email_host}" )
 	private String emailHost;

-	@Value("${AGS}")
-	private String ags;
+	@Value( "${AGS}" )
+	protected String ags;

 	PebbleEngine engine = new PebbleEngine.Builder().autoEscaping( false ).strictVariables( true ).build();
@@ -115,7 +115,7 @@ public void sendEmail( String msgBody, String subject ) {

 			msg.addRecipient( Message.RecipientType.TO, new InternetAddress( mailingList ) );

-			msg.setSubject( String.format( "%s-%s %s", ags, env, subject ) );
+			msg.setSubject( getUpdatedSubject( subject ) );
 			msg.setDataHandler( new DataHandler( new ByteArrayDataSource( msgBody, "text/plain" ) ) );

 			javax.mail.Transport.send( msg );
@@ -124,6 +124,10 @@ public void sendEmail( String msgBody, String subject ) {
 		}
 	}

+	protected String getUpdatedSubject( String subject ) {
+		return String.format( "%s-%s %s", ags, env, subject );
+	}
+
 	public void sendFormatChangeEmail( FormatChange change, int version, JobDefinition job, HiveTableSchema existing,
 									   HiveTableSchema newColumns ) {
 		try {
@@ -135,7 +139,7 @@ public void sendFormatChangeEmail( FormatChange change, int version, JobDefiniti
 	}

 	protected String getFormatChangeMsg( FormatChange change,
int version, JobDefinition job, - HiveTableSchema existing, HiveTableSchema newColumns ) + HiveTableSchema existing, HiveTableSchema newColumns ) throws PebbleException, IOException { PebbleTemplate template = engine.getTemplate( "templates/formatChangeNotificationTemplate.txt" ); diff --git a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/ObjectDefinition.java b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/ObjectDefinition.java index b3218f56..7c7d24ad 100644 --- a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/ObjectDefinition.java +++ b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/ObjectDefinition.java @@ -27,6 +27,7 @@ public class ObjectDefinition { String objectName; String usageCode; String fileType; + int wfType; public String getDbName() { diff --git a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/ObjectProcessor.java b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/ObjectProcessor.java index c5c3f7e1..a215d895 100644 --- a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/ObjectProcessor.java +++ b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/ObjectProcessor.java @@ -59,7 +59,7 @@ public class ObjectProcessor { private BackLoadObjectProcessor backLoadObjectProcessor; @Autowired - NotificationSender notificationSender; + protected NotificationSender notificationSender; @Autowired ClusterManager clusterManager; diff --git a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/conf/HerdMetastoreConfig.java b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/conf/HerdMetastoreConfig.java index 11fb5150..8bc011c4 100644 --- a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/conf/HerdMetastoreConfig.java +++ b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/conf/HerdMetastoreConfig.java @@ -12,7 +12,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
-**/ + **/ package org.finra.herd.metastore.managed.conf; import lombok.extern.slf4j.Slf4j; @@ -44,112 +44,122 @@ @Slf4j @Configuration public class HerdMetastoreConfig { - public static final String homeDir = System.getenv( "HOME" ); - public static final String DM_PASS_FILE_PATH = String.format( "%s/dmCreds/dmPass.base64", homeDir ); - public static final String CRED_FILE_PATH = "cred.file.path"; + public static final String homeDir = System.getenv( "HOME" ); + public static final String DM_PASS_FILE_PATH = String.format( "%s/dmCreds/dmPass.base64", homeDir ); + public static final String CRED_FILE_PATH = "cred.file.path"; + public static final String ANALYZE_STATS = "analyze.stats"; - @Value( "${MYSQL_URL}" ) - protected String dburl; - @Value( "${MYSQL_USR}" ) + @Value( "${MYSQL_URL}" ) + protected String dburl; + + @Value( "${MYSQL_USR}" ) protected String dbUser; - @Value( "${MYSQL_PASS}" ) + @Value( "${MYSQL_PASS}" ) protected String dbPass; - @Value( "${DM_URL}" ) + @Value( "${DM_URL}" ) protected String dmUrl; - @Value( "${JDBC_VALIDATE_QUERY}" ) + @Value( "${JDBC_VALIDATE_QUERY}" ) protected String validationQuery; - @Autowired + + @Autowired protected Environment environment; - @Autowired + @Autowired protected Path credentialFilePath; - @Bean(destroyMethod = "") - public DataSource getDataSource() { - BasicDataSource dataSource = new BasicDataSource(); - dataSource.setUrl( dburl ); - dataSource.setUsername( dbUser ); - dataSource.setPassword( dbPass ); - dataSource.setInitialSize( 2 ); - dataSource.setValidationQuery( validationQuery ); - - return dataSource; - } - - @Bean(name = "template") - public JdbcTemplate getJdbcTemplate() { - return new JdbcTemplate( getDataSource() ); - } - - @Bean - public Path credentialFilePath() { - return Paths.get( DM_PASS_FILE_PATH ); - } - - /** - * Return herd ApiClient used to make calls to Herd Api's - * - * @return the Herd ApiClient {@link ApiClient} - */ - @Bean - public ApiClient getDMApiClient() { - ApiClient apiClient = new ApiClient(); - apiClient.setBasePath( dmUrl ); - apiClient.addDefaultHeader( "Authorization", String.format( "Basic %s", getCredentials() ) ); - - return apiClient; - } - - /** - * Reads Credentials from credential file - * - * @return - */ - public String getCredentials() { - Path path = credentialFilePath; - try { - - String cmdParamCredFilePath = environment.getProperty( CRED_FILE_PATH ); - - // If credential file passed as parameter to the object processor script, use that - log.info( "Credential file Passed as parameter: {}", cmdParamCredFilePath ); - path = Paths.get( cmdParamCredFilePath ); - - - return Files.lines( path ).findFirst().get(); - } catch ( IOException e ) { - throw new RuntimeException( "Could not read Herd Credentials from: " + path, e ); - } - } - - /** - * Returns Herd's Business Object Data Api - * - * @return BusinessObjectDataApi {@link BusinessObjectDataApi} - */ - @Bean - public BusinessObjectDataApi businessObjectDataApi() { - return new BusinessObjectDataApi( getDMApiClient() ); - } - - @Bean - public String homeDir(){ - return homeDir; - } - - @Bean (name = "hiveJdbcTemplate") - public JdbcTemplate hiveJdbcTemplate() { - SimpleDriverDataSource dataSource = new SimpleDriverDataSource( - new HiveDriver() - , HIVE_URL - , HIVE_USER - , HIVE_PASSWORD - ); - return new JdbcTemplate( dataSource ); - } + @Bean(destroyMethod = "") + public DataSource getDataSource() { + BasicDataSource dataSource = new BasicDataSource(); + dataSource.setUrl( dburl ); + dataSource.setUsername( dbUser ); + 
dataSource.setPassword( dbPass ); + dataSource.setInitialSize( 2 ); + dataSource.setValidationQuery( validationQuery ); + + return dataSource; + } + + @Bean(name = "template") + public JdbcTemplate getJdbcTemplate() { + return new JdbcTemplate( getDataSource() ); + } + + @Bean + public Path credentialFilePath() { + return Paths.get( DM_PASS_FILE_PATH ); + } + + /** + * Return herd ApiClient used to make calls to Herd Api's + * + * @return the Herd ApiClient {@link ApiClient} + */ + @Bean + public ApiClient getDMApiClient() { + ApiClient apiClient = new ApiClient(); + apiClient.setBasePath( dmUrl ); + apiClient.addDefaultHeader( "Authorization", String.format( "Basic %s", getCredentials() ) ); + + return apiClient; + } + + /** + * Reads Credentials from credential file + * + * @return + */ + public String getCredentials() { + Path path = credentialFilePath; + try { + + String cmdParamCredFilePath = environment.getProperty( CRED_FILE_PATH ); + + // If credential file passed as parameter to the object processor script, use that + log.info( "Credential file Passed as parameter: {}", cmdParamCredFilePath ); + path = Paths.get( cmdParamCredFilePath ); + + + return Files.lines( path ).findFirst().get(); + } catch ( IOException e ) { + throw new RuntimeException( "Could not read Herd Credentials from: " + path, e ); + } + } + + /** + * Returns Herd's Business Object Data Api + * + * @return BusinessObjectDataApi {@link BusinessObjectDataApi} + */ + @Bean + public BusinessObjectDataApi businessObjectDataApi() { + return new BusinessObjectDataApi( getDMApiClient() ); + } + + @Bean + public String homeDir(){ + return homeDir; + } + + @Bean + public boolean analyzeStats() { + String stats = environment.getProperty(ANALYZE_STATS); + log.info("Analyze Stats from CMD: {}", stats); + return "true".equalsIgnoreCase(stats); + } + + @Bean (name = "hiveJdbcTemplate") + public JdbcTemplate hiveJdbcTemplate() { + SimpleDriverDataSource dataSource = new SimpleDriverDataSource( + new HiveDriver() + , HIVE_URL + , HIVE_USER + , HIVE_PASSWORD + ); + return new JdbcTemplate( dataSource ); + } } diff --git a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/datamgmt/DataMgmtSvc.java b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/datamgmt/DataMgmtSvc.java index 8f49f60f..d4fb028f 100644 --- a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/datamgmt/DataMgmtSvc.java +++ b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/datamgmt/DataMgmtSvc.java @@ -12,17 +12,18 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
-**/ + **/ package org.finra.herd.metastore.managed.datamgmt; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.finra.herd.metastore.managed.JobDefinition; -import org.finra.herd.metastore.managed.ObjectProcessor; +import org.finra.herd.metastore.managed.util.MetastoreUtil; import org.finra.herd.sdk.api.BusinessObjectDataApi; import org.finra.herd.sdk.api.BusinessObjectDataNotificationRegistrationApi; import org.finra.herd.sdk.api.BusinessObjectFormatApi; +import org.finra.herd.sdk.api.EmrApi; import org.finra.herd.sdk.invoker.ApiClient; import org.finra.herd.sdk.invoker.ApiException; import org.finra.herd.sdk.model.*; @@ -46,177 +47,201 @@ public class DataMgmtSvc { @Value("${AGS}") private String ags; - @Autowired - ApiClient dmApiClient; - - @Autowired - BusinessObjectDataApi businessObjectDataApi; - - static { - javax.net.ssl.HttpsURLConnection.setDefaultHostnameVerifier( - ( hostname, sslSession ) -> true ); - } - - public String getTableSchema( org.finra.herd.metastore.managed.JobDefinition jd, boolean replaceColumn ) throws ApiException { - - BusinessObjectFormatApi businessObjectFormatApi = new BusinessObjectFormatApi( dmApiClient ); - - BusinessObjectFormatDdlRequest request = new BusinessObjectFormatDdlRequest(); - request.setBusinessObjectDefinitionName( jd.getActualObjectName() ); - request.setBusinessObjectFormatFileType( jd.getObjectDefinition().getFileType() ); - request.setBusinessObjectFormatUsage( jd.getObjectDefinition().getUsageCode() ); - - request.setNamespace( jd.getObjectDefinition().getNameSpace() ); - request.setIncludeDropTableStatement( false ); - request.setIncludeIfNotExistsOption( !replaceColumn ); - request.setOutputFormat( BusinessObjectFormatDdlRequest.OutputFormatEnum.HIVE_13_DDL ); - request.setReplaceColumns( replaceColumn ); - - request.setTableName( jd.getTableName() ); - - BusinessObjectFormatDdl ddl = businessObjectFormatApi.businessObjectFormatGenerateBusinessObjectFormatDdl( request ); - - return ddl.getDdl(); - } - - public BusinessObjectFormat getDMFormat( org.finra.herd.metastore.managed.JobDefinition jd ) throws ApiException { - - BusinessObjectFormatApi businessObjectFormatApi = new BusinessObjectFormatApi( dmApiClient ); - - BusinessObjectFormat format = businessObjectFormatApi.businessObjectFormatGetBusinessObjectFormat( jd.getObjectDefinition().getNameSpace(), - jd.getActualObjectName(), jd.getObjectDefinition().getUsageCode(), jd.getObjectDefinition().getFileType(), null ); - - return format; - } - - public BusinessObjectDataDdl getBusinessObjectDataDdl( org.finra.herd.metastore.managed.JobDefinition jd, List partitions ) throws ApiException { - BusinessObjectDataDdlRequest request = new BusinessObjectDataDdlRequest(); - - request.setIncludeDropTableStatement( false ); - request.setOutputFormat( BusinessObjectDataDdlRequest.OutputFormatEnum.HIVE_13_DDL ); - request.setBusinessObjectFormatUsage( jd.getObjectDefinition().getUsageCode() ); - request.setBusinessObjectFormatFileType( jd.getObjectDefinition().getFileType() ); - request.setBusinessObjectDefinitionName( jd.getObjectDefinition().getObjectName() ); - request.setAllowMissingData( true ); - request.setIncludeDropPartitions( true ); - request.setIncludeIfNotExistsOption( true ); - request.setTableName( jd.getTableName() ); - - List partitionValueFilters = Lists.newArrayList(); - - if ( jd.getWfType() == ObjectProcessor.WF_TYPE_SINGLETON && !jd.getPartitionKey().equalsIgnoreCase( "PARTITION" ) ) { - 
addPartitionedSingletonFilter( jd, partitionValueFilters ); - } else { - - log.info( "Partitions: {}", partitions ); - if ( jd.getWfType() == ObjectProcessor.WF_TYPE_SINGLETON && jd.getPartitionKey().equalsIgnoreCase( "PARTITION" ) ) { - addPartitionFilter( jd.getPartitionKey(), Lists.newArrayList( "none" ), partitionValueFilters ); - } else { - if ( jd.isSubPartitionLevelProcessing() ) { - addSubPartitionFilter( jd, partitionValueFilters); - } else { - addPartitionFilter( jd.getPartitionKey(), partitions, partitionValueFilters ); - } - } - } - - request.setPartitionValueFilter( null ); - request.setPartitionValueFilters( partitionValueFilters ); - request.setNamespace( jd.getObjectDefinition().getNameSpace() ); - - log.info( "Get BO DDL Request: \n{}", request.toString() ); - return businessObjectDataApi.businessObjectDataGenerateBusinessObjectDataDdl( request ); - } - - private void addPartitionFilter( String partitionKey, List partitions, List partitionValueFilters ) { - PartitionValueFilter filter = new PartitionValueFilter(); - filter.setPartitionKey( partitionKey ); - filter.setPartitionValues( partitions ); - partitionValueFilters.add( filter ); - } - - private void addPartitionedSingletonFilter( JobDefinition jd, List partitionValueFilters ) { - Calendar c = Calendar.getInstance(); - c.add( Calendar.DATE, 1 ); - String date = new SimpleDateFormat( "YYYY-MM-dd" ).format( c.getTime() ); - LatestBeforePartitionValue value = new LatestBeforePartitionValue(); - value.setPartitionValue( date ); - - PartitionValueFilter filter = new PartitionValueFilter(); - filter.setPartitionKey( jd.getPartitionKey() ); - filter.setLatestBeforePartitionValue( value ); - filter.setPartitionValues( null ); - filter.setLatestAfterPartitionValue( null ); - partitionValueFilters.add( filter ); - } - - /** - * To partition filter with sub partitions - * - * @param jd - * @param partitionValueFilters - * @throws ApiException - */ - private void addSubPartitionFilter( JobDefinition jd, List partitionValueFilters ) throws ApiException { - List partitionKeys = getDMFormat( jd ).getSchema().getPartitions(); - log.info( "Partition Keys {} for {}", partitionKeys, jd.getTableName() ); - Map partitionKeyValues = Maps.newLinkedHashMap(); - - IntStream.range( 0, jd.getPartitionValues().size() ) - .forEach( i -> { - String partitionKey = partitionKeys.get( i ).getName(); - String partitionValue = jd.getPartitionValues().get( i ); - log.info( "Partition Key: {}\t Value: {}", partitionKey, partitionValue ); - partitionKeyValues.put( partitionKey, partitionValue ); - - addPartitionFilter( partitionKey, Lists.newArrayList( partitionValue ), partitionValueFilters ); - } ); - - jd.setPartitionsKeyValue( partitionKeyValues ); - } - - public BusinessObjectFormatKeys getBOAllFormatVersions( org.finra.herd.metastore.managed.JobDefinition od, boolean latestBusinessObjectFormatVersion ) throws ApiException { - - return new BusinessObjectFormatApi( dmApiClient ) - .businessObjectFormatGetBusinessObjectFormats( - od.getObjectDefinition().getNameSpace() - , od.getObjectDefinition().getObjectName() - , latestBusinessObjectFormatVersion ); - } - - public BusinessObjectDataNotificationRegistrationKeys getBORegisteredNotification( org.finra.herd.metastore.managed.JobDefinition od ) throws ApiException { - return new BusinessObjectDataNotificationRegistrationApi( dmApiClient ) - .businessObjectDataNotificationRegistrationGetBusinessObjectDataNotificationRegistrationsByNotificationFilter( - od.getObjectDefinition().getNameSpace() - , 
od.getObjectDefinition().getObjectName() - , od.getObjectDefinition().getUsageCode() - , od.getObjectDefinition().getFileType() - - ); - } - - public BusinessObjectDataNotificationRegistration getBORegisteredNotificationDetails( String notificationName ) throws ApiException { - return new BusinessObjectDataNotificationRegistrationApi( dmApiClient ) - .businessObjectDataNotificationRegistrationGetBusinessObjectDataNotificationRegistration( ags, notificationName ); - } - - public BusinessObjectDataSearchResult searchBOData( JobDefinition jd, int pageNum, int pageSize, Boolean filterOnValidLatestVersions ) throws ApiException{ - // Create Search Key - BusinessObjectDataSearchKey boDataSearchKeyItem = new BusinessObjectDataSearchKey(); - boDataSearchKeyItem.setNamespace( jd.getObjectDefinition().getNameSpace() ); - boDataSearchKeyItem.setBusinessObjectDefinitionName( jd.getObjectDefinition().getObjectName()); - boDataSearchKeyItem.setBusinessObjectFormatUsage( jd.getObjectDefinition().getUsageCode() ); - boDataSearchKeyItem.setBusinessObjectFormatFileType( jd.getObjectDefinition().getFileType() ); - boDataSearchKeyItem.setFilterOnLatestValidVersion( filterOnValidLatestVersions ); - - - // Search BO Data - return businessObjectDataApi.businessObjectDataSearchBusinessObjectData( - new BusinessObjectDataSearchRequest() - .addBusinessObjectDataSearchFiltersItem( new BusinessObjectDataSearchFilter() - .addBusinessObjectDataSearchKeysItem( boDataSearchKeyItem ) - ) - , pageNum - , pageSize ); - } + @Value("${CLUSTER_DEF_NAME}") + String clusterDef; + + @Value("${CLUSTER_DEF_NAME_STATS}") + String clusterDefNameStats; + + @Autowired + ApiClient dmApiClient; + + + @Autowired + BusinessObjectDataApi businessObjectDataApi; + + static { + javax.net.ssl.HttpsURLConnection.setDefaultHostnameVerifier( + (hostname, sslSession) -> true); + } + + public String getTableSchema(org.finra.herd.metastore.managed.JobDefinition jd, boolean replaceColumn) throws ApiException { + + BusinessObjectFormatApi businessObjectFormatApi = new BusinessObjectFormatApi(dmApiClient); + + BusinessObjectFormatDdlRequest request = new BusinessObjectFormatDdlRequest(); + request.setBusinessObjectDefinitionName(jd.getActualObjectName()); + request.setBusinessObjectFormatFileType(jd.getObjectDefinition().getFileType()); + request.setBusinessObjectFormatUsage(jd.getObjectDefinition().getUsageCode()); + + request.setNamespace(jd.getObjectDefinition().getNameSpace()); + request.setIncludeDropTableStatement(false); + request.setIncludeIfNotExistsOption(!replaceColumn); + request.setOutputFormat(BusinessObjectFormatDdlRequest.OutputFormatEnum.HIVE_13_DDL); + request.setReplaceColumns(replaceColumn); + + request.setTableName(jd.getTableName()); + + BusinessObjectFormatDdl ddl = businessObjectFormatApi.businessObjectFormatGenerateBusinessObjectFormatDdl(request); + + return ddl.getDdl(); + } + + public BusinessObjectFormat getDMFormat(org.finra.herd.metastore.managed.JobDefinition jd) throws ApiException { + + BusinessObjectFormatApi businessObjectFormatApi = new BusinessObjectFormatApi(dmApiClient); + + BusinessObjectFormat format = businessObjectFormatApi.businessObjectFormatGetBusinessObjectFormat(jd.getObjectDefinition().getNameSpace(), + jd.getActualObjectName(), jd.getObjectDefinition().getUsageCode(), jd.getObjectDefinition().getFileType(), null); + + return format; + } + + public BusinessObjectDataDdl getBusinessObjectDataDdl(org.finra.herd.metastore.managed.JobDefinition jd, List partitions) throws ApiException { + 
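+        // Assembles the DDL-generation request for this job: no DROP TABLE statements,
+        // Hive 13 DDL output, missing data allowed, and partition filters picked below
+        // from the job's workflow type and partition key.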
BusinessObjectDataDdlRequest request = new BusinessObjectDataDdlRequest(); + + request.setIncludeDropTableStatement(false); + request.setOutputFormat(BusinessObjectDataDdlRequest.OutputFormatEnum.HIVE_13_DDL); + request.setBusinessObjectFormatUsage(jd.getObjectDefinition().getUsageCode()); + request.setBusinessObjectFormatFileType(jd.getObjectDefinition().getFileType()); + request.setBusinessObjectDefinitionName(jd.getObjectDefinition().getObjectName()); + request.setAllowMissingData(true); + request.setIncludeDropPartitions(true); + request.setIncludeIfNotExistsOption(true); + request.setTableName(jd.getTableName()); + + List partitionValueFilters = Lists.newArrayList(); + + log.info("PartitionKey: {} \t Partitions: {}", jd.getPartitionKey(), partitions); + if (MetastoreUtil.isPartitionedSingleton(jd.getWfType(), jd.getPartitionKey())) { + addPartitionedSingletonFilter(jd, partitionValueFilters); + } else { + + if (MetastoreUtil.isNonPartitionedSingleton(jd.getWfType(), jd.getPartitionKey())) { + addPartitionFilter(jd.getPartitionKey(), Lists.newArrayList("none"), partitionValueFilters); + } else { + if (jd.isSubPartitionLevelProcessing()) { + addSubPartitionFilter(jd, partitionValueFilters); + } else { + addPartitionFilter(jd.getPartitionKey(), partitions, partitionValueFilters); + } + } + } + + request.setPartitionValueFilter(null); + request.setPartitionValueFilters(partitionValueFilters); + request.setNamespace(jd.getObjectDefinition().getNameSpace()); + + log.info("Get BO DDL Request: \n{}", request.toString()); + return businessObjectDataApi.businessObjectDataGenerateBusinessObjectDataDdl(request); + } + + private void addPartitionFilter(String partitionKey, List partitions, List partitionValueFilters) { + PartitionValueFilter filter = new PartitionValueFilter(); + filter.setPartitionKey(partitionKey); + filter.setPartitionValues(partitions); + partitionValueFilters.add(filter); + } + + private void addPartitionedSingletonFilter(JobDefinition jd, List partitionValueFilters) { + Calendar c = Calendar.getInstance(); + c.add(Calendar.DATE, 1); + String date = new SimpleDateFormat("YYYY-MM-dd").format(c.getTime()); + LatestBeforePartitionValue value = new LatestBeforePartitionValue(); + value.setPartitionValue(date); + + PartitionValueFilter filter = new PartitionValueFilter(); + filter.setPartitionKey(jd.getPartitionKey()); + filter.setLatestBeforePartitionValue(value); + filter.setPartitionValues(null); + filter.setLatestAfterPartitionValue(null); + partitionValueFilters.add(filter); + } + + /** + * To partition filter with sub partitions + * + * @param jd + * @param partitionValueFilters + * @throws ApiException + */ + private void addSubPartitionFilter(JobDefinition jd, List partitionValueFilters) throws ApiException { + List partitionKeys = getDMFormat(jd).getSchema().getPartitions(); + log.info("Partition Keys {} for {}", partitionKeys, jd.getTableName()); + Map partitionKeyValues = Maps.newLinkedHashMap(); + + IntStream.range(0, jd.getPartitionValues().size()) + .forEach(i -> { + String partitionKey = partitionKeys.get(i).getName(); + String partitionValue = jd.getPartitionValues().get(i); + log.info("Partition Key: {}\t Value: {}", partitionKey, partitionValue); + partitionKeyValues.put(partitionKey, partitionValue); + + addPartitionFilter(partitionKey, Lists.newArrayList(partitionValue), partitionValueFilters); + }); + + jd.setPartitionsKeyValue(partitionKeyValues); + } + + public BusinessObjectFormatKeys getBOAllFormatVersions(org.finra.herd.metastore.managed.JobDefinition od, 
+                                                          boolean latestBusinessObjectFormatVersion) throws ApiException {
+
+        return new BusinessObjectFormatApi(dmApiClient)
+                .businessObjectFormatGetBusinessObjectFormats(
+                        od.getObjectDefinition().getNameSpace()
+                        , od.getObjectDefinition().getObjectName()
+                        , latestBusinessObjectFormatVersion);
+    }
+
+    public BusinessObjectDataNotificationRegistrationKeys getBORegisteredNotification(org.finra.herd.metastore.managed.JobDefinition od) throws ApiException {
+        return new BusinessObjectDataNotificationRegistrationApi(dmApiClient)
+                .businessObjectDataNotificationRegistrationGetBusinessObjectDataNotificationRegistrationsByNotificationFilter(
+                        od.getObjectDefinition().getNameSpace()
+                        , od.getObjectDefinition().getObjectName()
+                        , od.getObjectDefinition().getUsageCode()
+                        , od.getObjectDefinition().getFileType()
+
+                );
+    }
+
+    public BusinessObjectDataNotificationRegistration getBORegisteredNotificationDetails(String notificationName) throws ApiException {
+        return new BusinessObjectDataNotificationRegistrationApi(dmApiClient)
+                .businessObjectDataNotificationRegistrationGetBusinessObjectDataNotificationRegistration(ags, notificationName);
+    }
+
+
+    public void createCluster( boolean startStatsCluster, String proposedName ) throws ApiException {
+        EmrApi emrApi = new EmrApi(dmApiClient);
+        EmrClusterCreateRequest request = new EmrClusterCreateRequest();
+        request.setNamespace( ags );
+        request.setDryRun(false);
+
+        request.setEmrClusterDefinitionName(clusterDef);
+        if ( startStatsCluster ) {
+            request.setEmrClusterDefinitionName(clusterDefNameStats);
+        }
+
+        request.setEmrClusterName( proposedName);
+        EmrCluster cluster = emrApi.eMRCreateEmrCluster(request);
+        log.info(cluster.toString());
+    }
+
+    public BusinessObjectDataSearchResult searchBOData(JobDefinition jd, int pageNum, int pageSize, Boolean filterOnValidLatestVersions) throws ApiException {
+        // Create Search Key
+        BusinessObjectDataSearchKey boDataSearchKeyItem = new BusinessObjectDataSearchKey();
+        boDataSearchKeyItem.setNamespace(jd.getObjectDefinition().getNameSpace());
+        boDataSearchKeyItem.setBusinessObjectDefinitionName(jd.getObjectDefinition().getObjectName());
+        boDataSearchKeyItem.setBusinessObjectFormatUsage(jd.getObjectDefinition().getUsageCode());
+        boDataSearchKeyItem.setBusinessObjectFormatFileType(jd.getObjectDefinition().getFileType());
+        boDataSearchKeyItem.setFilterOnLatestValidVersion(filterOnValidLatestVersions);
+
+
+        // Search BO Data
+        return businessObjectDataApi.businessObjectDataSearchBusinessObjectData(
+                new BusinessObjectDataSearchRequest()
+                        .addBusinessObjectDataSearchFiltersItem(new BusinessObjectDataSearchFilter()
+                                .addBusinessObjectDataSearchKeysItem(boDataSearchKeyItem)
+                        )
+                , pageNum
+                , pageSize);
+    }
 }
diff --git a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/hive/HivePartition.java b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/hive/HivePartition.java
index f3d97f9e..6c6fc7c5 100644
--- a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/hive/HivePartition.java
+++ b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/hive/HivePartition.java
@@ -3,14 +3,19 @@
 import com.google.common.base.Strings;
 import lombok.*;
 
+import java.util.Arrays;
+import java.util.StringJoiner;
+
+import static org.finra.herd.metastore.managed.util.JobProcessorConstants.*;
+
 @Builder
 @Data
-@EqualsAndHashCode ( of = {"partName"} )
+@EqualsAndHashCode( of = {"partName"} )
 @ToString
 public class HivePartition {
     String location;
-    String partition;
-    String metastorePartName;
+    String partition; // Hive Partition information from DDL
+    String metastorePartName; // Hive Metastore PART_NAME column value from Metastore's PARTITIONS Table
     String partName;
     boolean exists;
@@ -42,9 +47,21 @@ public HivePartitionBuilder metastorePartName( String metastorePartName ) {
 
         public HivePartitionBuilder constructPartName() {
             if ( !Strings.isNullOrEmpty( partition ) ) {
-                this.partName = partition.replaceAll( "[^a-zA-Z0-9,-=_]", "" ).toLowerCase().replace( ",","/" ) ;
-            }else if( !Strings.isNullOrEmpty( metastorePartName ) ) {
-                this.partName = metastorePartName.toLowerCase();
+                StringJoiner partNameJoiner = new StringJoiner( FORWARD_SLASH );
+
+                Arrays.stream( partition.replaceAll( "[^a-zA-Z0-9,-=_]", "" ).split( COMMA ) ).forEach( s -> {
+                    String[] partitionKeyValue = s.split( EQUALS );
+                    if(partitionKeyValue.length == 2) {
+                        partNameJoiner.add( String.format( "%s=%s", partitionKeyValue[0].toLowerCase(), partitionKeyValue[1] ) );
+                    }else{
+                        partNameJoiner.add( s );
+                    }
+                } );
+
+                this.partName = partNameJoiner.toString();
+
+            } else if ( !Strings.isNullOrEmpty( metastorePartName ) ) {
+                this.partName = metastorePartName;
             }
             return this;
         }
diff --git a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/jobProcessor/BackLoadObjectProcessor.java b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/jobProcessor/BackLoadObjectProcessor.java
index fc62adc1..352a87d3 100644
--- a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/jobProcessor/BackLoadObjectProcessor.java
+++ b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/jobProcessor/BackLoadObjectProcessor.java
@@ -82,7 +82,7 @@ public boolean process( JobDefinition od, String clusterID, String workerID ) {
 
         identifyPartitionsAndBackLoad( od, jsi );
-        addGatherStatsJob( jsi );
+//        addGatherStatsJob( jsi );
 
     } catch ( Exception e ) {
         log.error( "Problem encountered in Back loading processor: {}", e.getMessage(), e );
@@ -98,6 +98,12 @@ private void identifyPartitionsAndBackLoad( JobDefinition od, JobSubmitterInfo j
     Map<String, List<String>> partitions = partitionsAsMap( od, jsi );
     log.info( "Total Available Partitions to load: {}", partitions.size() );
 
+    if(partitions.isEmpty()){
+        String messageBody = String.format("NO AVAILABLE PARTITIONS TO BACK-LOAD FOR: %s", od.toString());
+        String messageSubject = "No Partitions available to back-load";
+        notificationSender.sendNotificationEmail(messageBody, messageSubject, od );
+    }
+
     TreeSet<String> orderedPartitions = Sets.newTreeSet();
     List<List<String>> chunkedPartitions = Lists.newArrayList();
     AtomicInteger partitionCounter = new AtomicInteger( 0 );
@@ -156,7 +162,7 @@ private void addGatherStatsJob( JobSubmitterInfo jsi ) {
     log.info( "Adding gather Stats job" );
     DMNotification dmNotification = buildDMNotification( jsi );
     dmNotification.setWorkflowType( ObjectProcessor.WF_TYPE_MANAGED_STATS );
-    dmNotification.setExecutionId( "stats" );
+    dmNotification.setExecutionId( "SUBMITTED_BY_BACKLOADING" );
     dmNotification.setPartitionKey( quotedPartitionKeys( jsi.getTableSchema() ) );
     dmNotification.setPartitionValue( "" ); // partition values not required for gather stats job as it runs for all partitions
     log.info( "Herd Notification DB request: \n{}", dmNotification );
diff --git a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/jobProcessor/HiveHqlGenerator.java b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/jobProcessor/HiveHqlGenerator.java
index 21de5eb7..27868b8c 100644
--- a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/jobProcessor/HiveHqlGenerator.java
+++ b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/jobProcessor/HiveHqlGenerator.java
@@ -24,12 +24,17 @@
 import org.finra.herd.metastore.managed.ObjectProcessor;
 import org.finra.herd.metastore.managed.datamgmt.DataMgmtSvc;
 import org.finra.herd.metastore.managed.hive.*;
+import org.finra.herd.metastore.managed.jobProcessor.dao.JobProcessorDAO;
+import org.finra.herd.metastore.managed.util.JobProcessorConstants;
 import org.finra.herd.metastore.managed.util.MetastoreUtil;
 import org.finra.herd.sdk.invoker.ApiException;
 import org.finra.herd.sdk.model.BusinessObjectDataDdl;
 import org.finra.herd.sdk.model.BusinessObjectFormat;
+import org.finra.herd.sdk.model.Schema;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
+import org.finra.herd.metastore.managed.jobProcessor.dao.DMNotification;
+
 
 import java.io.IOException;
 import java.nio.file.Files;
@@ -37,7 +42,9 @@
 import java.nio.file.Paths;
 import java.sql.SQLException;
 import java.util.List;
+import java.util.Set;
 import java.util.StringJoiner;
+import java.util.stream.Collectors;
 
 import static java.nio.file.StandardOpenOption.APPEND;
 import static java.nio.file.StandardOpenOption.CREATE;
@@ -46,6 +53,8 @@
 @Slf4j
 public class HiveHqlGenerator {
 
+    public static final String SUBMITTED_BY_JOB_PROCESSOR = "SUBMITTED_BY_JOB_PROCESSOR";
+
     @Autowired
     protected DataMgmtSvc dataMgmtSvc;
 
@@ -55,6 +64,10 @@ public class HiveHqlGenerator {
     @Autowired
     protected NotificationSender notificationSender;
 
+    @Autowired
+    JobProcessorDAO jobProcessorDAO;
+
+
     public List<String> schemaSql(boolean schemaExists, JobDefinition jd) throws ApiException, SQLException {
         String tableName = jd.getTableName();
 
@@ -78,7 +91,7 @@ public List<String> schemaSql(boolean schemaExists, JobDefinition jd) throws Api
         FormatChange change = detectSchemaChange(jd, hiveTableSchema, format, ddl);
 
         if (change.hasColumnChanges()) {
-            boolean cascade = cascade(jd.getWfType());
+            boolean cascade = cascade(jd);
             String cascadeStr = "";
 
             if(cascade) {
@@ -228,7 +241,7 @@ public String buildHql(JobDefinition jd, List<String> partitions) throws IOExcep
         addPartitionChanges(tableExists, jd, dataDdl, schemaHql);
 
         //Stats
-        addAnalyzeStats( jd, partitions, schemaHql );
+        addAnalyzeStats( jd, partitions);
 
         // Create file
         Path hqlFilePath = createHqlFile( jd );
@@ -260,25 +273,60 @@ protected void addPartitionChanges( boolean tableExists, JobDefinition jd, Busin
         }
     }
 
-    protected void addAnalyzeStats( JobDefinition jd, List<String> partitions, List<String> schemaHql ) {
-        if (MetastoreUtil.isSingletonWF( jd.getWfType() )) {
-            if (jd.getPartitionKey().equalsIgnoreCase("partition")) {
-                schemaHql.add(String.format("analyze table %s compute statistics noscan;", jd.getTableName()));
-            } else {
-                schemaHql.add(String.format("analyze table %s partition(`%s`) compute statistics noscan;", jd.getTableName(), jd.getPartitionKey()));
-            }
-        } else if (partitions.size() == 1) {
-            if ( jd.isSubPartitionLevelProcessing() ) {
-                schemaHql.add( String.format( "analyze table %s partition %s compute statistics noscan;", jd.getTableName(), jd.getPartitionsSpecForStats() ) );
+    protected void addAnalyzeStats( JobDefinition jd, List<String> partitions ) {
+
+        log.info( "Adding gather Stats job" );
+        try {
+
+            if ( partitions.size() == 1 ) {
+                submitStatsJob( jd, jd.partitionValuesForStats(partitions.get( 0 )) );
             } else {
-                schemaHql.add( String.format( "analyze table %s partition(`%s`='%s') compute statistics noscan;"
-                        , jd.getTableName(), jd.getPartitionKey(), partitions.get( 0 ) )
-                );
+                partitions.stream()
+                        .forEach( s -> submitStatsJob( jd, s ) );
             }
+
+            // Start the Stats cluster if it is not already running
+            dataMgmtSvc.createCluster( true , JobProcessorConstants.METASTOR_STATS_CLUSTER_NAME );
+        } catch ( Exception e ) {
+            log.error( "Problem encountered in addAnalyzeStats: {}", e.getMessage(), e );
         }
     }
 
-    protected boolean cascade( int wfType ){
+    private void submitStatsJob(JobDefinition jd, String partitionValue) {
+        DMNotification dmNotification = buildDMNotification(jd);
+
+        dmNotification.setWorkflowType(ObjectProcessor.WF_TYPE_MANAGED_STATS);
+        dmNotification.setExecutionId(SUBMITTED_BY_JOB_PROCESSOR);
+
+        dmNotification.setPartitionKey(jd.partitionKeysForStats());
+        dmNotification.setPartitionValue(partitionValue);
+
+        log.info("Herd Notification DB request: \n{}", dmNotification);
+        jobProcessorDAO.addDMNotification(dmNotification);
+    }
+
+
+    protected String partition(Set<String> partitionKeys ) {
+        return partitionKeys.stream().collect( Collectors.joining( "`,`", "`", "`" ) );
+    }
+
+    protected String quotedPartitionKeys( Schema schema ) {
+        return schema.getPartitions().stream().map( p -> p.getName() ).collect( Collectors.joining( "`,`", "`", "`" ) );
+    }
+
+
+    protected DMNotification buildDMNotification( JobDefinition jd ) {
+        return DMNotification.builder()
+                .namespace( jd.getObjectDefinition().getNameSpace() )
+                .objDefName( jd.getObjectDefinition().getObjectName() )
+                .formatUsage( jd.getObjectDefinition().getUsageCode() )
+                .fileType( jd.getObjectDefinition().getFileType() )
+                .clusterName( jd.getClusterName() )
+                .correlationData( jd.getCorrelation() )
+                .build();
+    }
+
+    protected boolean cascade( JobDefinition jd ){
         return true;
     }
diff --git a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/jobProcessor/ManagedObjStatsProcessor.java b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/jobProcessor/ManagedObjStatsProcessor.java
index 3e1a8da1..d6c16a61 100644
--- a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/jobProcessor/ManagedObjStatsProcessor.java
+++ b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/jobProcessor/ManagedObjStatsProcessor.java
@@ -16,45 +16,36 @@
 package org.finra.herd.metastore.managed.jobProcessor;
 
 import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 import org.finra.herd.metastore.managed.JobDefinition;
-import org.finra.herd.metastore.managed.conf.HerdMetastoreConfig;
 import org.finra.herd.metastore.managed.datamgmt.DataMgmtSvc;
 import org.finra.herd.metastore.managed.util.JobProcessorConstants;
-import org.finra.herd.sdk.invoker.ApiException;
-import org.finra.herd.sdk.model.BusinessObjectFormat;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.List;
+
 @Component
 @Slf4j
 public class ManagedObjStatsProcessor extends JobProcessor {
 
-    @Autowired
-    DataMgmtSvc dataMgmtSvc;
 
     @Override
     protected ProcessBuilder createProcessBuilder( JobDefinition od ) {
-        String tblName = od.getTableName();
-
-        ProcessBuilder pb = null;
-        try {
-            BusinessObjectFormat dmFormat = dataMgmtSvc.getDMFormat( od );
-            String quotedPartitionKeys = quotedPartitionKeys( dmFormat.getSchema() );
-            if ( Strings.isNullOrEmpty( quotedPartitionKeys ) ) {
-                log.error( "ERROR: PARTITION_COLUMNS is empty for {}", tblName );
-                return pb;
-            }
-            pb = new ProcessBuilder( "sh"
-                    , JobProcessorConstants.GATHER_STATS_SCRIPT_PATH
-                    , od.getObjectDefinition().getDbName()
-                    , tblName
-                    , quotedPartitionKeys
-            );
-        } catch ( ApiException e ) {
-            log.error( "Could not get BO format due to: {}", e.getMessage(), e );
+        String partitionSpecsForStats = od.partitionSpecForStats();
+
+        if ( Strings.isNullOrEmpty( partitionSpecsForStats ) ) {
+            log.error( "ERROR: STATS PARTITION Spec is empty: {}", od );
+            return null;
         }
-        return pb;
+        List<String> command = Lists.newArrayList( "sh", JobProcessorConstants.GATHER_STATS_SCRIPT_PATH );
+        command.add( od.getObjectDefinition().getDbName() );
+        command.add( od.getTableName() );
+        command.add( partitionSpecsForStats );
+
+        log.info( "Calling analyze stats with: {}", command );
+        return new ProcessBuilder( command );
     }
 }
diff --git a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/util/JobProcessorConstants.java b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/util/JobProcessorConstants.java
index 2d9a44b5..646ac15e 100644
--- a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/util/JobProcessorConstants.java
+++ b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/util/JobProcessorConstants.java
@@ -18,10 +18,13 @@
 import org.finra.herd.metastore.managed.conf.HerdMetastoreConfig;
 
 public interface JobProcessorConstants {
-    String METASTOR_CLUSTER_NAME = "metastor";
+    String METASTOR_CLUSTER_NAME = "metastore";
+    String METASTOR_STATS_CLUSTER_NAME = "metastore_stats";
 
     String SVC_ACC_PREFIX = "svc";
     String UNDERSCORE = "_";
+    String EQUALS = "=";
+    String FORWARD_SLASH = "/";
     String COLON = ":";
     String NEW_LINE = "\n";
@@ -29,6 +32,7 @@ public interface JobProcessorConstants {
     String DOUBLE_UNDERSCORE = "__";
     String COMMA = ",";
     String SUB_PARTITION_VAL_SEPARATOR = ":";
+    String NON_PARTITIONED_SINGLETON_VALUE = "none";
 
     int DM_RECORD_RETURN_MAX_LIMIT = 1000;
diff --git a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/util/MetastoreUtil.java b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/util/MetastoreUtil.java
index 69310efd..14d96c1f 100644
--- a/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/util/MetastoreUtil.java
+++ b/metastor/managedObjectLoader/src/main/java/org/finra/herd/metastore/managed/util/MetastoreUtil.java
@@ -1,8 +1,12 @@
 package org.finra.herd.metastore.managed.util;
 
+import org.finra.herd.metastore.managed.JobDefinition;
 import org.finra.herd.metastore.managed.ObjectProcessor;
 
 public class MetastoreUtil {
+
+    public static final String NON_SINGLETON_PARTITION_KEY = "partition";
+
     public static boolean isSingletonWF( int wfType ) {
         return (ObjectProcessor.WF_TYPE_SINGLETON == wfType);
     }
@@ -14,4 +18,18 @@ public static boolean isManagedWF( int wfType ) {
     public static boolean isManagedStatsWF( int wfType ) {
         return (ObjectProcessor.WF_TYPE_MANAGED_STATS == wfType);
     }
+
+    public static boolean isNonPartitionedSingleton( int wfType, String partitionKey ) {
+        return isSingletonWF( wfType ) && NON_SINGLETON_PARTITION_KEY.equalsIgnoreCase( partitionKey );
+    }
+
+
+    public static boolean isNonPartitionedSingleton( String partitionKey ) {
+        return NON_SINGLETON_PARTITION_KEY.equalsIgnoreCase( partitionKey );
+    }
+
+    public static boolean isPartitionedSingleton( int wfType, String partitionKey ) {
+        return isSingletonWF( wfType ) && !(NON_SINGLETON_PARTITION_KEY.equalsIgnoreCase( partitionKey ));
+    }
+
 }
diff --git a/metastor/metastoreOperations/pom.xml b/metastor/metastoreOperations/pom.xml
index 270d9109..1977568d 100644
--- a/metastor/metastoreOperations/pom.xml
+++ b/metastor/metastoreOperations/pom.xml
@@ -20,7 +20,7 @@
     <parent>
        <artifactId>metastore</artifactId>
        <groupId>org.finra.herd-mdl.metastore</groupId>
-       <version>1.2.25</version>
+       <version>1.2.38</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
diff --git a/metastor/pom.xml b/metastor/pom.xml
index 75927c7f..fe968111 100644
--- a/metastor/pom.xml
+++ b/metastor/pom.xml
@@ -29,7 +29,7 @@
    <artifactId>metastore</artifactId>
    <groupId>org.finra.herd-mdl.metastore</groupId>
-   <version>1.2.25</version>
+   <version>1.2.38</version>
    <packaging>pom</packaging>
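
A note on the date pattern in addPartitionedSingletonFilter above: in SimpleDateFormat, lowercase "yyyy" is the calendar year, while uppercase "YYYY" is the week-based year, which silently drifts by one around New Year. Since the filter date here feeds latestBeforePartitionValue, the calendar-year pattern is the correct one. A minimal, self-contained check (class name and sample date are illustrative):

import java.text.SimpleDateFormat;
import java.util.Calendar;

public class WeekYearCheck {
    public static void main(String[] args) {
        Calendar c = Calendar.getInstance();
        c.set(2019, Calendar.DECEMBER, 30); // falls in week 1 of 2020
        // Week-based year "YYYY" reports 2020 here under typical week rules
        System.out.println(new SimpleDateFormat("YYYY-MM-dd").format(c.getTime()));
        // Calendar year "yyyy" reports the expected 2019
        System.out.println(new SimpleDateFormat("yyyy-MM-dd").format(c.getTime()));
    }
}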
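
For the HivePartition.constructPartName() change above: the builder now lower-cases only the partition keys and preserves the case of partition values, joining key=value pairs with "/" so the result lines up with the PART_NAME column of the metastore's PARTITIONS table (the old code lower-cased the whole string). A standalone sketch of the same normalization, with the JobProcessorConstants separators inlined and a hypothetical partition spec as input:

import java.util.Arrays;
import java.util.StringJoiner;

public class PartNameSketch {
    // Mirrors HivePartition.constructPartName(): strip unexpected characters,
    // split the DDL partition spec on commas, lower-case only the partition
    // keys (values keep their case), and join with "/".
    static String constructPartName(String partition) {
        StringJoiner partNameJoiner = new StringJoiner("/");
        Arrays.stream(partition.replaceAll("[^a-zA-Z0-9,-=_]", "").split(","))
              .forEach(s -> {
                  String[] kv = s.split("=");
                  if (kv.length == 2) {
                      partNameJoiner.add(String.format("%s=%s", kv[0].toLowerCase(), kv[1]));
                  } else {
                      partNameJoiner.add(s);
                  }
              });
        return partNameJoiner.toString();
    }

    public static void main(String[] args) {
        // Hypothetical DDL fragment; prints "trade_dt=2019-11-08/firm=ABC"
        System.out.println(constructPartName("TRADE_DT='2019-11-08', FIRM=ABC"));
    }
}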
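
On the new MetastoreUtil checks and the filter selection in getBusinessObjectDataDdl: a singleton workflow whose partition key is the literal "partition" is treated as non-partitioned and filtered on the value "none", while any other key marks a partitioned singleton, which is filtered with latestBeforePartitionValue instead of explicit partition values. A minimal sketch of that classification (class name and the TRADE_DT key are hypothetical):

public class SingletonFilterCheck {
    // Mirrors MetastoreUtil.NON_SINGLETON_PARTITION_KEY from the patch
    static final String NON_SINGLETON_PARTITION_KEY = "partition";

    static boolean isNonPartitionedSingleton(String partitionKey) {
        return NON_SINGLETON_PARTITION_KEY.equalsIgnoreCase(partitionKey);
    }

    public static void main(String[] args) {
        // "partition" -> non-partitioned singleton, filtered on the value "none"
        System.out.println(isNonPartitionedSingleton("partition")); // true
        // a real key -> partitioned singleton, handled via latestBeforePartitionValue
        System.out.println(isNonPartitionedSingleton("TRADE_DT"));  // false
    }
}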