NIFI-3335: Add initial.maxvalue support to GenerateTableFetch
mattyb149 committed Jul 25, 2017
1 parent 9a5d4ff commit 8eea477
Showing 4 changed files with 122 additions and 32 deletions.
AbstractDatabaseFetchProcessor.java
@@ -21,6 +21,7 @@
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
@@ -82,6 +83,8 @@
*/
public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFactoryProcessor {

public static final String INITIAL_MAX_VALUE_PROP_START = "initial.maxvalue.";

// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -185,6 +188,18 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
.build();
}

@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
.expressionLanguageSupported(true)
.dynamic(true)
.build();
}

// A common validation procedure for DB fetch processors; it stores whether the Table Name and/or Max Value Column properties contain expression language
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
// For backwards-compatibility, keep track of whether the table name and max-value column properties are dynamic (i.e. has expression language)
@@ -424,4 +439,20 @@ protected static String getStateKey(String prefix, String columnName) {
}
return sb.toString();
}

protected Map<String, String> getDefaultMaxValueProperties(final Map<PropertyDescriptor, String> properties) {
final Map<String, String> defaultMaxValues = new HashMap<>();

for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
final String key = entry.getKey().getName();

if (!key.startsWith(INITIAL_MAX_VALUE_PROP_START)) {
continue;
}

defaultMaxValues.put(key.substring(INITIAL_MAX_VALUE_PROP_START.length()), entry.getValue());
}

return defaultMaxValues;
}
}
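
The new helper is a plain prefix scan over the processor's property map. A minimal standalone sketch of that logic, substituting plain string keys for NiFi's PropertyDescriptor (the class and method names here are hypothetical):

import java.util.HashMap;
import java.util.Map;

public class InitialMaxValueDemo {
    static final String PREFIX = "initial.maxvalue.";

    // Mirrors getDefaultMaxValueProperties above: keep only prefixed entries,
    // stripping the prefix so each key becomes the bare column name.
    static Map<String, String> defaultMaxValues(final Map<String, String> properties) {
        final Map<String, String> result = new HashMap<>();
        for (final Map.Entry<String, String> e : properties.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                result.put(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        final Map<String, String> props = new HashMap<>();
        props.put("Table Name", "TEST_QUERY_DB_TABLE");
        props.put("initial.maxvalue.ID", "1");
        // Prints {ID=1}: only the prefixed property survives, prefix stripped
        System.out.println(defaultMaxValues(props));
    }
}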
GenerateTableFetch.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.standard;

import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful;
@@ -94,6 +95,8 @@
@WritesAttribute(attribute = "generatetablefetch.limit", description = "The number of result rows to be fetched by the SQL statement."),
@WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition.")
})
@DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language", supportsExpressionLanguage = true, description = "Specifies an initial "
+ "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.")
public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {

public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder()
@@ -198,6 +201,25 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
// set as the current state map (after the session has been committed)
final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());

// If initial max values for column(s) have been specified via dynamic properties, and those columns are not already in the state manager, sync them to the state property map
final Map<String, String> maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
for (final Map.Entry<String, String> maxProp : maxValueProperties.entrySet()) {
String maxPropKey = maxProp.getKey().toLowerCase();
String fullyQualifiedMaxPropKey = getStateKey(tableName, maxPropKey);
if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) {
String newMaxPropValue;
// If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
// the value has been stored under a key that is only the column name. Fall back to check the column name,
// but store the new initial max value under the fully-qualified key.
if (statePropertyMap.containsKey(maxPropKey)) {
newMaxPropValue = statePropertyMap.get(maxPropKey);
} else {
newMaxPropValue = maxProp.getValue();
}
statePropertyMap.put(fullyQualifiedMaxPropKey, newMaxPropValue);
}
}

// Build a WHERE clause with maximum-value columns (if they exist), and a list of column names that will contain MAX(<column>) aliases. The
// executed SQL query will retrieve the count of all records after the filter(s) have been applied, as well as the new maximum values for the
// specified columns. This allows the processor to generate the correctly partitioned SQL statements as well as to update the state with the
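The loop added to onTrigger above applies a clear precedence while migrating state keys: a value already stored under the fully-qualified key wins, then a value under the legacy column-only key, and only then the configured initial max value. A minimal sketch of just that precedence (method and variable names are hypothetical):

import java.util.Map;

class InitialMaxValueSync {
    // Precedence: existing fully-qualified state > legacy column-only state
    // > configured initial.maxvalue.<column> value.
    static void sync(final Map<String, String> state, final String qualifiedKey,
                     final String legacyKey, final String configuredInitial) {
        if (!state.containsKey(qualifiedKey)) {
            state.put(qualifiedKey, state.getOrDefault(legacyKey, configuredInitial));
        }
    }
}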
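The trailing comment describes a single scan that returns both the row count and the new per-column maximums. Illustratively, for the table used in the tests below, that query has roughly this shape (hypothetical SQL; the real statement is produced by the configured DatabaseAdapter and varies by dialect):

// One pass yields the count for partitioning and the new max for state.
final String countAndMaxQuery =
        "SELECT COUNT(*), MAX(ID) FROM TEST_QUERY_DB_TABLE WHERE ID > 1";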
QueryDatabaseTable.java
@@ -33,7 +33,6 @@
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
@@ -110,8 +109,6 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {

public static final String RESULT_TABLENAME = "tablename";
public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
public static final String INTIIAL_MAX_VALUE_PROP_START = "initial.maxvalue.";


public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
.name("Fetch Size")
@@ -177,18 +174,6 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propDescriptors;
}

@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
.expressionLanguageSupported(true)
.dynamic(true)
.build();
}

@OnScheduled
public void setup(final ProcessContext context) {
super.setup(context);
@@ -417,23 +402,6 @@ protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String co
return query.toString();
}


protected Map<String,String> getDefaultMaxValueProperties(final Map<PropertyDescriptor, String> properties){
final Map<String,String> defaultMaxValues = new HashMap<>();

for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
final String key = entry.getKey().getName();

if(!key.startsWith(INTIIAL_MAX_VALUE_PROP_START)) {
continue;
}

defaultMaxValues.put(key.substring(INTIIAL_MAX_VALUE_PROP_START.length()), entry.getValue());
}

return defaultMaxValues;
}

protected class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback {
DatabaseAdapter dbAdapter;
Map<String, String> newColMap;
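With the constant, the dynamic-property descriptor, and the parsing helper hoisted into AbstractDatabaseFetchProcessor, this file only loses code; its existing call site continues to compile unchanged. A simplified sketch of the inherited usage (matching the onTrigger hunk above):

// Inside any AbstractDatabaseFetchProcessor subclass: initial.maxvalue.*
// dynamic properties and their parsing are inherited from the base class.
final Map<String, String> maxValueProperties =
        getDefaultMaxValueProperties(context.getProperties());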
TestGenerateTableFetch.java
@@ -584,6 +584,75 @@ public void testRidiculousRowCount() throws ClassNotFoundException, SQLException
runner.clearTransferState();
}

@Test
public void testInitialMaxValue() throws ClassNotFoundException, SQLException, InitializationException, IOException {

// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();

try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}

stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");

runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setIncomingConnection(false);
runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
runner.setProperty("initial.maxvalue.ID", "1");

runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
String query = new String(flowFile.toByteArray());
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 1 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query);
ResultSet resultSet = stmt.executeQuery(query);
// Should be one record (the initial max value skips the first two)
assertTrue(resultSet.next());
assertFalse(resultSet.next());
runner.clearTransferState();

// Run again, this time no flowfiles/rows should be transferred
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0);
runner.clearTransferState();

// Add 3 new rows with a higher ID and run with a partition size of 2. Two flow files should be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (4, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (5, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "2");
runner.setProperty("initial.maxvalue.ID", "5"); // This should have no effect as there is a max value in the processor state
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);

// Verify first flow file's contents
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
query = new String(flowFile.toByteArray());
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 ORDER BY ID FETCH NEXT 2 ROWS ONLY", query);
resultSet = stmt.executeQuery(query);
// Should be two records
assertTrue(resultSet.next());
assertTrue(resultSet.next());
assertFalse(resultSet.next());

// Verify second flow file's contents
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
query = new String(flowFile.toByteArray());
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 ORDER BY ID OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", query);
resultSet = stmt.executeQuery(query);
// Should be one record
assertTrue(resultSet.next());
assertFalse(resultSet.next());
runner.clearTransferState();
}


/**
* Simple implementation only for GenerateTableFetch processor testing.
