Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-4644 Fixed LookupService API to allow for more than String/Strin… #2304

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -66,7 +66,7 @@ public class ScriptedLookupService extends AbstractScriptedControllerService imp
private volatile File kerberosServiceKeytab = null;

@Override
public Optional<Object> lookup(Map<String, String> coordinates) throws LookupFailureException {
public Optional<Object> lookup(Map<String, Object> coordinates) throws LookupFailureException {
// Delegate the lookup() call to the scripted LookupService
return lookupService.get().lookup(coordinates);
}
Expand Down
Expand Up @@ -259,7 +259,7 @@ protected Set<Relationship> route(final Record record, final RecordSchema writeS
final Tuple<Map<String, RecordPath>, RecordPath> flowFileContext) {

final Map<String, RecordPath> recordPaths = flowFileContext.getKey();
final Map<String, String> lookupCoordinates = new HashMap<>(recordPaths.size());
final Map<String, Object> lookupCoordinates = new HashMap<>(recordPaths.size());

for (final Map.Entry<String, RecordPath> entry : recordPaths.entrySet()) {
final String coordinateKey = entry.getKey();
Expand All @@ -284,7 +284,8 @@ protected Set<Relationship> route(final Record record, final RecordSchema writeS
}

final FieldValue fieldValue = lookupFieldValues.get(0);
final String coordinateValue = DataTypeUtils.toString(fieldValue.getValue(), (String) null);
final Object coordinateValue = (fieldValue.getValue() instanceof Number || fieldValue.getValue() instanceof Boolean)
? fieldValue.getValue() : DataTypeUtils.toString(fieldValue.getValue(), (String) null);
lookupCoordinates.put(coordinateKey, coordinateValue);
}

Expand Down
Expand Up @@ -142,7 +142,7 @@ public void testCustomValidateMissingDynamicProps() throws InitializationExcepti

private static class InvalidLookupService extends AbstractControllerService implements StringLookupService {
@Override
public Optional<String> lookup(Map<String, String> coordinates) {
public Optional<String> lookup(Map<String, Object> coordinates) {
return Optional.empty();
}

Expand Down
Expand Up @@ -383,12 +383,12 @@ public Class<?> getValueType() {
}

@Override
public Optional<String> lookup(final Map<String, String> coordinates) {
if (coordinates == null) {
public Optional<String> lookup(final Map<String, Object> coordinates) {
if (coordinates == null || coordinates.get("lookup") == null) {
return Optional.empty();
}

final String key = coordinates.get("lookup");
final String key = (String)coordinates.get("lookup");
if (key == null) {
return Optional.empty();
}
Expand All @@ -415,12 +415,12 @@ public Class<?> getValueType() {
}

@Override
public Optional<Record> lookup(final Map<String, String> coordinates) {
if (coordinates == null) {
public Optional<Record> lookup(final Map<String, Object> coordinates) {
if (coordinates == null || coordinates.get("lookup") == null) {
return Optional.empty();
}

final String key = coordinates.get("lookup");
final String key = (String)coordinates.get("lookup");
if (key == null) {
return Optional.empty();
}
Expand Down
Expand Up @@ -114,8 +114,12 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
}

@Override
public Optional<Record> lookup(Map<String, String> coordinates) throws LookupFailureException {
final String rowKey = coordinates.get(ROW_KEY_KEY);
public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFailureException {
if (coordinates.get(ROW_KEY_KEY) == null) {
return Optional.empty();
}

final String rowKey = coordinates.get(ROW_KEY_KEY).toString();
if (StringUtils.isBlank(rowKey)) {
return Optional.empty();
}
Expand Down
Expand Up @@ -88,7 +88,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro

final String rowKey = context.getProperty(HBASE_ROW).getValue();

final Map<String,String> coordinates = new HashMap<>();
final Map<String,Object> coordinates = new HashMap<>();
coordinates.put(HBase_1_1_2_RecordLookupService.ROW_KEY_KEY, rowKey);

final LookupService<Record> lookupService = context.getProperty(HBASE_LOOKUP_SERVICE).asControllerService(LookupService.class);
Expand Down
Expand Up @@ -33,7 +33,7 @@ public interface LookupService<T> extends ControllerService {
*
* @throws LookupFailureException if unable to lookup a value for the given coordinates
*/
Optional<T> lookup(Map<String, String> coordinates) throws LookupFailureException;
Optional<T> lookup(Map<String, Object> coordinates) throws LookupFailureException;

/**
* @return the Class that represents the type of value that will be returned by {@link #lookup(Map)}
Expand Down
Expand Up @@ -17,24 +17,10 @@

package org.apache.nifi.lookup;

import java.util.Map;
import java.util.Optional;

import org.apache.nifi.serialization.record.Record;

public interface RecordLookupService extends LookupService<Record> {

/**
* Returns an Optional Record that corresponds to the given coordinates
*
* @param coordinates the coordinates to lookup
* @return an Optional Record that corresponds to the given coordinates
*
* @throws LookupFailureException if unable to lookup a value for the given coordinates
*/
@Override
Optional<Record> lookup(Map<String, String> coordinates) throws LookupFailureException;

@Override
default Class<?> getValueType() {
return Record.class;
Expand Down
Expand Up @@ -17,22 +17,8 @@

package org.apache.nifi.lookup;

import java.util.Map;
import java.util.Optional;

public interface StringLookupService extends LookupService<String> {

/**
* Returns an Optional value that corresponds to the given coordinates
*
* @param coordinates the coordinates to lookup
* @return an Optional String that represents the value for the given coordinates
*
* @throws LookupFailureException if unable to lookup a value for the given key
*/
@Override
Optional<String> lookup(Map<String, String> coordinates) throws LookupFailureException;

@Override
default Class<?> getValueType() {
return String.class;
Expand Down
Expand Up @@ -206,12 +206,12 @@ public void onEnabled(final ConfigurationContext context) throws InitializationE
}

@Override
public Optional<Record> lookup(final Map<String, String> coordinates) throws LookupFailureException {
public Optional<Record> lookup(final Map<String, Object> coordinates) throws LookupFailureException {
if (coordinates == null) {
return Optional.empty();
}

final String key = coordinates.get(KEY);
final String key = (String)coordinates.get(KEY);
if (StringUtils.isBlank(key)) {
return Optional.empty();
}
Expand Down
Expand Up @@ -194,12 +194,12 @@ public void onEnabled(final ConfigurationContext context) throws InitializationE
}

@Override
public Optional<String> lookup(final Map<String, String> coordinates) throws LookupFailureException {
public Optional<String> lookup(final Map<String, Object> coordinates) throws LookupFailureException {
if (coordinates == null) {
return Optional.empty();
}

final String key = coordinates.get(KEY);
final String key = coordinates.get(KEY).toString();
if (StringUtils.isBlank(key)) {
return Optional.empty();
}
Expand Down
Expand Up @@ -58,12 +58,12 @@ public void cacheConfiguredValues(final ConfigurationContext context) {
}

@Override
public Optional<String> lookup(final Map<String, String> coordinates) {
public Optional<String> lookup(final Map<String, Object> coordinates) {
if (coordinates == null) {
return Optional.empty();
}

final String key = coordinates.get(KEY);
final String key = coordinates.get(KEY).toString();
if (key == null) {
return Optional.empty();
}
Expand Down
Expand Up @@ -122,12 +122,12 @@ public void onEvent(ConfigurationBuilderEvent event) {
}

@Override
public Optional<String> lookup(final Map<String, String> coordinates) throws LookupFailureException {
public Optional<String> lookup(final Map<String, Object> coordinates) throws LookupFailureException {
if (coordinates == null) {
return Optional.empty();
}

final String key = coordinates.get(KEY);
final String key = coordinates.get(KEY).toString();
if (StringUtils.isBlank(key)) {
return Optional.empty();
}
Expand Down
Expand Up @@ -189,7 +189,7 @@ public Set<String> getRequiredKeys() {
}

@Override
public Optional<Record> lookup(final Map<String, String> coordinates) throws LookupFailureException {
public Optional<Record> lookup(final Map<String, Object> coordinates) throws LookupFailureException {
if (coordinates == null) {
return Optional.empty();
}
Expand Down Expand Up @@ -237,12 +237,13 @@ public Optional<Record> lookup(final Map<String, String> coordinates) throws Loo
}
}

private Optional<Record> doLookup(final DatabaseReader databaseReader, final Map<String, String> coordinates) throws LookupFailureException, InvalidDatabaseException {
final String ipAddress = coordinates.get(IP_KEY);
if (ipAddress == null) {
private Optional<Record> doLookup(final DatabaseReader databaseReader, final Map<String, Object> coordinates) throws LookupFailureException, InvalidDatabaseException {
if (coordinates.get(IP_KEY) == null) {
return Optional.empty();
}

final String ipAddress = coordinates.get(IP_KEY).toString();

final InetAddress inetAddress;
try {
inetAddress = InetAddress.getByName(ipAddress);
Expand Down
Expand Up @@ -74,7 +74,7 @@ public class MongoDBLookupService extends MongoDBControllerService implements Lo
}

@Override
public Optional<Object> lookup(Map<String, String> coordinates) throws LookupFailureException {
public Optional<Object> lookup(Map<String, Object> coordinates) throws LookupFailureException {
Map<String, Object> clean = new HashMap<>();
clean.putAll(coordinates);
Document query = new Document(clean);
Expand Down
Expand Up @@ -71,7 +71,7 @@ public void testLookupSingle() throws Exception {
Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
service.insert(document);

Map<String, String> criteria = new HashMap<>();
Map<String, Object> criteria = new HashMap<>();
criteria.put("uuid", "x-y-z");
Optional result = service.lookup(criteria);

Expand Down Expand Up @@ -99,7 +99,7 @@ public void testLookupRecord() throws Exception {
Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
service.insert(document);

Map<String, String> criteria = new HashMap<>();
Map<String, Object> criteria = new HashMap<>();
criteria.put("uuid", "x-y-z");
Optional result = service.lookup(criteria);

Expand Down Expand Up @@ -129,7 +129,7 @@ public void testServiceParameters() throws Exception {
Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
service.insert(document);

Map<String, String> criteria = new HashMap<>();
Map<String, Object> criteria = new HashMap<>();
criteria.put("uuid", "x-y-z");

boolean error = false;
Expand Down