Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-7375: Fixed a bug that caused Provenance Events not to show up i… #4218

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class StandardQueryResult implements QueryResult, ProgressiveResult {
private final Lock writeLock = rwLock.writeLock();
// guarded by writeLock
private final SortedSet<ProvenanceEventRecord> matchingRecords = new TreeSet<>(new EventIdComparator());
private long hitCount = 0L;
private int numCompletedSteps = 0;
private Date expirationDate;
private String error;
Expand Down Expand Up @@ -163,6 +164,7 @@ public void update(final Collection<ProvenanceEventRecord> newEvents, final long
}

this.matchingRecords.addAll(newEvents);
hitCount += totalHits;

// If we've added more records than the query's max, then remove the trailing elements.
// We do this, rather than avoiding the addition of the elements because we want to choose
Expand All @@ -188,10 +190,12 @@ public void update(final Collection<ProvenanceEventRecord> newEvents, final long
queryComplete = true;

if (numCompletedSteps >= numSteps) {
logger.info("Completed {} comprised of {} steps in {} millis", query, numSteps, queryTime);
logger.info("Completed {} comprised of {} steps in {} millis. Index found {} hits. Read {} events from Event Files.",
query, numSteps, queryTime, hitCount, matchingRecords.size());
} else {
logger.info("Completed {} comprised of {} steps in {} millis (only completed {} steps because the maximum number of results was reached)",
query, numSteps, queryTime, numCompletedSteps);
logger.info("Completed {} comprised of {} steps in {} millis. Index found {} hits. Read {} events from Event Files. "
+ "Only completed {} steps because the maximum number of results was reached.",
query, numSteps, queryTime, hitCount, matchingRecords.size(), numCompletedSteps);
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public abstract class NiFiProperties {
public static final String PROVENANCE_MAX_STORAGE_SIZE = "nifi.provenance.repository.max.storage.size";
public static final String PROVENANCE_ROLLOVER_TIME = "nifi.provenance.repository.rollover.time";
public static final String PROVENANCE_ROLLOVER_SIZE = "nifi.provenance.repository.rollover.size";
public static final String PROVENANCE_ROLLOVER_EVENT_COUNT = "nifi.provenance.repository.rollover.events";
public static final String PROVENANCE_QUERY_THREAD_POOL_SIZE = "nifi.provenance.repository.query.threads";
public static final String PROVENANCE_INDEX_THREAD_POOL_SIZE = "nifi.provenance.repository.index.threads";
public static final String PROVENANCE_COMPRESS_ON_ROLLOVER = "nifi.provenance.repository.compress.on.rollover";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class RepositoryConfiguration {

public static final String CONCURRENT_MERGE_THREADS = "nifi.provenance.repository.concurrent.merge.threads";
public static final String WARM_CACHE_FREQUENCY = "nifi.provenance.repository.warm.cache.frequency";
public static final String MAINTENACE_FREQUENCY = "nifi.provenance.repository.maintenance.frequency";

private final Map<String, File> storageDirectories = new LinkedHashMap<>();
private long recordLifeMillis = TimeUnit.MILLISECONDS.convert(24, TimeUnit.HOURS);
Expand All @@ -51,6 +52,7 @@ public class RepositoryConfiguration {
private int compressionBlockBytes = 1024 * 1024;
private int maxAttributeChars = 65536;
private int debugFrequency = 1_000_000;
private long maintenanceFrequencyMillis = TimeUnit.MINUTES.toMillis(1L);

// TODO: Delegaate to RepositoryEncryptionConfiguration in NIFI-6617
private Map<String, String> encryptionKeys;
Expand Down Expand Up @@ -416,6 +418,14 @@ public void setDebugFrequency(int debugFrequency) {
this.debugFrequency = debugFrequency;
}

public long getMaintenanceFrequency(final TimeUnit timeUnit) {
return timeUnit.convert(maintenanceFrequencyMillis, TimeUnit.MILLISECONDS);
}

public void setMaintenanceFrequency(final long period, final TimeUnit timeUnit) {
this.maintenanceFrequencyMillis = timeUnit.toMillis(period);
}


public static RepositoryConfiguration create(final NiFiProperties nifiProperties) {
final Map<String, Path> storageDirectories = nifiProperties.getProvenanceRepositoryPaths();
Expand All @@ -426,13 +436,14 @@ public static RepositoryConfiguration create(final NiFiProperties nifiProperties
final String storageSize = nifiProperties.getProperty(NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE, "1 GB");
final String rolloverTime = nifiProperties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_TIME, "5 mins");
final String rolloverSize = nifiProperties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB");
final int rolloverEventCount = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_ROLLOVER_EVENT_COUNT, Integer.MAX_VALUE);
final String shardSize = nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 MB");
final int queryThreads = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_QUERY_THREAD_POOL_SIZE, 2);
final int indexThreads = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_INDEX_THREAD_POOL_SIZE, 2);
final int journalCount = nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16);
final int concurrentMergeThreads = nifiProperties.getIntegerProperty(CONCURRENT_MERGE_THREADS, 2);
final String warmCacheFrequency = nifiProperties.getProperty(WARM_CACHE_FREQUENCY);

final String maintenanceFrequency = nifiProperties.getProperty(MAINTENACE_FREQUENCY);
final long storageMillis = FormatUtils.getTimeDuration(storageTime, TimeUnit.MILLISECONDS);
final long maxStorageBytes = DataUnit.parseDataSize(storageSize, DataUnit.B).longValue();
final long rolloverMillis = FormatUtils.getTimeDuration(rolloverTime, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -475,6 +486,7 @@ public static RepositoryConfiguration create(final NiFiProperties nifiProperties
config.setSearchableFields(searchableFields);
config.setSearchableAttributes(searchableAttributes);
config.setMaxEventFileCapacity(rolloverBytes);
config.setMaxEventFileCount(rolloverEventCount);
config.setMaxEventFileLife(rolloverMillis, TimeUnit.MILLISECONDS);
config.setMaxRecordLife(storageMillis, TimeUnit.MILLISECONDS);
config.setMaxStorageCapacity(maxStorageBytes);
Expand All @@ -490,6 +502,10 @@ public static RepositoryConfiguration create(final NiFiProperties nifiProperties
if (shardSize != null) {
config.setDesiredIndexSize(DataUnit.parseDataSize(shardSize, DataUnit.B).longValue());
}
if (maintenanceFrequency != null && !maintenanceFrequency.trim().equals("")) {
final long millis = FormatUtils.getTimeDuration(maintenanceFrequency.trim(), TimeUnit.MILLISECONDS);
config.setMaintenanceFrequency(millis, TimeUnit.MILLISECONDS);
}

config.setAlwaysSync(alwaysSync);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,9 @@ public Optional<List<Long>> evaluate(final Query query) {
return Optional.of(eventIds);
}

@Override
public String toString() {
return "Latest Events Per Processor";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,8 @@ public Optional<List<Long>> evaluate(final Query query) {
}
}

@Override
public String toString() {
return "Most Recent Events Query";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -606,11 +606,14 @@ public QuerySubmission submitQuery(final Query query, final EventAuthorizer auth
querySubmissionMap.put(query.getIdentifier(), submission);

final List<Long> eventIds = eventIdListOption.get();
logger.debug("Cached Query {} produced {} Event IDs for {}: {}", cachedQuery, eventIds.size(), query, eventIds);

queryExecutor.submit(() -> {
List<ProvenanceEventRecord> events;
try {
events = eventStore.getEvents(eventIds, authorizer, EventTransformer.EMPTY_TRANSFORMER);
logger.debug("Retrieved {} of {} Events from Event Store", events.size(), eventIds.size());

submission.getResult().update(events, eventIds.size());
} catch (final Exception e) {
submission.getResult().setError("Failed to retrieve Provenance Events from store; see logs for more details");
Expand Down Expand Up @@ -639,7 +642,7 @@ public QuerySubmission submitQuery(final Query query, final EventAuthorizer auth
querySubmissionMap.put(query.getIdentifier(), submission);

final org.apache.lucene.search.Query luceneQuery = LuceneUtil.convertQuery(query);
logger.debug("Submitting query {} with identifier {} against index directories {}", luceneQuery, query.getIdentifier(), indexDirectories);
logger.debug("Submitting query {} with identifier {} against {} index directories: {}", luceneQuery, query.getIdentifier(), indexDirectories.size(), indexDirectories);

if (indexDirectories.isEmpty()) {
submission.getResult().update(Collections.emptyList(), 0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopDocs;
import org.apache.nifi.provenance.ProgressiveResult;
import org.apache.nifi.provenance.ProvenanceEventRecord;
Expand Down Expand Up @@ -105,7 +107,7 @@ public void run() {

try {
final long borrowMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - borrowStart);
logger.debug("Borrowing index searcher for {} took {} ms", indexDir, borrowMillis);
logger.trace("Borrowing index searcher for {} took {} ms", indexDir, borrowMillis);
final long startNanos = System.nanoTime();

// If max number of results are retrieved, do not bother querying lucene
Expand All @@ -124,7 +126,11 @@ public void run() {
final IndexReader indexReader = searcher.getIndexSearcher().getIndexReader();
final TopDocs topDocs;
try {
topDocs = searcher.getIndexSearcher().search(query, maxResults);

// Sort based on document id, descending. This gives us most recent events first.
final Sort sort = new Sort(new SortField(null, SortField.Type.DOC, true));

topDocs = searcher.getIndexSearcher().search(query, maxResults, sort);
} catch (final Exception e) {
logger.error("Failed to query Lucene for index " + indexDir, e);
queryResult.setError("Failed to query Lucene for index " + indexDir + " due to " + e);
Expand Down Expand Up @@ -189,7 +195,7 @@ private Tuple<List<ProvenanceEventRecord>, Long> readDocuments(final TopDocs top

final long endConvert = System.nanoTime();
final long ms = TimeUnit.NANOSECONDS.toMillis(endConvert - start);
logger.debug("Converting documents took {} ms", ms);
logger.trace("Converting documents took {} ms", ms);

List<ProvenanceEventRecord> events;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public PartitionedEventStore(final RepositoryConfiguration config, final EventRe
@Override
public void initialize() throws IOException {
maintenanceExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Provenance Repository Maintenance"));
maintenanceExecutor.scheduleWithFixedDelay(() -> performMaintenance(), 1, 1, TimeUnit.MINUTES);
final long maintenanceMillis = repoConfig.getMaintenanceFrequency(TimeUnit.MILLISECONDS);
maintenanceExecutor.scheduleWithFixedDelay(this::performMaintenance, maintenanceMillis, maintenanceMillis, TimeUnit.MILLISECONDS);

for (final EventStorePartition partition : getPartitions()) {
partition.initialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public StorageResult addEvents(final Iterable<ProvenanceEventRecord> events) thr
}

// Claim a Record Writer Lease so that we have a writer to persist the events to
RecordWriterLease lease = null;
RecordWriterLease lease;
while (true) {
lease = getLease();
if (lease.tryClaim()) {
Expand All @@ -185,7 +185,6 @@ public StorageResult addEvents(final Iterable<ProvenanceEventRecord> events) thr
final RolloverState rolloverState = lease.getRolloverState();
if (rolloverState.isRollover()) {
final boolean success = tryRollover(lease);

if (success) {
logger.info("Successfully rolled over Event Writer for {} due to {}", this, rolloverState);
}
Expand Down Expand Up @@ -476,9 +475,16 @@ private Optional<File> getPathForEventId(final long id) {
public void purgeOldEvents(final long olderThan, final TimeUnit unit) {
final long timeCutoff = System.currentTimeMillis() - unit.toMillis(olderThan);

getEventFilesFromDisk().filter(file -> file.lastModified() < timeCutoff)
final List<File> removed = getEventFilesFromDisk().filter(file -> file.lastModified() < timeCutoff)
.sorted(DirectoryUtils.SMALLEST_ID_FIRST)
.forEach(this::delete);
.filter(this::delete)
.collect(Collectors.toList());

if (removed.isEmpty()) {
logger.debug("No Provenance Event files that exceed time-based threshold of {} {}", olderThan, unit);
} else {
logger.info("Purged {} Provenance Event files from Provenance Repository because the events were older than {} {}: {}", removed.size(), olderThan, unit, removed);
}
}

private File getActiveEventFile() {
Expand All @@ -489,20 +495,27 @@ private File getActiveEventFile() {
@Override
public long purgeOldestEvents() {
final List<File> eventFiles = getEventFilesFromDisk().sorted(DirectoryUtils.SMALLEST_ID_FIRST).collect(Collectors.toList());
if (eventFiles.isEmpty()) {
if (eventFiles.size() < 2) {
// If there are no Event Files, there's nothing to do. If there is exactly 1 Event File, it means that the only Event File
// that exists is the Active Event File, which we are writing to, so we don't want to remove it either.
return 0L;
}

final File currentFile = getActiveEventFile();
if (currentFile == null) {
logger.debug("There is currently no Active Event File for {}. Will not purge oldest events until the Active Event File has been established.", this);
return 0L;
}

for (final File eventFile : eventFiles) {
if (eventFile.equals(currentFile)) {
continue;
break;
}

final long fileSize = eventFile.length();

if (delete(eventFile)) {
logger.debug("{} Deleted {} event file ({}) due to storage limits", this, eventFile, FormatUtils.formatDataSize(fileSize));
logger.info("{} Deleted {} event file ({}) due to storage limits", this, eventFile, FormatUtils.formatDataSize(fileSize));
return fileSize;
} else {
logger.warn("{} Failed to delete oldest event file {}. This file should be cleaned up manually.", this, eventFile);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.tests.system;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

public class TerminateFlowFile extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}

session.remove(flowFile);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
# limitations under the License.

org.apache.nifi.processors.tests.system.CountEvents
org.apache.nifi.processors.tests.system.FakeProcessor
org.apache.nifi.processors.tests.system.FakeDynamicPropertiesProcessor
org.apache.nifi.processors.tests.system.GenerateFlowFile
org.apache.nifi.processors.tests.system.Sleep
org.apache.nifi.processors.tests.system.TerminateFlowFile
org.apache.nifi.processors.tests.system.ValidateFileExists
org.apache.nifi.processors.tests.system.FakeProcessor
org.apache.nifi.processors.tests.system.FakeDynamicPropertiesProcessor
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,24 @@

import java.io.File;
import java.io.FileNotFoundException;
import java.util.HashMap;
import java.util.Map;

public class InstanceConfiguration {
private final File bootstrapConfigFile;
private final File instanceDirectory;
private final File flowXmlGz;
private final File stateDirectory;
private final boolean autoStart;
private final Map<String, String> nifiPropertiesOverrides;

private InstanceConfiguration(Builder builder) {
this.bootstrapConfigFile = builder.bootstrapConfigFile;
this.instanceDirectory = builder.instanceDirectory;
this.flowXmlGz = builder.flowXmlGz;
this.stateDirectory = builder.stateDirectory;
this.autoStart = builder.autoStart;
this.nifiPropertiesOverrides = builder.nifiPropertiesOverrides;
}

public File getBootstrapConfigFile() {
Expand All @@ -54,12 +58,26 @@ public boolean isAutoStart() {
return autoStart;
}

public Map<String, String> getNifiPropertiesOverrides() {
return nifiPropertiesOverrides;
}

public static class Builder {
private File bootstrapConfigFile;
private File instanceDirectory;
private File flowXmlGz;
private File stateDirectory;
private boolean autoStart = true;
private final Map<String, String> nifiPropertiesOverrides = new HashMap<>();

public Builder overrideNifiProperties(final Map<String, String> overrides) {
nifiPropertiesOverrides.clear();
if (overrides != null) {
nifiPropertiesOverrides.putAll(overrides);
}

return this;
}

public Builder bootstrapConfig(final File configFile) {
if (!configFile.exists()) {
Expand Down
Loading