Skip to content

Commit

Permalink
NIFI-12693: Moved notification of python process that a Processor was…
Browse files Browse the repository at this point in the history
… removed to a background (virtual) thread. Also noted in testing that in one instance a Python Processor never became

valid because it had cached property descriptors before the processor was fully initialized, so updated code to ensure that we do not cache values before initialization is completed.
  • Loading branch information
markap14 committed Jan 31, 2024
1 parent 4620afd commit e03ca50
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1263,6 +1263,14 @@ public void removeProcessor(final ProcessorNode processor) {
}
}

// Remove connections prior to removing the Processor. If there is any failure in removing the Processor or the associated cleanup,
// we can handle that. However, we could have many potential issues if Connections exist whose source or destination does not exist.
// must copy to avoid a concurrent modification
final List<Connection> copy = new ArrayList<>(processor.getConnections());
for (final Connection conn : copy) {
removeConnection(conn);
}

processors.remove(id);
onComponentModified();

Expand All @@ -1274,18 +1282,7 @@ public void removeProcessor(final ProcessorNode processor) {
logRepository.removeAllObservers();
}

scheduler.submitFrameworkTask(new Runnable() {
@Override
public void run() {
stateManagerProvider.onComponentRemoved(processor.getIdentifier());
}
});

// must copy to avoid a concurrent modification
final Set<Connection> copy = new HashSet<>(processor.getConnections());
for (final Connection conn : copy) {
removeConnection(conn);
}
scheduler.submitFrameworkTask(() -> stateManagerProvider.onComponentRemoved(processor.getIdentifier()));

removed = true;
LOG.info("{} removed from flow", processor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ public void deleteProcessor(String processorId) {
try {
// attempt remove the processor
processor.getProcessGroup().removeProcessor(processor);
} catch (ComponentLifeCycleException plce) {
} catch (final ComponentLifeCycleException plce) {
throw new NiFiCoreException(plce.getMessage(), plce);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -162,18 +162,29 @@ public synchronized void onProcessorRemoved(final String identifier, final Strin
return;
}

// Find the Python Process that has the Processor, if any, and remove it.
// If there are no additional Processors in the Python Process, remove it from our list and shut down the process.
final Iterator<PythonProcess> processItr = processes.iterator(); // Use iterator so we can call remove()
while (processItr.hasNext()) {
final PythonProcess process = processItr.next();
final boolean removed = process.removeProcessor(identifier);
if (removed && process.getProcessorCount() == 0) {
processItr.remove();
process.shutdown();
break;
Thread.ofVirtual().name("Remove Python Processor " + identifier).start(() -> {
PythonProcess toRemove = null;

try {
// Find the Python Process that has the Processor, if any, and remove it.
// If there are no additional Processors in the Python Process, remove it from our list and shut down the process.
// Use iterator so we can call remove()
for (final PythonProcess process : processes) {
final boolean removed = process.removeProcessor(identifier);
if (removed && process.getProcessorCount() == 0) {
toRemove = process;
break;
}
}

if (toRemove != null) {
processes.remove(toRemove);
toRemove.shutdown();
}
} catch (final Exception e) {
logger.error("Failed to trigger removal of Python Processor with ID {}", identifier, e);
}
}
});

processorCountByType.merge(extensionId, -1, Integer::sum);
} else {
Expand All @@ -198,7 +209,7 @@ private synchronized PythonProcess getProcessForNextComponent(final ExtensionId
// isolation (which is the case when Extension Manager creates a temp component), or if an existing process
// consists only of processors that don't prefer isolation. I.e., we don't want to collocate two Processors if
// they both prefer isolation.
final List<PythonProcess> processesForType = processesByProcessorType.computeIfAbsent(extensionId, key -> new ArrayList<>());
final List<PythonProcess> processesForType = processesByProcessorType.computeIfAbsent(extensionId, key -> new CopyOnWriteArrayList<>());
for (final PythonProcess pythonProcess : processesForType) {
if (!preferIsolatedProcess || !pythonProcess.containsIsolatedProcessor()) {
logger.debug("Using {} to create Processor of type {}", pythonProcess, extensionId.type());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return this.cachedPropertyDescriptors;
}

if (bridge == null) {
if (getState() != LoadState.FINISHED_LOADING) {
return Collections.emptyList();
}

Expand Down Expand Up @@ -202,7 +202,7 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String
return cachedDynamicDescriptors.get(propertyDescriptorName);
}

if (bridge == null) {
if (getState() != LoadState.FINISHED_LOADING) {
return null;
}

Expand All @@ -221,7 +221,7 @@ protected boolean isSupportsDynamicPropertyDescriptor() {
return supportsDynamicProperties;
}

if (bridge == null) {
if (getState() != LoadState.FINISHED_LOADING) {
return false;
}

Expand Down Expand Up @@ -266,7 +266,7 @@ public Set<Relationship> getRelationships() {
}

private Set<Relationship> fetchRelationshipsFromPythonProcessor() {
if (bridge == null) {
if (getState() != LoadState.FINISHED_LOADING) {
return Collections.emptySet();
}

Expand Down

0 comments on commit e03ca50

Please sign in to comment.