From 50b007e9c56575dd163af512f2ddd0a7be04f8ce Mon Sep 17 00:00:00 2001
From: Mark Payne
Date: Thu, 28 Jul 2016 10:19:45 -0400
Subject: [PATCH] NIFI-2395: Ensure that if we fail to index provenance events
 we do not prevent the repo from continuing to merge journals

---
 .../PersistentProvenanceRepository.java     | 64 +++++++++++++++----
 .../TestPersistentProvenanceRepository.java | 54 ++++++++++++++++
 2 files changed, 106 insertions(+), 12 deletions(-)

diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 7e8b9a63614f..aee8277482d3 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -123,6 +123,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
     public static final Pattern INDEX_PATTERN = Pattern.compile("index-\\d+");
     public static final Pattern LOG_FILENAME_PATTERN = Pattern.compile("(\\d+).*\\.prov");
     public static final int MAX_UNDELETED_QUERY_RESULTS = 10;
+    public static final int MAX_INDEXING_FAILURE_COUNT = 5; // how many indexing failures we will tolerate before skipping indexing for a prov file
 
     private static final Logger logger = LoggerFactory.getLogger(PersistentProvenanceRepository.class);
 
@@ -1648,7 +1649,7 @@ record = reader.nextRecord();
         try (final RecordWriter writer = RecordWriters.newRecordWriter(writerFile, configuration.isCompressOnRollover(), true)) {
             writer.writeHeader(minEventId);
 
-            final IndexingAction indexingAction = new IndexingAction(this);
+            final IndexingAction indexingAction = createIndexingAction();
 
             final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile, earliestTimestamp);
             long maxId = 0L;
@@ -1668,24 +1669,33 @@ public Thread newThread(final Runnable r) {
                 }
             });
 
+            final AtomicInteger indexingFailureCount = new AtomicInteger(0);
             try {
                 for (int i = 0; i < configuration.getIndexThreadPoolSize(); i++) {
                     final Callable<Object> callable = new Callable<Object>() {
                         @Override
                         public Object call() throws IOException {
                             while (!eventQueue.isEmpty() || !finishedAdding.get()) {
-                                final Tuple<StandardProvenanceEventRecord, Integer> tuple;
                                 try {
-                                    tuple = eventQueue.poll(10, TimeUnit.MILLISECONDS);
-                                } catch (final InterruptedException ie) {
-                                    continue;
+                                    final Tuple<StandardProvenanceEventRecord, Integer> tuple;
+                                    try {
+                                        tuple = eventQueue.poll(10, TimeUnit.MILLISECONDS);
+                                    } catch (final InterruptedException ie) {
+                                        Thread.currentThread().interrupt();
+                                        continue;
+                                    }
+
+                                    if (tuple == null) {
+                                        continue;
+                                    }
+
+                                    indexingAction.index(tuple.getKey(), indexWriter, tuple.getValue());
+                                } catch (final Throwable t) {
+                                    logger.error("Failed to index Provenance Event for " + writerFile + " to " + indexingDirectory, t);
+                                    if (indexingFailureCount.incrementAndGet() >= MAX_INDEXING_FAILURE_COUNT) {
+                                        return null;
+                                    }
                                 }
-
-                                if (tuple == null) {
-                                    continue;
-                                }
-
-                                indexingAction.index(tuple.getKey(), indexWriter, tuple.getValue());
                             }
 
                             return null;
@@ -1696,6 +1706,7 @@ public Object call() throws IOException {
                     futures.add(future);
                 }
 
+                boolean indexEvents = true;
                 while (!recordToReaderMap.isEmpty()) {
                     final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
                     final StandardProvenanceEventRecord record = entry.getKey();
@@ -1705,12 +1716,30 @@ public Object call() throws IOException {
                     final int blockIndex = writer.getTocWriter().getCurrentBlockIndex();
 
                     boolean accepted = false;
-                    while (!accepted) {
+                    while (!accepted && indexEvents) {
                         try {
                             accepted = eventQueue.offer(new Tuple<>(record, blockIndex), 10, TimeUnit.MILLISECONDS);
                         } catch (final InterruptedException ie) {
+                            Thread.currentThread().interrupt();
+                        }
+
+                        // If we weren't able to add anything to the queue, check if we have reached our max failure count.
+                        // We do this here because if we do reach our max failure count, all of the indexing threads will stop
+                        // performing their jobs. As a result, the queue will fill and we won't be able to add anything to it.
+                        // So, if the queue is filled, we will check if this is the case.
+                        if (!accepted && indexingFailureCount.get() >= MAX_INDEXING_FAILURE_COUNT) {
+                            indexEvents = false; // don't add anything else to the queue.
+                            eventQueue.clear();
+
+                            final String warning = String.format("Indexing Provenance Events for %s has failed %s times. This exceeds the maximum threshold of %s failures, "
+                                + "so no more Provenance Events will be indexed for this Provenance file.", writerFile, indexingFailureCount.get(), MAX_INDEXING_FAILURE_COUNT);
+                            logger.warn(warning);
+                            if (eventReporter != null) {
+                                eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, warning);
+                            }
                         }
                     }
+
                     maxId = record.getEventId();
 
                     latestRecords.add(truncateAttributes(record));
@@ -1747,6 +1776,7 @@ public Object call() throws IOException {
                         throw new RuntimeException(t);
                     } catch (final InterruptedException e) {
+                        Thread.currentThread().interrupt();
                         throw new RuntimeException("Thread interrupted");
                     }
                 }
 
@@ -1810,6 +1840,15 @@ public boolean evaluate(final ProvenanceEventRecord event) {
         return writerFile;
     }
 
+    /**
+     * This method is protected and exists for testing purposes. This allows unit tests to extend this class and
+     * override the createIndexingAction so that they can mock out the Indexing Action to throw Exceptions, count
+     * events indexed, etc.
+     */
+    protected IndexingAction createIndexingAction() {
+        return new IndexingAction(this);
+    }
+
     private StandardProvenanceEventRecord truncateAttributes(final StandardProvenanceEventRecord original) {
         boolean requireTruncation = false;
 
@@ -2264,6 +2303,7 @@ public QuerySubmission retrieveQuerySubmission(final String queryIdentifier, fin
         throw new AccessDeniedException("Cannot retrieve Provenance Query Submission because " + user.getIdentity() + " is not the user who submitted the request");
     }
 
+    @Override
     public ProvenanceEventRecord getEvent(final long id) throws IOException {
         final List<ProvenanceEventRecord> records = getEvents(id, 1);
         if (records.isEmpty()) {
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index c4fe6ed9d346..d7738e7f0b0a 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -20,6 +20,7 @@
 import org.apache.lucene.analysis.core.SimpleAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.queryparser.classic.ParseException;
 import org.apache.lucene.queryparser.classic.QueryParser;
 import org.apache.lucene.search.IndexSearcher;
@@ -35,6 +36,7 @@
 import org.apache.nifi.provenance.lineage.LineageEdge;
 import org.apache.nifi.provenance.lineage.LineageNode;
 import org.apache.nifi.provenance.lineage.LineageNodeType;
+import org.apache.nifi.provenance.lucene.IndexingAction;
 import org.apache.nifi.provenance.search.Query;
 import org.apache.nifi.provenance.search.QueryResult;
 import org.apache.nifi.provenance.search.QuerySubmission;
@@ -1536,6 +1538,58 @@ public void testTruncateAttributes() throws IOException, InterruptedException {
 
     }
 
+    @Test(timeout=5000)
+    public void testExceptionOnIndex() throws IOException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxAttributeChars(50);
+        config.setMaxEventFileLife(3, TimeUnit.SECONDS);
+        config.setIndexThreadPoolSize(1);
+
+        final int numEventsToIndex = 10;
+
+        final AtomicInteger indexedEventCount = new AtomicInteger(0);
+        repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
+            @Override
+            protected synchronized IndexingAction createIndexingAction() {
+                return new IndexingAction(repo) {
+                    @Override
+                    public void index(StandardProvenanceEventRecord record, IndexWriter indexWriter, Integer blockIndex) throws IOException {
+                        final int count = indexedEventCount.incrementAndGet();
+                        if (count <= numEventsToIndex) {
+                            return;
+                        }
+
+                        throw new IOException("Unit Test - Intentional Exception");
+                    }
+                };
+            }
+        };
+        repo.initialize(getEventReporter(), null, null);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        for (int i=0; i < 1000; i++) {
+            final ProvenanceEventRecord record = builder.build();
+            repo.registerEvent(record);
+        }
+
+        repo.waitForRollover();
+
+        assertEquals(numEventsToIndex + PersistentProvenanceRepository.MAX_INDEXING_FAILURE_COUNT, indexedEventCount.get());
+        assertEquals(1, reportedEvents.size());
+        final ReportedEvent event = reportedEvents.get(0);
+        assertEquals(Severity.WARNING, event.getSeverity());
+    }
+
     @Test
     public void testFailureToCreateWriterDoesNotPreventSubsequentRollover() throws IOException, InterruptedException {
         final RepositoryConfiguration config = createConfiguration();
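
Note (illustrative, not part of the patch): the change above combines two ideas. The indexing worker threads count failures with an AtomicInteger and stop once MAX_INDEXING_FAILURE_COUNT is reached, and the producer that drains recordToReaderMap notices the saturated queue, stops offering events, and lets journal merging continue without indexing. The standalone sketch below shows the same bounded-failure producer/consumer pattern in isolation; the class name FailureBoundedIndexer, the queue capacity, and the always-failing index() method are hypothetical and exist only to exercise the threshold logic.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, self-contained illustration of the bounded-failure pattern used in the patch.
public class FailureBoundedIndexer {

    private static final int MAX_FAILURES = 5; // analogous to MAX_INDEXING_FAILURE_COUNT

    public static void main(final String[] args) throws InterruptedException {
        final BlockingQueue<String> eventQueue = new ArrayBlockingQueue<>(100);
        final AtomicInteger failureCount = new AtomicInteger(0);

        // Consumer: keep indexing until the failure threshold is reached, then give up
        // so that a broken index cannot stall the rest of the pipeline.
        final Thread indexer = new Thread(() -> {
            while (failureCount.get() < MAX_FAILURES) {
                final String event;
                try {
                    event = eventQueue.poll(10, TimeUnit.MILLISECONDS);
                } catch (final InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (event == null) {
                    continue;
                }
                try {
                    index(event); // may throw
                } catch (final Exception e) {
                    failureCount.incrementAndGet(); // tolerate failures up to the threshold
                }
            }
        });
        indexer.start();

        // Producer: if the queue cannot accept events and the threshold has been reached,
        // stop offering work entirely instead of blocking forever.
        boolean indexEvents = true;
        for (int i = 0; i < 1000 && indexEvents; i++) {
            final boolean accepted = eventQueue.offer("event-" + i, 10, TimeUnit.MILLISECONDS);
            if (!accepted && failureCount.get() >= MAX_FAILURES) {
                indexEvents = false;
                eventQueue.clear();
                System.err.println("Skipping indexing after " + failureCount.get() + " failures");
            }
        }
        indexer.join(TimeUnit.SECONDS.toMillis(5));
    }

    private static void index(final String event) {
        // Simulated indexing step; always fails so the threshold logic is exercised.
        throw new RuntimeException("simulated indexing failure for " + event);
    }
}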