Skip to content

Commit

Permalink
NIFI-4752: Addressed issue with some event types having potentially t…
Browse files Browse the repository at this point in the history
…he wrong FlowFile UUID listed (could have child UUID when it's supposed to have parent flowfile UUID). In testing fix, also found an issue with Search threads not being daemon and Re-Index threads not propertly being shutdown so addressed those as well.

This closes #2390.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
  • Loading branch information
markap14 authored and ijokarumawak committed Jan 10, 2018
1 parent 10e3b14 commit 6153fb6
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class SimpleIndexManager implements IndexManager {

public SimpleIndexManager(final RepositoryConfiguration repoConfig) {
this.repoConfig = repoConfig;
this.searchExecutor = Executors.newFixedThreadPool(repoConfig.getQueryThreadPoolSize(), new NamedThreadFactory("Search Lucene Index"));
this.searchExecutor = Executors.newFixedThreadPool(repoConfig.getQueryThreadPoolSize(), new NamedThreadFactory("Search Lucene Index", true));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,14 @@ public static StandardProvenanceEventRecord getEvent(final Record record, final
final Map<String, String> previousAttributes = truncateAttributes((Map<String, String>) record.getFieldValue(EventFieldNames.PREVIOUS_ATTRIBUTES), maxAttributeLength);
final Map<String, String> updatedAttributes = truncateAttributes((Map<String, String>) record.getFieldValue(EventFieldNames.UPDATED_ATTRIBUTES), maxAttributeLength);

final List<String> childUuids = (List<String>) record.getFieldValue(EventFieldNames.CHILD_UUIDS);
final List<String> parentUuids = (List<String>) record.getFieldValue(EventFieldNames.PARENT_UUIDS);

final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder();
builder.setAlternateIdentifierUri((String) record.getFieldValue(EventFieldNames.ALTERNATE_IDENTIFIER));
builder.setChildUuids((List<String>) record.getFieldValue(EventFieldNames.CHILD_UUIDS));
builder.setChildUuids(childUuids);
builder.setDetails((String) record.getFieldValue(EventFieldNames.EVENT_DETAILS));
builder.setParentUuids((List<String>) record.getFieldValue(EventFieldNames.PARENT_UUIDS));
builder.setParentUuids(parentUuids);
builder.setPreviousAttributes(previousAttributes);
builder.setRelationship((String) record.getFieldValue(EventFieldNames.RELATIONSHIP));
builder.setSourceSystemFlowFileIdentifier((String) record.getFieldValue(EventFieldNames.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER));
Expand All @@ -263,20 +266,42 @@ public static StandardProvenanceEventRecord getEvent(final Record record, final

// Determine the event type
final Integer eventTypeOrdinal = (Integer) record.getFieldValue(EventFieldNames.EVENT_TYPE);
ProvenanceEventType eventType;
if (eventTypeOrdinal == null || eventTypeOrdinal > eventTypes.size() || eventTypeOrdinal < 0) {
builder.setEventType(ProvenanceEventType.UNKNOWN);
eventType = ProvenanceEventType.UNKNOWN;
} else {
try {
builder.setEventType(ProvenanceEventType.valueOf(eventTypes.get(eventTypeOrdinal)));
eventType = ProvenanceEventType.valueOf(eventTypes.get(eventTypeOrdinal));
} catch (final Exception e) {
builder.setEventType(ProvenanceEventType.UNKNOWN);
eventType = ProvenanceEventType.UNKNOWN;
}
}
builder.setEventType(eventType);

// Determine appropriate UUID for the event
String uuid = null;
switch (eventType) {
case CLONE:
case FORK:
case REPLAY:
if (parentUuids != null && !parentUuids.isEmpty()) {
uuid = parentUuids.get(0);
}
break;
case JOIN:
if (childUuids != null && !childUuids.isEmpty()) {
uuid = childUuids.get(0);
}
break;
}

String uuid = updatedAttributes == null ? null : updatedAttributes.get(CoreAttributes.UUID.key());
if (uuid == null) {
uuid = previousAttributes == null ? null : previousAttributes.get(CoreAttributes.UUID.key());
uuid = updatedAttributes == null ? null : updatedAttributes.get(CoreAttributes.UUID.key());
if (uuid == null) {
uuid = previousAttributes == null ? null : previousAttributes.get(CoreAttributes.UUID.key());
}
}

builder.setFlowFileUUID(uuid);

builder.setEventDuration((Integer) record.getFieldValue(EventFieldNames.EVENT_DURATION));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,8 @@ public void run() {
logger.error("Failed to re-index Provenance Events for partition " + partitionName, e);
}

executor.shutdown();

final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
final long seconds = millis / 1000L;
final long millisRemainder = millis % 1000L;
Expand Down

0 comments on commit 6153fb6

Please sign in to comment.