Skip to content

Commit

Permalink
Merge branch 'master' into NIFI-6201
Browse files Browse the repository at this point in the history
  • Loading branch information
alopresto committed Apr 12, 2019
2 parents 0a16b9d + b5ff622 commit 1c9385f
Show file tree
Hide file tree
Showing 54 changed files with 3,069 additions and 265 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,82 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.controller.repository;

import java.util.Map;

import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.wali.SerDe;
import org.wali.SerDeFactory;
import org.wali.UpdateType;

public class RepositoryRecordSerdeFactory implements SerDeFactory<RepositoryRecord> {
private final String LEGACY_SERDE_ENCODING_NAME = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository$WriteAheadRecordSerde";
private final ResourceClaimManager resourceClaimManager;
private Map<String, FlowFileQueue> flowFileQueueMap = null;

public RepositoryRecordSerdeFactory(final ResourceClaimManager claimManager) {
this.resourceClaimManager = claimManager;
}

protected void setQueueMap(final Map<String, FlowFileQueue> queueMap) {
this.flowFileQueueMap = queueMap;
}

protected Map<String, FlowFileQueue> getQueueMap() {
return flowFileQueueMap;
}

@Override
public SerDe<RepositoryRecord> createSerDe(final String encodingName) {
if (encodingName == null || SchemaRepositoryRecordSerde.class.getName().equals(encodingName)) {
final SchemaRepositoryRecordSerde serde = new SchemaRepositoryRecordSerde(resourceClaimManager);
serde.setQueueMap(flowFileQueueMap);
return serde;
}

if (WriteAheadRepositoryRecordSerde.class.getName().equals(encodingName)
|| LEGACY_SERDE_ENCODING_NAME.equals(encodingName)) {
final WriteAheadRepositoryRecordSerde serde = new WriteAheadRepositoryRecordSerde(resourceClaimManager);
serde.setQueueMap(flowFileQueueMap);
return serde;
}

throw new IllegalArgumentException("Cannot create Deserializer for Repository Records because the encoding '" + encodingName + "' is not known");
}

protected FlowFileQueue getFlowFileQueue(final String queueId) {
return flowFileQueueMap.get(queueId);
}

@Override
public Long getRecordIdentifier(final RepositoryRecord record) {
return record.getCurrent().getId();
}

@Override
public UpdateType getUpdateType(final RepositoryRecord record) {
switch (record.getType()) {
case CONTENTMISSING:
case DELETE:
return UpdateType.DELETE;
case CREATE:
return UpdateType.CREATE;
case UPDATE:
return UpdateType.UPDATE;
case SWAP_OUT:
return UpdateType.SWAP_OUT;
case SWAP_IN:
return UpdateType.SWAP_IN;
}
return null;
}
import java.util.Map;

@Override
public String getLocation(final RepositoryRecord record) {
return record.getSwapLocation();
}
public interface RepositoryRecordSerdeFactory extends SerDeFactory<RepositoryRecord> {
void setQueueMap(Map<String, FlowFileQueue> queueMap);

Long getRecordIdentifier(RepositoryRecord record);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.controller.repository;

import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.wali.SerDe;
import org.wali.UpdateType;

import java.util.Map;

public class StandardRepositoryRecordSerdeFactory implements RepositoryRecordSerdeFactory {
private final String LEGACY_SERDE_ENCODING_NAME = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository$WriteAheadRecordSerde";
private final ResourceClaimManager resourceClaimManager;
private Map<String, FlowFileQueue> flowFileQueueMap = null;

public StandardRepositoryRecordSerdeFactory(final ResourceClaimManager claimManager) {
this.resourceClaimManager = claimManager;
}

@Override
public void setQueueMap(final Map<String, FlowFileQueue> queueMap) {
this.flowFileQueueMap = queueMap;
}

protected Map<String, FlowFileQueue> getQueueMap() {
return flowFileQueueMap;
}

@Override
public SerDe<RepositoryRecord> createSerDe(final String encodingName) {
if (encodingName == null || SchemaRepositoryRecordSerde.class.getName().equals(encodingName)) {
final SchemaRepositoryRecordSerde serde = new SchemaRepositoryRecordSerde(resourceClaimManager);
serde.setQueueMap(flowFileQueueMap);
return serde;
}

if (WriteAheadRepositoryRecordSerde.class.getName().equals(encodingName)
|| LEGACY_SERDE_ENCODING_NAME.equals(encodingName)) {
final WriteAheadRepositoryRecordSerde serde = new WriteAheadRepositoryRecordSerde(resourceClaimManager);
serde.setQueueMap(flowFileQueueMap);
return serde;
}

throw new IllegalArgumentException("Cannot create Deserializer for Repository Records because the encoding '" + encodingName + "' is not known");
}

protected FlowFileQueue getFlowFileQueue(final String queueId) {
return flowFileQueueMap.get(queueId);
}

@Override
public Long getRecordIdentifier(final RepositoryRecord record) {
return record.getCurrent().getId();
}

@Override
public UpdateType getUpdateType(final RepositoryRecord record) {
switch (record.getType()) {
case CONTENTMISSING:
case DELETE:
return UpdateType.DELETE;
case CREATE:
return UpdateType.CREATE;
case UPDATE:
return UpdateType.UPDATE;
case SWAP_OUT:
return UpdateType.SWAP_OUT;
case SWAP_IN:
return UpdateType.SWAP_IN;
}
return null;
}

@Override
public String getLocation(final RepositoryRecord record) {
return record.getSwapLocation();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ public abstract class AbstractComponentNode implements ComponentNode {
private final ControllerServiceProvider serviceProvider;
private final AtomicReference<String> name;
private final AtomicReference<String> annotationData = new AtomicReference<>();
private final AtomicReference<ValidationContext> validationContext = new AtomicReference<>();
private final String componentType;
private final String componentCanonicalClass;
private final ComponentVariableRegistry variableRegistry;
Expand All @@ -79,10 +78,13 @@ public abstract class AbstractComponentNode implements ComponentNode {
private final Lock lock = new ReentrantLock();
private final ConcurrentMap<PropertyDescriptor, String> properties = new ConcurrentHashMap<>();
private volatile String additionalResourcesFingerprint;
private AtomicReference<ValidationState> validationState = new AtomicReference<>(new ValidationState(ValidationStatus.VALIDATING, Collections.emptyList()));
private final AtomicReference<ValidationState> validationState = new AtomicReference<>(new ValidationState(ValidationStatus.VALIDATING, Collections.emptyList()));
private final ValidationTrigger validationTrigger;
private volatile boolean triggerValidation = true;

// guaraded by lock
private ValidationContext validationContext = null;

public AbstractComponentNode(final String id,
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider,
final String componentType, final String componentCanonicalClass, final ComponentVariableRegistry variableRegistry,
Expand Down Expand Up @@ -575,7 +577,7 @@ public List<PropertyDescriptor> getPropertyDescriptors() {
}


private final void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
private void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, getComponent().getClass(), getComponent().getIdentifier())) {
getComponent().onPropertyModified(descriptor, oldValue, newValue);
}
Expand Down Expand Up @@ -627,13 +629,18 @@ private boolean replaceValidationState(final ValidationState expectedState, fina
}

protected void resetValidationState() {
validationContext.set(null);
validationState.set(new ValidationState(ValidationStatus.VALIDATING, Collections.emptyList()));
lock.lock();
try {
validationContext = null;
validationState.set(new ValidationState(ValidationStatus.VALIDATING, Collections.emptyList()));

if (isTriggerValidation()) {
validationTrigger.triggerAsync(this);
} else {
logger.debug("Reset validation state of {} but will not trigger async validation because trigger has been paused", this);
if (isTriggerValidation()) {
validationTrigger.triggerAsync(this);
} else {
logger.debug("Reset validation state of {} but will not trigger async validation because trigger has been paused", this);
}
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -714,27 +721,21 @@ protected ValidationContextFactory getValidationContextFactory() {
}

protected ValidationContext getValidationContext() {
while (true) {
ValidationContext context = this.validationContext.get();
lock.lock();
try {
ValidationContext context = this.validationContext;
if (context != null) {
return context;
}

// Use a lock here because we want to prevent calls to getProperties() from happening while setProperties() is also happening.
final Map<PropertyDescriptor, String> properties;
lock.lock();
try {
properties = getProperties();
} finally {
lock.unlock();
}
final Map<PropertyDescriptor, String> properties = getProperties();
context = getValidationContextFactory().newValidationContext(properties, getAnnotationData(), getProcessGroupIdentifier(), getIdentifier());

final boolean updated = validationContext.compareAndSet(null, context);
if (updated) {
logger.debug("Updating validation context to {}", context);
return context;
}
this.validationContext = context;
logger.debug("Updating validation context to {}", context);
return context;
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
*/
package org.apache.nifi.events;

import java.util.concurrent.atomic.AtomicLong;

import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.ComponentType;

import java.util.concurrent.atomic.AtomicLong;

public final class BulletinFactory {

private static final AtomicLong currentId = new AtomicLong(0);
Expand All @@ -47,7 +47,7 @@ public static Bulletin createBulletin(final Connectable connectable, final Strin
}

final ProcessGroup group = connectable.getProcessGroup();
final String groupId = group == null ? null : group.getIdentifier();
final String groupId = connectable.getProcessGroupIdentifier();
final String groupName = group == null ? null : group.getName();
return BulletinFactory.createBulletin(groupId, groupName, connectable.getIdentifier(), type, connectable.getName(), category, severity, message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,12 @@
<version>1.10.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-persistent-provenance-repository</artifactId>
<version>1.10.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.QueueProvider;
import org.apache.nifi.controller.repository.StandardCounterRepository;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.StandardQueueProvider;
Expand Down Expand Up @@ -294,6 +295,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
private final ProvenanceAuthorizableFactory provenanceAuthorizableFactory;
private final UserAwareEventAccess eventAccess;
private final StandardFlowManager flowManager;
private final RepositoryContextFactory repositoryContextFactory;

/**
* true if controller is configured to operate in a clustered environment
Expand Down Expand Up @@ -433,7 +435,7 @@ private FlowController(
final ExtensionManager extensionManager) {

maxTimerDrivenThreads = new AtomicInteger(10);
maxEventDrivenThreads = new AtomicInteger(5);
maxEventDrivenThreads = new AtomicInteger(1);

this.encryptor = encryptor;
this.nifiProperties = nifiProperties;
Expand Down Expand Up @@ -484,18 +486,17 @@ private FlowController(
processScheduler = new StandardProcessScheduler(timerDrivenEngineRef.get(), this, encryptor, stateManagerProvider, this.nifiProperties);
eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler);

final RepositoryContextFactory contextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository);

repositoryContextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository);
this.flowManager = new StandardFlowManager(nifiProperties, sslContext, this, flowFileEventRepository);

controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository);

eventDrivenSchedulingAgent = new EventDrivenSchedulingAgent(
eventDrivenEngineRef.get(), controllerServiceProvider, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor, extensionManager);
eventDrivenEngineRef.get(), controllerServiceProvider, stateManagerProvider, eventDrivenWorkerQueue, repositoryContextFactory, maxEventDrivenThreads.get(), encryptor, extensionManager);
processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, eventDrivenSchedulingAgent);

final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor);
final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.nifiProperties);
final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory, encryptor);
final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory, encryptor, this.nifiProperties);
processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent);
// PRIMARY_NODE_ONLY is deprecated, but still exists to handle processors that are still defined with it (they haven't been re-configured with executeNode = PRIMARY).
processScheduler.setSchedulingAgent(SchedulingStrategy.PRIMARY_NODE_ONLY, timerDrivenAgent);
Expand Down Expand Up @@ -743,12 +744,16 @@ public void reportEvent(final Severity severity, final String category, final St
}

public void initializeFlow() throws IOException {
initializeFlow(new StandardQueueProvider(getFlowManager()));
}

public void initializeFlow(final QueueProvider queueProvider) throws IOException {
writeLock.lock();
try {
// get all connections/queues and recover from swap files.
final List<Connection> connections = flowManager.getRootGroup().findAllConnections();

flowFileRepository.loadFlowFiles(new StandardQueueProvider(this));
flowFileRepository.loadFlowFiles(queueProvider);

long maxIdFromSwapFiles = -1L;
if (flowFileRepository.isVolatile()) {
Expand Down Expand Up @@ -1190,6 +1195,7 @@ public void shutdown(final boolean kill) {

for (final RemoteSiteListener listener : externalSiteListeners) {
listener.stop();
listener.destroy();
}

if (loadBalanceServer != null) {
Expand Down Expand Up @@ -1719,6 +1725,10 @@ public FlowManager getFlowManager() {
return flowManager;
}

public RepositoryContextFactory getRepositoryContextFactory() {
return repositoryContextFactory;
}

/**
* Creates a connection between two Connectable objects.
*
Expand Down

0 comments on commit 1c9385f

Please sign in to comment.