Skip to content

Commit

Permalink
Merge pull request #92 from FINRAOS/subPartitionProcessing
Browse files Browse the repository at this point in the history
Processing at subpartition level
  • Loading branch information
VekasS committed Oct 22, 2019
2 parents 6b545e7 + e9c59f3 commit 0d3006f
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 45 deletions.
2 changes: 1 addition & 1 deletion metastor/managedObjectLoader/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<parent>
<artifactId>metastore</artifactId>
<groupId>org.finra.herd-mdl.metastore</groupId>
<version>1.2.19</version>
<version>1.2.25</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
**/
package org.finra.herd.metastore.managed;

import com.google.common.collect.Lists;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
Expand All @@ -28,6 +29,8 @@
import java.io.StringReader;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.StringJoiner;

Expand All @@ -44,16 +47,15 @@ public class JobDefinition {

boolean subPartitionLevelProcessing = false;

String partitionValue;
String correlation;
String executionID;
String clusterName;
String actualObjectName;
String tableName;
String partitionKey;
String topLevelPartitionValue;
String subPartitionKey;
String subPartitionValue;
String partitionValue;
List<String> partitionValues;
Map<String, String> partitionsKeyValue;

ObjectDefinition objectDefinition;

Expand Down Expand Up @@ -97,6 +99,15 @@ public JobDefinition(long id, String nameSpace, String objectName, String usageC
public JobDefinition() {
}

/**
 * Renders the partition key/value pairs held in {@code partitionsKeyValue} as a
 * parenthesised, comma-separated partition spec, e.g. {@code (`k1`='v1',`k2`='v2')}.
 * Pairs appear in the iteration order of the map.
 *
 * @return the partition spec text; {@code ()} when the map is empty
 */
public String getPartitionsSpecForStats() {
    final StringJoiner spec = new StringJoiner( ",", "(", ")" );

    for ( Map.Entry<String, String> partition : partitionsKeyValue.entrySet() ) {
        // Key is back-tick quoted, value is single-quoted.
        spec.add( String.format( "`%s`='%s'", partition.getKey(), partition.getValue() ) );
    }

    return spec.toString();
}

@Slf4j
public static class ObjectDefinitionMapper implements RowMapper<JobDefinition> {

Expand All @@ -111,12 +122,12 @@ public JobDefinition mapRow(ResultSet resultSet, int i) throws SQLException {
jobDefinition.objectDefinition.usageCode=resultSet.getString("USAGE_CODE");
jobDefinition.objectDefinition.fileType=resultSet.getString("FILE_TYPE");

jobDefinition.partitionValue=resultSet.getString("PARTITION_VALUES");
jobDefinition.partitionValue=resultSet.getString("PARTITION_VALUES");
if ( jobDefinition.getPartitionValue().contains( SUB_PARTITION_VAL_SEPARATOR ) ) {
jobDefinition.topLevelPartitionValue = jobDefinition.getPartitionValue().split( SUB_PARTITION_VAL_SEPARATOR )[0];
jobDefinition.subPartitionValue = jobDefinition.getPartitionValue().split( SUB_PARTITION_VAL_SEPARATOR )[1];
jobDefinition.subPartitionLevelProcessing = true;
jobDefinition.partitionValues = Lists.newArrayList( jobDefinition.getPartitionValue().split( SUB_PARTITION_VAL_SEPARATOR ));
}

// Execution ID is not used for anything other than naming the .hql files; it just has to be unique
jobDefinition.executionID=resultSet.getString("ID");
jobDefinition.partitionKey=resultSet.getString("PARTITION_KEY");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.finra.herd.metastore.managed.datamgmt;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.finra.herd.metastore.managed.JobDefinition;
import org.finra.herd.metastore.managed.ObjectProcessor;
Expand All @@ -32,6 +33,8 @@
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;

/**
* Herd Client
Expand Down Expand Up @@ -100,38 +103,22 @@ public BusinessObjectDataDdl getBusinessObjectDataDdl( org.finra.herd.metastore.
request.setTableName( jd.getTableName() );

List<PartitionValueFilter> partitionValueFilters = Lists.newArrayList();
PartitionValueFilter filter = new PartitionValueFilter();

filter.setPartitionKey( jd.getPartitionKey() );

if ( jd.getWfType() == ObjectProcessor.WF_TYPE_SINGLETON && !jd.getPartitionKey().equalsIgnoreCase( "PARTITION" ) ) {
Calendar c = Calendar.getInstance();
c.add( Calendar.DATE, 1 );
String date = new SimpleDateFormat( "YYYY-MM-dd" ).format( c.getTime() );
LatestBeforePartitionValue value = new LatestBeforePartitionValue();
value.setPartitionValue( date );
filter.setLatestBeforePartitionValue( value );

filter.setPartitionValues( null );
filter.setLatestAfterPartitionValue( null );


addPartitionedSingletonFilter( jd, partitionValueFilters );
} else {

log.info( "Partitions: {}", partitions );
if ( jd.getWfType() == ObjectProcessor.WF_TYPE_SINGLETON && jd.getPartitionKey().equalsIgnoreCase( "PARTITION" ) ) {
filter.setPartitionValues( Lists.newArrayList( "none" ) );
addPartitionFilter( jd.getPartitionKey(), Lists.newArrayList( "none" ), partitionValueFilters );
} else {
if ( jd.isSubPartitionLevelProcessing() ) {
log.info( "Top Level Partition: {}, SubPartition: {}", jd.getTopLevelPartitionValue(), jd.getSubPartitionValue() );
filter.setPartitionValues( Lists.newArrayList( jd.getTopLevelPartitionValue() ) );
addSubPartitionFilter( jd, partitionValueFilters);
} else {
filter.setPartitionValues( partitions );
addPartitionFilter( jd.getPartitionKey(), partitions, partitionValueFilters );
}
}
}
partitionValueFilters.add( filter );

request.setPartitionValueFilter( null );
request.setPartitionValueFilters( partitionValueFilters );
Expand All @@ -141,28 +128,51 @@ public BusinessObjectDataDdl getBusinessObjectDataDdl( org.finra.herd.metastore.
return businessObjectDataApi.businessObjectDataGenerateBusinessObjectDataDdl( request );
}

/**
 * Creates a {@link PartitionValueFilter} for one partition key and its values and
 * appends it to the given filter list.
 *
 * @param partitionKey          partition key the filter applies to
 * @param partitions            partition values set on the filter
 * @param partitionValueFilters list that receives the new filter
 */
private void addPartitionFilter( String partitionKey, List<String> partitions, List<PartitionValueFilter> partitionValueFilters ) {
    final PartitionValueFilter keyValuesFilter = new PartitionValueFilter();

    keyValuesFilter.setPartitionKey( partitionKey );
    keyValuesFilter.setPartitionValues( partitions );

    partitionValueFilters.add( keyValuesFilter );
}

/**
 * Appends a filter on the job's partition key whose "latest before" partition value
 * is tomorrow's date, formatted as {@code yyyy-MM-dd}.
 *
 * <p>The pattern uses lower-case {@code yyyy} (calendar year). Upper-case {@code YYYY}
 * is the week year in {@link SimpleDateFormat} and yields the wrong year for dates in
 * the last or first days of a calendar year.
 *
 * @param jd                    job definition supplying the partition key
 * @param partitionValueFilters list that receives the new filter
 */
private void addPartitionedSingletonFilter( JobDefinition jd, List<PartitionValueFilter> partitionValueFilters ) {
    // Tomorrow, in the JVM default time zone.
    Calendar c = Calendar.getInstance();
    c.add( Calendar.DATE, 1 );
    String date = new SimpleDateFormat( "yyyy-MM-dd" ).format( c.getTime() );

    LatestBeforePartitionValue value = new LatestBeforePartitionValue();
    value.setPartitionValue( date );

    PartitionValueFilter filter = new PartitionValueFilter();
    filter.setPartitionKey( jd.getPartitionKey() );
    filter.setLatestBeforePartitionValue( value );
    // Explicitly clear the other selectors so only "latest before" is set on this filter.
    filter.setPartitionValues( null );
    filter.setLatestAfterPartitionValue( null );
    partitionValueFilters.add( filter );
}

/**
* Adds partition value filters for a job processed at sub-partition level.
*
* <p>Reads the partition columns from {@code getDMFormat( jd ).getSchema().getPartitions()}
* and then, as visible here, does two things:
* <ol>
* <li>When at least two partition keys exist, records the second key on {@code jd} as the
* sub-partition key and adds a filter for it using {@code jd.getSubPartitionValue()};
* otherwise logs a warning.</li>
* <li>Pairs every entry of {@code jd.getPartitionValues()} with the partition key at the same
* index, adds one single-value filter per pair, and stores the key/value pairs on
* {@code jd} (insertion-ordered) via {@code setPartitionsKeyValue}.</li>
* </ol>
*
* <p>NOTE(review): steps 1 and 2 both add a filter for the second partition key, so it can be
* added twice. This looks like an older and a newer implementation sitting side by side;
* confirm which one is intended.
*
* @param jd job definition; its partition values are read and its sub-partition key and
* partition key/value map are written
* @param partitionValueFilters list that receives the new filters
* @throws ApiException declared for the {@code getDMFormat} lookup (presumably a failed Herd call; confirm)
*/
private void addSubPartitionFilter( JobDefinition jd, List<PartitionValueFilter> partitionValueFilters ) throws ApiException {
List<SchemaColumn> partitionKeys = getDMFormat( jd ).getSchema().getPartitions();
log.debug( "Partition Keys {} for {}", partitionKeys, jd.getTableName() );

// The second partition column (index 1) is treated as the sub-partition key.
if ( partitionKeys.size() >= 2 ) {
PartitionValueFilter subPartitionFilter = new PartitionValueFilter();
String subPartitionKey = partitionKeys.get( 1 ).getName();
log.debug( "SubPartition Partition Key: {}", subPartitionKey );
jd.setSubPartitionKey( subPartitionKey );
subPartitionFilter.setPartitionKey( subPartitionKey );
subPartitionFilter.setPartitionValues( Lists.newArrayList( jd.getSubPartitionValue() ) );
partitionValueFilters.add( subPartitionFilter );
} else {
log.warn( "Object not partitioned correctly, not enough partition keys to find sub partition" );
}
log.info( "Partition Keys {} for {}", partitionKeys, jd.getTableName() );
// Linked map keeps the key/value pairs in partition order.
Map<String, String> partitionKeyValues = Maps.newLinkedHashMap();

// Pair the i-th partition value with the i-th partition key.
// partitionKeys.get( i ) throws IndexOutOfBoundsException when there are more values than keys.
IntStream.range( 0, jd.getPartitionValues().size() )
.forEach( i -> {
String partitionKey = partitionKeys.get( i ).getName();
String partitionValue = jd.getPartitionValues().get( i );
log.info( "Partition Key: {}\t Value: {}", partitionKey, partitionValue );
partitionKeyValues.put( partitionKey, partitionValue );

addPartitionFilter( partitionKey, Lists.newArrayList( partitionValue ), partitionValueFilters );
} );

// Stored on the job definition for later use by its getters (e.g. getPartitionsKeyValue).
jd.setPartitionsKeyValue( partitionKeyValues );
}

public BusinessObjectFormatKeys getBOAllFormatVersions( org.finra.herd.metastore.managed.JobDefinition od, boolean latestBusinessObjectFormatVersion ) throws ApiException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,7 @@ protected void addAnalyzeStats( JobDefinition jd, List<String> partitions, List<
}
} else if (partitions.size() == 1) {
if ( jd.isSubPartitionLevelProcessing() ) {
schemaHql.add( String.format( "analyze table %s partition(`%s`='%s', `%s`='%s') compute statistics noscan;"
, jd.getTableName(), jd.getPartitionKey(), jd.getTopLevelPartitionValue(), jd.getSubPartitionKey(), jd.getSubPartitionValue() ) );
schemaHql.add( String.format( "analyze table %s partition %s compute statistics noscan;", jd.getTableName(), jd.getPartitionsSpecForStats() ) );
} else {
schemaHql.add( String.format( "analyze table %s partition(`%s`='%s') compute statistics noscan;"
, jd.getTableName(), jd.getPartitionKey(), partitions.get( 0 ) )
Expand Down
2 changes: 1 addition & 1 deletion metastor/metastoreOperations/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>metastore</artifactId>
<groupId>org.finra.herd-mdl.metastore</groupId>
<version>1.2.19</version>
<version>1.2.25</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion metastor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

<artifactId>metastore</artifactId>
<groupId>org.finra.herd-mdl.metastore</groupId>
<version>1.2.19</version>
<version>1.2.25</version>
<packaging>pom</packaging>

<properties>
Expand Down

0 comments on commit 0d3006f

Please sign in to comment.