Skip to content

Commit

Permalink
NIFI-4644 Fixed LookupService API to allow for more than String/Strin…
Browse files Browse the repository at this point in the history
…g lookup pairs.
  • Loading branch information
MikeThomsen committed Dec 13, 2017
1 parent bfe92b9 commit caa1bc1
Show file tree
Hide file tree
Showing 16 changed files with 36 additions and 58 deletions.
Expand Up @@ -66,7 +66,7 @@ public class ScriptedLookupService extends AbstractScriptedControllerService imp
private volatile File kerberosServiceKeytab = null;

@Override
public Optional<Object> lookup(Map<String, String> coordinates) throws LookupFailureException {
public Optional<Object> lookup(Map<String, Object> coordinates) throws LookupFailureException {
// Delegate the lookup() call to the scripted LookupService
return lookupService.get().lookup(coordinates);
}
Expand Down
Expand Up @@ -259,7 +259,7 @@ protected Set<Relationship> route(final Record record, final RecordSchema writeS
final Tuple<Map<String, RecordPath>, RecordPath> flowFileContext) {

final Map<String, RecordPath> recordPaths = flowFileContext.getKey();
final Map<String, String> lookupCoordinates = new HashMap<>(recordPaths.size());
final Map<String, Object> lookupCoordinates = new HashMap<>(recordPaths.size());

for (final Map.Entry<String, RecordPath> entry : recordPaths.entrySet()) {
final String coordinateKey = entry.getKey();
Expand All @@ -284,7 +284,8 @@ protected Set<Relationship> route(final Record record, final RecordSchema writeS
}

final FieldValue fieldValue = lookupFieldValues.get(0);
final String coordinateValue = DataTypeUtils.toString(fieldValue.getValue(), (String) null);
final Object coordinateValue = (fieldValue.getValue() instanceof Number || fieldValue.getValue() instanceof Boolean)
? fieldValue.getValue() : DataTypeUtils.toString(fieldValue.getValue(), (String) null);
lookupCoordinates.put(coordinateKey, coordinateValue);
}

Expand Down
Expand Up @@ -142,7 +142,7 @@ public void testCustomValidateMissingDynamicProps() throws InitializationExcepti

private static class InvalidLookupService extends AbstractControllerService implements StringLookupService {
@Override
public Optional<String> lookup(Map<String, String> coordinates) {
public Optional<String> lookup(Map<String, Object> coordinates) {
return Optional.empty();
}

Expand Down
Expand Up @@ -383,12 +383,12 @@ public Class<?> getValueType() {
}

@Override
public Optional<String> lookup(final Map<String, String> coordinates) {
if (coordinates == null) {
public Optional<String> lookup(final Map<String, Object> coordinates) {
if (coordinates == null || coordinates.get("lookup") == null) {
return Optional.empty();
}

final String key = coordinates.get("lookup");
final String key = (String)coordinates.get("lookup");
if (key == null) {
return Optional.empty();
}
Expand All @@ -415,12 +415,12 @@ public Class<?> getValueType() {
}

@Override
public Optional<Record> lookup(final Map<String, String> coordinates) {
if (coordinates == null) {
public Optional<Record> lookup(final Map<String, Object> coordinates) {
if (coordinates == null || coordinates.get("lookup") == null) {
return Optional.empty();
}

final String key = coordinates.get("lookup");
final String key = (String)coordinates.get("lookup");
if (key == null) {
return Optional.empty();
}
Expand Down
Expand Up @@ -114,8 +114,12 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
}

@Override
public Optional<Record> lookup(Map<String, String> coordinates) throws LookupFailureException {
final String rowKey = coordinates.get(ROW_KEY_KEY);
public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFailureException {
if (coordinates.get(ROW_KEY_KEY) == null) {
return Optional.empty();
}

final String rowKey = coordinates.get(ROW_KEY_KEY).toString();
if (StringUtils.isBlank(rowKey)) {
return Optional.empty();
}
Expand Down
Expand Up @@ -88,7 +88,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro

final String rowKey = context.getProperty(HBASE_ROW).getValue();

final Map<String,String> coordinates = new HashMap<>();
final Map<String,Object> coordinates = new HashMap<>();
coordinates.put(HBase_1_1_2_RecordLookupService.ROW_KEY_KEY, rowKey);

final LookupService<Record> lookupService = context.getProperty(HBASE_LOOKUP_SERVICE).asControllerService(LookupService.class);
Expand Down
Expand Up @@ -33,7 +33,7 @@ public interface LookupService<T> extends ControllerService {
*
* @throws LookupFailureException if unable to lookup a value for the given coordinates
*/
Optional<T> lookup(Map<String, String> coordinates) throws LookupFailureException;
Optional<T> lookup(Map<String, Object> coordinates) throws LookupFailureException;

/**
* @return the Class that represents the type of value that will be returned by {@link #lookup(Map)}
Expand Down
Expand Up @@ -17,24 +17,10 @@

package org.apache.nifi.lookup;

import java.util.Map;
import java.util.Optional;

import org.apache.nifi.serialization.record.Record;

public interface RecordLookupService extends LookupService<Record> {

/**
* Returns an Optional Record that corresponds to the given coordinates
*
* @param coordinates the coordinates to lookup
* @return an Optional Record that corresponds to the given coordinates
*
* @throws LookupFailureException if unable to lookup a value for the given coordinates
*/
@Override
Optional<Record> lookup(Map<String, String> coordinates) throws LookupFailureException;

@Override
default Class<?> getValueType() {
return Record.class;
Expand Down
Expand Up @@ -17,22 +17,8 @@

package org.apache.nifi.lookup;

import java.util.Map;
import java.util.Optional;

public interface StringLookupService extends LookupService<String> {

/**
* Returns an Optional value that corresponds to the given coordinates
*
* @param coordinates the coordinates to lookup
* @return an Optional String that represents the value for the given coordinates
*
* @throws LookupFailureException if unable to lookup a value for the given key
*/
@Override
Optional<String> lookup(Map<String, String> coordinates) throws LookupFailureException;

@Override
default Class<?> getValueType() {
return String.class;
Expand Down
Expand Up @@ -206,12 +206,12 @@ public void onEnabled(final ConfigurationContext context) throws InitializationE
}

@Override
public Optional<Record> lookup(final Map<String, String> coordinates) throws LookupFailureException {
public Optional<Record> lookup(final Map<String, Object> coordinates) throws LookupFailureException {
if (coordinates == null) {
return Optional.empty();
}

final String key = coordinates.get(KEY);
final String key = (String)coordinates.get(KEY);
if (StringUtils.isBlank(key)) {
return Optional.empty();
}
Expand Down
Expand Up @@ -194,12 +194,12 @@ public void onEnabled(final ConfigurationContext context) throws InitializationE
}

@Override
public Optional<String> lookup(final Map<String, String> coordinates) throws LookupFailureException {
public Optional<String> lookup(final Map<String, Object> coordinates) throws LookupFailureException {
if (coordinates == null) {
return Optional.empty();
}

final String key = coordinates.get(KEY);
final String key = coordinates.get(KEY).toString();
if (StringUtils.isBlank(key)) {
return Optional.empty();
}
Expand Down
Expand Up @@ -58,12 +58,12 @@ public void cacheConfiguredValues(final ConfigurationContext context) {
}

@Override
public Optional<String> lookup(final Map<String, String> coordinates) {
public Optional<String> lookup(final Map<String, Object> coordinates) {
if (coordinates == null) {
return Optional.empty();
}

final String key = coordinates.get(KEY);
final String key = coordinates.get(KEY).toString();
if (key == null) {
return Optional.empty();
}
Expand Down
Expand Up @@ -122,12 +122,12 @@ public void onEvent(ConfigurationBuilderEvent event) {
}

@Override
public Optional<String> lookup(final Map<String, String> coordinates) throws LookupFailureException {
public Optional<String> lookup(final Map<String, Object> coordinates) throws LookupFailureException {
if (coordinates == null) {
return Optional.empty();
}

final String key = coordinates.get(KEY);
final String key = coordinates.get(KEY).toString();
if (StringUtils.isBlank(key)) {
return Optional.empty();
}
Expand Down
Expand Up @@ -189,7 +189,7 @@ public Set<String> getRequiredKeys() {
}

@Override
public Optional<Record> lookup(final Map<String, String> coordinates) throws LookupFailureException {
public Optional<Record> lookup(final Map<String, Object> coordinates) throws LookupFailureException {
if (coordinates == null) {
return Optional.empty();
}
Expand Down Expand Up @@ -237,12 +237,13 @@ public Optional<Record> lookup(final Map<String, String> coordinates) throws Loo
}
}

private Optional<Record> doLookup(final DatabaseReader databaseReader, final Map<String, String> coordinates) throws LookupFailureException, InvalidDatabaseException {
final String ipAddress = coordinates.get(IP_KEY);
if (ipAddress == null) {
private Optional<Record> doLookup(final DatabaseReader databaseReader, final Map<String, Object> coordinates) throws LookupFailureException, InvalidDatabaseException {
if (coordinates.get(IP_KEY) == null) {
return Optional.empty();
}

final String ipAddress = coordinates.get(IP_KEY).toString();

final InetAddress inetAddress;
try {
inetAddress = InetAddress.getByName(ipAddress);
Expand Down
Expand Up @@ -74,7 +74,7 @@ public class MongoDBLookupService extends MongoDBControllerService implements Lo
}

@Override
public Optional<Object> lookup(Map<String, String> coordinates) throws LookupFailureException {
public Optional<Object> lookup(Map<String, Object> coordinates) throws LookupFailureException {
Map<String, Object> clean = new HashMap<>();
clean.putAll(coordinates);
Document query = new Document(clean);
Expand Down
Expand Up @@ -71,7 +71,7 @@ public void testLookupSingle() throws Exception {
Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
service.insert(document);

Map<String, String> criteria = new HashMap<>();
Map<String, Object> criteria = new HashMap<>();
criteria.put("uuid", "x-y-z");
Optional result = service.lookup(criteria);

Expand Down Expand Up @@ -99,7 +99,7 @@ public void testLookupRecord() throws Exception {
Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
service.insert(document);

Map<String, String> criteria = new HashMap<>();
Map<String, Object> criteria = new HashMap<>();
criteria.put("uuid", "x-y-z");
Optional result = service.lookup(criteria);

Expand Down Expand Up @@ -129,7 +129,7 @@ public void testServiceParameters() throws Exception {
Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
service.insert(document);

Map<String, String> criteria = new HashMap<>();
Map<String, Object> criteria = new HashMap<>();
criteria.put("uuid", "x-y-z");

boolean error = false;
Expand Down

0 comments on commit caa1bc1

Please sign in to comment.