Skip to content

Commit

Permalink
NIFI-7375: Fixed a bug that caused Provenance Events not to show up i…
Browse files Browse the repository at this point in the history
…n specific situations when clicking View Provenance for a Processor.

- Added System-level tests for Provenance repository to reproduce behavior.
- Added a Provenance Client to the CLI, which is necessary for System-level tests.
- Added small additional configuration for Provenance repository to simplify development of system tests
- Minor improvements to system tests (such as ability to destroy environment between tests) needed for Provenance repository based system tests
  • Loading branch information
markap14 committed Apr 20, 2020
1 parent 99e69f0 commit 173d9ee
Show file tree
Hide file tree
Showing 22 changed files with 615 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class StandardQueryResult implements QueryResult, ProgressiveResult {
private final Lock writeLock = rwLock.writeLock();
// guarded by writeLock
private final SortedSet<ProvenanceEventRecord> matchingRecords = new TreeSet<>(new EventIdComparator());
private long hitCount = 0L;
private int numCompletedSteps = 0;
private Date expirationDate;
private String error;
Expand Down Expand Up @@ -163,6 +164,7 @@ public void update(final Collection<ProvenanceEventRecord> newEvents, final long
}

this.matchingRecords.addAll(newEvents);
hitCount += totalHits;

// If we've added more records than the query's max, then remove the trailing elements.
// We do this, rather than avoiding the addition of the elements because we want to choose
Expand All @@ -188,10 +190,12 @@ public void update(final Collection<ProvenanceEventRecord> newEvents, final long
queryComplete = true;

if (numCompletedSteps >= numSteps) {
logger.info("Completed {} comprised of {} steps in {} millis", query, numSteps, queryTime);
logger.info("Completed {} comprised of {} steps in {} millis. Index found {} hits. Read {} events from Event Files.",
query, numSteps, queryTime, hitCount, matchingRecords.size());
} else {
logger.info("Completed {} comprised of {} steps in {} millis (only completed {} steps because the maximum number of results was reached)",
query, numSteps, queryTime, numCompletedSteps);
logger.info("Completed {} comprised of {} steps in {} millis. Index found {} hits. Read {} events from Event Files. "
+ "Only completed {} steps because the maximum number of results was reached.",
query, numSteps, queryTime, hitCount, matchingRecords.size(), numCompletedSteps);
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public abstract class NiFiProperties {
public static final String PROVENANCE_MAX_STORAGE_SIZE = "nifi.provenance.repository.max.storage.size";
public static final String PROVENANCE_ROLLOVER_TIME = "nifi.provenance.repository.rollover.time";
public static final String PROVENANCE_ROLLOVER_SIZE = "nifi.provenance.repository.rollover.size";
public static final String PROVENANCE_ROLLOVER_EVENT_COUNT = "nifi.provenance.repository.rollover.events";
public static final String PROVENANCE_QUERY_THREAD_POOL_SIZE = "nifi.provenance.repository.query.threads";
public static final String PROVENANCE_INDEX_THREAD_POOL_SIZE = "nifi.provenance.repository.index.threads";
public static final String PROVENANCE_COMPRESS_ON_ROLLOVER = "nifi.provenance.repository.compress.on.rollover";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class RepositoryConfiguration {

public static final String CONCURRENT_MERGE_THREADS = "nifi.provenance.repository.concurrent.merge.threads";
public static final String WARM_CACHE_FREQUENCY = "nifi.provenance.repository.warm.cache.frequency";
public static final String MAINTENACE_FREQUENCY = "nifi.provenance.repository.maintenance.frequency";

private final Map<String, File> storageDirectories = new LinkedHashMap<>();
private long recordLifeMillis = TimeUnit.MILLISECONDS.convert(24, TimeUnit.HOURS);
Expand All @@ -51,6 +52,7 @@ public class RepositoryConfiguration {
private int compressionBlockBytes = 1024 * 1024;
private int maxAttributeChars = 65536;
private int debugFrequency = 1_000_000;
private long maintenanceFrequencyMillis = TimeUnit.MINUTES.toMillis(1L);

// TODO: Delegaate to RepositoryEncryptionConfiguration in NIFI-6617
private Map<String, String> encryptionKeys;
Expand Down Expand Up @@ -416,6 +418,14 @@ public void setDebugFrequency(int debugFrequency) {
this.debugFrequency = debugFrequency;
}

public long getMaintenanceFrequency(final TimeUnit timeUnit) {
return timeUnit.convert(maintenanceFrequencyMillis, TimeUnit.MILLISECONDS);
}

public void setMaintenanceFrequency(final long period, final TimeUnit timeUnit) {
this.maintenanceFrequencyMillis = timeUnit.toMillis(period);
}


public static RepositoryConfiguration create(final NiFiProperties nifiProperties) {
final Map<String, Path> storageDirectories = nifiProperties.getProvenanceRepositoryPaths();
Expand All @@ -426,13 +436,14 @@ public static RepositoryConfiguration create(final NiFiProperties nifiProperties
final String storageSize = nifiProperties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE, "1 GB");
final String rolloverTime = nifiProperties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_TIME, "5 mins");
final String rolloverSize = nifiProperties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB");
final int rolloverEventCount = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_ROLLOVER_EVENT_COUNT, Integer.MAX_VALUE);
final String shardSize = nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 MB");
final int queryThreads = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_QUERY_THREAD_POOL_SIZE, 2);
final int indexThreads = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_INDEX_THREAD_POOL_SIZE, 2);
final int journalCount = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16);
final int concurrentMergeThreads = nifiProperties.getIntegerProperty(CONCURRENT_MERGE_THREADS, 2);
final String warmCacheFrequency = nifiProperties.getProperty(WARM_CACHE_FREQUENCY);

final String maintenanceFrequency = nifiProperties.getProperty(MAINTENACE_FREQUENCY);
final long storageMillis = FormatUtils.getTimeDuration(storageTime, TimeUnit.MILLISECONDS);
final long maxStorageBytes = DataUnit.parseDataSize(storageSize, DataUnit.B).longValue();
final long rolloverMillis = FormatUtils.getTimeDuration(rolloverTime, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -475,6 +486,7 @@ public static RepositoryConfiguration create(final NiFiProperties nifiProperties
config.setSearchableFields(searchableFields);
config.setSearchableAttributes(searchableAttributes);
config.setMaxEventFileCapacity(rolloverBytes);
config.setMaxEventFileCount(rolloverEventCount);
config.setMaxEventFileLife(rolloverMillis, TimeUnit.MILLISECONDS);
config.setMaxRecordLife(storageMillis, TimeUnit.MILLISECONDS);
config.setMaxStorageCapacity(maxStorageBytes);
Expand All @@ -490,6 +502,10 @@ public static RepositoryConfiguration create(final NiFiProperties nifiProperties
if (shardSize != null) {
config.setDesiredIndexSize(DataUnit.parseDataSize(shardSize, DataUnit.B).longValue());
}
if (maintenanceFrequency != null && !maintenanceFrequency.trim().equals("")) {
final long millis = FormatUtils.getTimeDuration(maintenanceFrequency.trim(), TimeUnit.MILLISECONDS);
config.setMaintenanceFrequency(millis, TimeUnit.MILLISECONDS);
}

config.setAlwaysSync(alwaysSync);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,9 @@ public Optional<List<Long>> evaluate(final Query query) {
return Optional.of(eventIds);
}

@Override
public String toString() {
return "Latest Events Per Processor";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,8 @@ public Optional<List<Long>> evaluate(final Query query) {
}
}

@Override
public String toString() {
return "Most Recent Events Query";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -606,11 +606,14 @@ public QuerySubmission submitQuery(final Query query, final EventAuthorizer auth
querySubmissionMap.put(query.getIdentifier(), submission);

final List<Long> eventIds = eventIdListOption.get();
logger.debug("Cached Query {} produced {} Event IDs for {}: {}", cachedQuery, eventIds.size(), query, eventIds);

queryExecutor.submit(() -> {
List<ProvenanceEventRecord> events;
try {
events = eventStore.getEvents(eventIds, authorizer, EventTransformer.EMPTY_TRANSFORMER);
logger.debug("Retrieved {} of {} Events from Event Store", events.size(), eventIds.size());

submission.getResult().update(events, eventIds.size());
} catch (final Exception e) {
submission.getResult().setError("Failed to retrieve Provenance Events from store; see logs for more details");
Expand Down Expand Up @@ -639,7 +642,7 @@ public QuerySubmission submitQuery(final Query query, final EventAuthorizer auth
querySubmissionMap.put(query.getIdentifier(), submission);

final org.apache.lucene.search.Query luceneQuery = LuceneUtil.convertQuery(query);
logger.debug("Submitting query {} with identifier {} against index directories {}", luceneQuery, query.getIdentifier(), indexDirectories);
logger.debug("Submitting query {} with identifier {} against {} index directories: {}", luceneQuery, query.getIdentifier(), indexDirectories.size(), indexDirectories);

if (indexDirectories.isEmpty()) {
submission.getResult().update(Collections.emptyList(), 0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopDocs;
import org.apache.nifi.provenance.ProgressiveResult;
import org.apache.nifi.provenance.ProvenanceEventRecord;
Expand Down Expand Up @@ -105,7 +107,7 @@ public void run() {

try {
final long borrowMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - borrowStart);
logger.debug("Borrowing index searcher for {} took {} ms", indexDir, borrowMillis);
logger.trace("Borrowing index searcher for {} took {} ms", indexDir, borrowMillis);
final long startNanos = System.nanoTime();

// If max number of results are retrieved, do not bother querying lucene
Expand All @@ -124,7 +126,11 @@ public void run() {
final IndexReader indexReader = searcher.getIndexSearcher().getIndexReader();
final TopDocs topDocs;
try {
topDocs = searcher.getIndexSearcher().search(query, maxResults);

// Sort based on document id, descending. This gives us most recent events first.
final Sort sort = new Sort(new SortField(null, SortField.Type.DOC, true));

topDocs = searcher.getIndexSearcher().search(query, maxResults, sort);
} catch (final Exception e) {
logger.error("Failed to query Lucene for index " + indexDir, e);
queryResult.setError("Failed to query Lucene for index " + indexDir + " due to " + e);
Expand Down Expand Up @@ -189,7 +195,7 @@ private Tuple<List<ProvenanceEventRecord>, Long> readDocuments(final TopDocs top

final long endConvert = System.nanoTime();
final long ms = TimeUnit.NANOSECONDS.toMillis(endConvert - start);
logger.debug("Converting documents took {} ms", ms);
logger.trace("Converting documents took {} ms", ms);

List<ProvenanceEventRecord> events;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public PartitionedEventStore(final RepositoryConfiguration config, final EventRe
@Override
public void initialize() throws IOException {
maintenanceExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Provenance Repository Maintenance"));
maintenanceExecutor.scheduleWithFixedDelay(() -> performMaintenance(), 1, 1, TimeUnit.MINUTES);
final long maintenanceMillis = repoConfig.getMaintenanceFrequency(TimeUnit.MILLISECONDS);
maintenanceExecutor.scheduleWithFixedDelay(this::performMaintenance, maintenanceMillis, maintenanceMillis, TimeUnit.MILLISECONDS);

for (final EventStorePartition partition : getPartitions()) {
partition.initialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public StorageResult addEvents(final Iterable<ProvenanceEventRecord> events) thr
}

// Claim a Record Writer Lease so that we have a writer to persist the events to
RecordWriterLease lease = null;
RecordWriterLease lease;
while (true) {
lease = getLease();
if (lease.tryClaim()) {
Expand All @@ -185,7 +185,6 @@ public StorageResult addEvents(final Iterable<ProvenanceEventRecord> events) thr
final RolloverState rolloverState = lease.getRolloverState();
if (rolloverState.isRollover()) {
final boolean success = tryRollover(lease);

if (success) {
logger.info("Successfully rolled over Event Writer for {} due to {}", this, rolloverState);
}
Expand Down Expand Up @@ -476,9 +475,16 @@ private Optional<File> getPathForEventId(final long id) {
public void purgeOldEvents(final long olderThan, final TimeUnit unit) {
final long timeCutoff = System.currentTimeMillis() - unit.toMillis(olderThan);

getEventFilesFromDisk().filter(file -> file.lastModified() < timeCutoff)
final List<File> removed = getEventFilesFromDisk().filter(file -> file.lastModified() < timeCutoff)
.sorted(DirectoryUtils.SMALLEST_ID_FIRST)
.forEach(this::delete);
.filter(this::delete)
.collect(Collectors.toList());

if (removed.isEmpty()) {
logger.debug("No Provenance Event files that exceed time-based threshold of {} {}", olderThan, unit);
} else {
logger.info("Purged {} Provenance Event files from Provenance Repository because the events were older than {} {}: {}", removed.size(), olderThan, unit, removed);
}
}

private File getActiveEventFile() {
Expand All @@ -489,20 +495,27 @@ private File getActiveEventFile() {
@Override
public long purgeOldestEvents() {
final List<File> eventFiles = getEventFilesFromDisk().sorted(DirectoryUtils.SMALLEST_ID_FIRST).collect(Collectors.toList());
if (eventFiles.isEmpty()) {
if (eventFiles.size() < 2) {
// If there are no Event Files, there's nothing to do. If there is exactly 1 Event File, it means that the only Event File
// that exists is the Active Event File, which we are writing to, so we don't want to remove it either.
return 0L;
}

final File currentFile = getActiveEventFile();
if (currentFile == null) {
logger.debug("There is currently no Active Event File for {}. Will not purge oldest events until the Active Event File has been established.", this);
return 0L;
}

for (final File eventFile : eventFiles) {
if (eventFile.equals(currentFile)) {
continue;
break;
}

final long fileSize = eventFile.length();

if (delete(eventFile)) {
logger.debug("{} Deleted {} event file ({}) due to storage limits", this, eventFile, FormatUtils.formatDataSize(fileSize));
logger.info("{} Deleted {} event file ({}) due to storage limits", this, eventFile, FormatUtils.formatDataSize(fileSize));
return fileSize;
} else {
logger.warn("{} Failed to delete oldest event file {}. This file should be cleaned up manually.", this, eventFile);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.tests.system;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

public class TerminateFlowFile extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}

session.remove(flowFile);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
# limitations under the License.

org.apache.nifi.processors.tests.system.CountEvents
org.apache.nifi.processors.tests.system.FakeProcessor
org.apache.nifi.processors.tests.system.FakeDynamicPropertiesProcessor
org.apache.nifi.processors.tests.system.GenerateFlowFile
org.apache.nifi.processors.tests.system.Sleep
org.apache.nifi.processors.tests.system.TerminateFlowFile
org.apache.nifi.processors.tests.system.ValidateFileExists
org.apache.nifi.processors.tests.system.FakeProcessor
org.apache.nifi.processors.tests.system.FakeDynamicPropertiesProcessor
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,24 @@

import java.io.File;
import java.io.FileNotFoundException;
import java.util.HashMap;
import java.util.Map;

public class InstanceConfiguration {
private final File bootstrapConfigFile;
private final File instanceDirectory;
private final File flowXmlGz;
private final File stateDirectory;
private final boolean autoStart;
private final Map<String, String> nifiPropertiesOverrides;

private InstanceConfiguration(Builder builder) {
this.bootstrapConfigFile = builder.bootstrapConfigFile;
this.instanceDirectory = builder.instanceDirectory;
this.flowXmlGz = builder.flowXmlGz;
this.stateDirectory = builder.stateDirectory;
this.autoStart = builder.autoStart;
this.nifiPropertiesOverrides = builder.nifiPropertiesOverrides;
}

public File getBootstrapConfigFile() {
Expand All @@ -54,12 +58,26 @@ public boolean isAutoStart() {
return autoStart;
}

public Map<String, String> getNifiPropertiesOverrides() {
return nifiPropertiesOverrides;
}

public static class Builder {
private File bootstrapConfigFile;
private File instanceDirectory;
private File flowXmlGz;
private File stateDirectory;
private boolean autoStart = true;
private final Map<String, String> nifiPropertiesOverrides = new HashMap<>();

public Builder overrideNifiProperties(final Map<String, String> overrides) {
nifiPropertiesOverrides.clear();
if (overrides != null) {
nifiPropertiesOverrides.putAll(overrides);
}

return this;
}

public Builder bootstrapConfig(final File configFile) {
if (!configFile.exists()) {
Expand Down
Loading

0 comments on commit 173d9ee

Please sign in to comment.