Skip to content

Commit

Permalink
NIFI-12855: Add more information to provenance events to facilitate f…
Browse files Browse the repository at this point in the history
…ull graph traversal

Co-authored-by: Timea Barna <timeabarna@apache.org>
  • Loading branch information
mattyb149 and timeabarna committed Mar 15, 2024
1 parent 3719fdd commit b8ae42b
Show file tree
Hide file tree
Showing 29 changed files with 619 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,19 @@
import java.util.Optional;
import java.util.Set;

import static java.util.Collections.EMPTY_SET;
import static java.util.Collections.emptyList;

/**
* Implementation of {@link ProvenanceRepository} that does not
* store events.
*
*/
public class NoOpProvenanceRepository implements ProvenanceRepository {
public class NoOpProvenanceRepository extends AbstractProvenanceRepository {

@Override
public void initialize(EventReporter eventReporter, Authorizer authorizer,
ProvenanceAuthorizableFactory factory, IdentifierLookup identifierLookup)
throws IOException {
ProvenanceAuthorizableFactory factory, IdentifierLookup identifierLookup)
throws IOException {

}

Expand All @@ -68,13 +67,13 @@ public ProvenanceEventRecord getEvent(final long id, final NiFiUser user) throws

@Override
public List<ProvenanceEventRecord> getEvents(long firstRecordId, int maxRecords)
throws IOException {
throws IOException {
return emptyList();
}

@Override
public List<ProvenanceEventRecord> getEvents(long firstRecordId,
int maxRecords, NiFiUser niFiUser) throws IOException {
int maxRecords, NiFiUser niFiUser) throws IOException {
return emptyList();
}

Expand Down Expand Up @@ -135,7 +134,7 @@ public List<SearchableField> getSearchableAttributes() {

@Override
public Set<String> getContainerNames() {
return EMPTY_SET;
return Set.of();
}

@Override
Expand All @@ -155,7 +154,7 @@ public long getContainerUsableSpace(String s) throws IOException {

@Override
public AsyncLineageSubmission retrieveLineageSubmission(final String lineageIdentifier,
final NiFiUser user) {
final NiFiUser user) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public interface ProvenanceEventBuilder {
ProvenanceEventBuilder setDetails(String details);

/**
* Sets the to which the FlowFile was routed for
* Sets the relationship to which the FlowFile was routed for
* {@link ProvenanceEventType#ROUTE} events. This is valid only for
* {@link ProvenanceEventType#ROUTE} events and will be ignored for any
* other event types.
Expand All @@ -283,6 +283,13 @@ public interface ProvenanceEventBuilder {
*/
ProvenanceEventBuilder setRelationship(Relationship relationship);

/**
* Sets the IDs for the event that happened previously to this event for the given FlowFile
* @param previousEventIds The previous event IDs (usually one except for JOIN events and such)
* @return the builder
*/
ProvenanceEventBuilder setPreviousEventIds(List<Long> previousEventIds);

/**
* Populates the builder with as much information as it can from the given
* FlowFile
Expand All @@ -297,7 +304,7 @@ public interface ProvenanceEventBuilder {
* {@link ProvenanceEventRecord#getEventId()} on the
* {@link ProvenanceEventRecord} that is returned will yield
* <code>-1</code>. This is because the implementation of the Event may
* depend on the {@link ProvevenanceEventRepository} to generate the unique
* depend on the {@link ProvenanceEventRepository} to generate the unique
* identifier.
*
* @return the event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ public interface ProvenanceEventRecord {
*/
long getEventId();

/**
* @return a unique ID for the "parent" Provenance Event, namely the one that came directly before this event
* for the given FlowFile. For source events such as CREATE, this value should be set to -1
*/
List<Long> getPreviousEventIds();

/**
* @return the time at which this Provenance Event was created, as the
* number of milliseconds since epoch
Expand Down Expand Up @@ -149,14 +155,14 @@ default boolean isRemotePortType() {
/**
* @return the UUID's of all Parent FlowFiles. This is applicable only when
* the {@link ProvenanceEventType} is of type
* {@link ProvenanceEventType#SPAWN SPAWN}
* {@link ProvenanceEventType#JOIN JOIN}
*/
List<String> getParentUuids();

/**
* @return the UUID's of all Child FlowFiles. This is applicable only when
* the {@link ProvenanceEventType} is of type
* {@link ProvenanceEventType#SPAWN SPAWN}
* {@link ProvenanceEventType#FORK FORK}
*/
List<String> getChildUuids();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,19 @@ public interface ProvenanceEventRepository {
*/
void close() throws IOException;

/**
* Returns the previous provenance event IDs for the given FlowFile
* @param flowFileUUID the UUID of the FlowFile
* @return the previous event IDs for the given FlowFile
*/
List<Long> getPreviousEventIds(String flowFileUUID);

/**
* Updates the previous provenance event IDs for the given event
*
* @param record The record for which to update the previous event IDs
* @param previousIds the list of previous event IDs to set for the record, or null to remove
*/
void updatePreviousEventIds(ProvenanceEventRecord record, List<Long> previousIds);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance;

import java.util.List;

public interface UpdateableProvenanceEventRecord extends ProvenanceEventRecord {

void setEventId(final long eventId);
void setPreviousEventIds(List<Long> previousEventIds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,17 @@
* A Provenance Event that is used to replace another Provenance Event when authorizations
* are not granted for the original Provenance Event
*/
public class PlaceholderProvenanceEvent implements ProvenanceEventRecord {
public class PlaceholderProvenanceEvent implements UpdateableProvenanceEventRecord {
private final String componentId;
private final long eventId;
private long eventId;
private List<Long> previousEventIds;
private final long eventTime;
private final String flowFileUuid;

public PlaceholderProvenanceEvent(final ProvenanceEventRecord original) {
this.componentId = original.getComponentId();
this.eventId = original.getEventId();
this.previousEventIds = original.getPreviousEventIds();
this.eventTime = original.getEventTime();
this.flowFileUuid = original.getFlowFileUuid();
}
Expand All @@ -43,6 +45,21 @@ public long getEventId() {
return eventId;
}

@Override
public void setEventId(long eventId) {
this.eventId = eventId;
}

@Override
public List<Long> getPreviousEventIds() {
return previousEventIds;
}

@Override
public void setPreviousEventIds(List<Long> previousEventIds) {
this.previousEventIds = previousEventIds;
}

@Override
public long getEventTime() {
return eventTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,17 @@ public class SearchableFields {
public static final SearchableField SourceQueueIdentifier
= new NamedSearchableField("SourceQueueIdentifier", "sourceQueueIdentifier", "Source Queue Identifier", false, SearchableFieldType.STRING);

public static final SearchableField PreviousEventIdentifiers
= new NamedSearchableField("PreviousEventIdentifiers", "previousEventIdentifiers", "Previous Event Identifiers", false, SearchableFieldType.LONG);

private static final Map<String, SearchableField> standardFields;

static {
final SearchableField[] searchableFields = new SearchableField[]{
EventTime, FlowFileUUID, Filename, EventType, TransitURI,
ComponentID, AlternateIdentifierURI, FileSize, Relationship, Details,
LineageStartDate, LineageIdentifier, ContentClaimSection, ContentClaimContainer, ContentClaimIdentifier,
ContentClaimOffset, SourceQueueIdentifier};
ContentClaimOffset, SourceQueueIdentifier, PreviousEventIdentifiers};

final Map<String, SearchableField> fields = new HashMap<>();
for (final SearchableField field : searchableFields) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
/**
* Holder for provenance relevant information
*/
public class StandardProvenanceEventRecord implements ProvenanceEventRecord {
public class StandardProvenanceEventRecord implements UpdateableProvenanceEventRecord {

private final long eventTime;
private final long entryDate;
Expand Down Expand Up @@ -68,6 +68,7 @@ public class StandardProvenanceEventRecord implements ProvenanceEventRecord {
private final Map<String, String> updatedAttributes;

private volatile long eventId = -1L;
private volatile List<Long> previousEventIds;

StandardProvenanceEventRecord(final Builder builder) {
this.eventTime = builder.eventTime;
Expand Down Expand Up @@ -108,6 +109,7 @@ public class StandardProvenanceEventRecord implements ProvenanceEventRecord {
if (builder.eventId != null) {
eventId = builder.eventId;
}
previousEventIds = builder.previousEventIds;
}

public static StandardProvenanceEventRecord copy(StandardProvenanceEventRecord other) {
Expand All @@ -123,7 +125,8 @@ public long getStorageByteOffset() {
return storageByteOffset;
}

void setEventId(final long eventId) {
@Override
public void setEventId(final long eventId) {
this.eventId = eventId;
}

Expand All @@ -132,6 +135,15 @@ public long getEventId() {
return eventId;
}

@Override
public List<Long> getPreviousEventIds() {
return previousEventIds;
}

public void setPreviousEventIds(List<Long> previousEventIds) {
this.previousEventIds = previousEventIds;
}

@Override
public long getEventTime() {
return eventTime;
Expand Down Expand Up @@ -304,7 +316,8 @@ public int hashCode() {
}

return -37423 + 3 * componentId.hashCode() + (transitUri == null ? 0 : 41 * transitUri.hashCode())
+ (relationship == null ? 0 : 47 * relationship.hashCode()) + 44 * eventTypeCode
//+ (relationship == null ? 0 : 47 * relationship.hashCode())
+ 44 * eventTypeCode
+ 47 * getChildUuids().hashCode() + 47 * getParentUuids().hashCode();
}

Expand Down Expand Up @@ -351,10 +364,6 @@ public boolean equals(final Object obj) {
return false;
}

if (different(relationship, other.relationship)) {
return false;
}

return !(eventType == ProvenanceEventType.REPLAY && eventTime != other.getEventTime());
}

Expand All @@ -374,15 +383,15 @@ private boolean different(final List<String> a, final List<String> b) {
return false;
}

if (a == null && b != null && !b.isEmpty()) {
if (a == null && !b.isEmpty()) {
return true;
}

if (a == null && b.isEmpty()) {
if (a == null) {
return false;
}

if (a != null && !a.isEmpty() && b == null) {
if (!a.isEmpty() && b == null) {
return true;
}

Expand Down Expand Up @@ -413,11 +422,13 @@ private boolean different(final List<String> a, final List<String> b) {
public String toString() {
return "ProvenanceEventRecord ["
+ "eventId=" + eventId
+ ", previousEventIds=" + previousEventIds
+ ", eventType=" + eventType
+ ", eventTime=" + new Date(eventTime)
+ ", uuid=" + uuid
+ ", fileSize=" + contentSize
+ ", componentId=" + componentId
+ ", componentType=" + componentType
+ ", transitUri=" + transitUri
+ ", sourceSystemFlowFileIdentifier=" + sourceSystemFlowFileIdentifier
+ ", parentUuids=" + parentUuids
Expand Down Expand Up @@ -461,6 +472,7 @@ public static class Builder implements ProvenanceEventBuilder {
private long eventDuration = -1L;
private String storageFilename;
private Long eventId;
private List<Long> previousEventIds;

private String contentClaimSection;
private String contentClaimContainer;
Expand Down Expand Up @@ -519,6 +531,7 @@ public Builder fromEvent(final ProvenanceEventRecord event) {
storageFilename = standardProvEvent.storageFilename;
}

previousEventIds = event.getPreviousEventIds();
return this;
}

Expand Down Expand Up @@ -572,6 +585,10 @@ public ProvenanceEventBuilder copy() {
copy.storageByteOffset = storageByteOffset;
copy.storageFilename = storageFilename;

if (previousEventIds != null) {
copy.previousEventIds = previousEventIds;
}

return copy;
}

Expand Down Expand Up @@ -746,6 +763,12 @@ public Builder setRelationship(Relationship relationship) {
return this;
}

@Override
public ProvenanceEventBuilder setPreviousEventIds(List<Long> previousEventIds) {
this.previousEventIds = previousEventIds;
return this;
}

public Builder setRelationship(final String relationship) {
this.relationship = relationship;
return this;
Expand Down

0 comments on commit b8ae42b

Please sign in to comment.