Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -1606,7 +1608,7 @@ private FlowDTO postProcessNewFlowSnippet(final String groupId, final FlowSnippe

final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId);
return dtoFactory.createFlowDto(group, groupStatus, snippet, revisionManager);
return dtoFactory.createFlowDto(group, groupStatus, snippet, revisionManager, this::getProcessGroupBulletins);
}

@Override
Expand Down Expand Up @@ -2591,10 +2593,57 @@ private ProcessGroupEntity createProcessGroupEntity(final ProcessGroup group) {
final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(group.getIdentifier()));
final PermissionsDTO permissions = dtoFactory.createPermissionsDto(group);
final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(group.getIdentifier()));
final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(group.getIdentifier()));
final List<BulletinDTO> bulletins = getProcessGroupBulletins(group);
return entityFactory.createProcessGroupEntity(dtoFactory.createProcessGroupDto(group), revision, permissions, status, bulletins);
}

private List<BulletinDTO> getProcessGroupBulletins(final ProcessGroup group) {
final List<Bulletin> bulletins = new ArrayList<>(bulletinRepository.findBulletinsForGroupBySource(group.getIdentifier()));

for (final ProcessGroup descendantGroup : group.findAllProcessGroups()) {
bulletins.addAll(bulletinRepository.findBulletinsForGroupBySource(descendantGroup.getIdentifier()));
}

List<BulletinDTO> dtos = new ArrayList<>();
for (final Bulletin bulletin : bulletins) {
if (authorizeBulletin(bulletin)) {
dtos.add(dtoFactory.createBulletinDto(bulletin));
} else {
final BulletinDTO bulletinDTO = new BulletinDTO();
bulletinDTO.setTimestamp(bulletin.getTimestamp());
bulletinDTO.setId(bulletin.getId());
bulletinDTO.setSourceId(bulletin.getSourceId());
bulletinDTO.setGroupId(bulletin.getGroupId());
dtos.add(bulletinDTO);
}
}

// sort the bulletins
Collections.sort(dtos, new Comparator<BulletinDTO>() {
@Override
public int compare(BulletinDTO o1, BulletinDTO o2) {
if (o1 == null && o2 == null) {
return 0;
}
if (o1 == null) {
return 1;
}
if (o2 == null) {
return -1;
}

return -Long.compare(o1.getId(), o2.getId());
}
});

// prune the response to only include the max number of bulletins
if (dtos.size() > BulletinRepository.MAX_BULLETINS_PER_COMPONENT) {
dtos = dtos.subList(0, BulletinRepository.MAX_BULLETINS_PER_COMPONENT);
}

return dtos;
}

@Override
public Set<ProcessGroupEntity> getProcessGroups(final String parentGroupId) {
final Set<ProcessGroup> groups = processGroupDAO.getProcessGroups(parentGroupId);
Expand Down Expand Up @@ -2706,7 +2755,7 @@ public ProcessGroupFlowEntity getProcessGroupFlow(final String groupId) {
// read lock on every component being accessed in the dto conversion
final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId);
final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
return entityFactory.createProcessGroupFlowEntity(dtoFactory.createProcessGroupFlowDto(processGroup, groupStatus, revisionManager), permissions);
return entityFactory.createProcessGroupFlowEntity(dtoFactory.createProcessGroupFlowDto(processGroup, groupStatus, revisionManager, this::getProcessGroupBulletins), permissions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1603,12 +1603,14 @@ public ProcessGroupDTO createProcessGroupDto(final ProcessGroup group) {
return createProcessGroupDto(group, false);
}

public ProcessGroupFlowDTO createProcessGroupFlowDto(final ProcessGroup group, final ProcessGroupStatus groupStatus, final RevisionManager revisionManager) {
public ProcessGroupFlowDTO createProcessGroupFlowDto(final ProcessGroup group, final ProcessGroupStatus groupStatus, final RevisionManager revisionManager,
final Function<ProcessGroup, List<BulletinDTO>> getProcessGroupBulletins) {

final ProcessGroupFlowDTO dto = new ProcessGroupFlowDTO();
dto.setId(group.getIdentifier());
dto.setLastRefreshed(new Date());
dto.setBreadcrumb(createBreadcrumbEntity(group));
dto.setFlow(createFlowDto(group, groupStatus, revisionManager));
dto.setFlow(createFlowDto(group, groupStatus, revisionManager, getProcessGroupBulletins));

final ProcessGroup parent = group.getParent();
if (parent != null) {
Expand All @@ -1618,7 +1620,8 @@ public ProcessGroupFlowDTO createProcessGroupFlowDto(final ProcessGroup group, f
return dto;
}

public FlowDTO createFlowDto(final ProcessGroup group, final ProcessGroupStatus groupStatus, final FlowSnippetDTO snippet, final RevisionManager revisionManager) {
public FlowDTO createFlowDto(final ProcessGroup group, final ProcessGroupStatus groupStatus, final FlowSnippetDTO snippet, final RevisionManager revisionManager,
final Function<ProcessGroup, List<BulletinDTO>> getProcessGroupBulletins) {
if (snippet == null) {
return null;
}
Expand Down Expand Up @@ -1700,7 +1703,7 @@ public FlowDTO createFlowDto(final ProcessGroup group, final ProcessGroupStatus
() -> groupStatus.getProcessGroupStatus().stream().filter(processGroupStatus -> processGroup.getIdentifier().equals(processGroupStatus.getId())).findFirst().orElse(null),
processGroupStatus -> createConciseProcessGroupStatusDto(processGroupStatus)
);
final List<BulletinDTO> bulletins = createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroup.getIdentifier()));
final List<BulletinDTO> bulletins = getProcessGroupBulletins.apply(processGroup);
flow.getProcessGroups().add(entityFactory.createProcessGroupEntity(dto, revision, accessPolicy, status, bulletins));
}

Expand Down Expand Up @@ -1748,7 +1751,8 @@ private <T, S> T getComponentStatus(final Supplier<S> getComponentStatus, final
return statusDTO;
}

public FlowDTO createFlowDto(final ProcessGroup group, final ProcessGroupStatus groupStatus, final RevisionManager revisionManager) {
public FlowDTO createFlowDto(final ProcessGroup group, final ProcessGroupStatus groupStatus, final RevisionManager revisionManager,
final Function<ProcessGroup, List<BulletinDTO>> getProcessGroupBulletins) {
final FlowDTO dto = new FlowDTO();

for (final ProcessorNode procNode : group.getProcessors()) {
Expand Down Expand Up @@ -1791,7 +1795,7 @@ public FlowDTO createFlowDto(final ProcessGroup group, final ProcessGroupStatus
() -> groupStatus.getProcessGroupStatus().stream().filter(processGroupStatus -> childGroup.getIdentifier().equals(processGroupStatus.getId())).findFirst().orElse(null),
processGroupStatus -> createConciseProcessGroupStatusDto(processGroupStatus)
);
final List<BulletinDTO> bulletins = createBulletinDtos(bulletinRepository.findBulletinsForSource(childGroup.getIdentifier()));
final List<BulletinDTO> bulletins = getProcessGroupBulletins.apply(childGroup);
dto.getProcessGroups().add(entityFactory.createProcessGroupEntity(createProcessGroupDto(childGroup), revision, permissions, status, bulletins));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ public ProcessGroupEntity createProcessGroupEntity(final ProcessGroupDTO dto, fi
entity.setDisabledCount(dto.getDisabledCount());
entity.setActiveRemotePortCount(dto.getActiveRemotePortCount());
entity.setInactiveRemotePortCount(dto.getInactiveRemotePortCount());
entity.setBulletins(bulletins); // include bulletins as authorized descendant component bulletins should be available
if (permissions != null && permissions.getCanRead()) {
entity.setComponent(dto);
entity.setBulletins(bulletins);
}
}
return entity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.connectable.Connectable;
Expand Down Expand Up @@ -79,7 +78,6 @@
import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.search.SearchContext;
Expand Down Expand Up @@ -140,8 +138,6 @@ public class ControllerFacade implements Authorizable {
// nifi components
private FlowController flowController;
private FlowService flowService;
private ClusterCoordinator clusterCoordinator;
private BulletinRepository bulletinRepository;
private Authorizer authorizer;

// properties
Expand Down Expand Up @@ -1808,14 +1804,6 @@ public void setDtoFactory(DtoFactory dtoFactory) {
this.dtoFactory = dtoFactory;
}

public void setClusterCoordinator(ClusterCoordinator clusterCoordinator) {
this.clusterCoordinator = clusterCoordinator;
}

public void setBulletinRepository(BulletinRepository bulletinRepository) {
this.bulletinRepository = bulletinRepository;
}

public void setVariableRegistry(VariableRegistry variableRegistry) {
this.variableRegistry = variableRegistry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,8 @@
<property name="properties" ref="nifiProperties"/>
<property name="flowController" ref="flowController"/>
<property name="flowService" ref="flowService"/>
<property name="clusterCoordinator" ref="clusterCoordinator" />
<property name="authorizer" ref="authorizer"/>
<property name="dtoFactory" ref="dtoFactory"/>
<property name="bulletinRepository" ref="bulletinRepository"/>
<property name="variableRegistry" ref="variableRegistry"/>
</bean>
<bean id="authorizableLookup" class="org.apache.nifi.authorization.StandardAuthorizableLookup">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,20 @@ nf.CanvasUtils = (function () {
tip.remove();
}

// if there are bulletins show them, otherwise hide
var hasBulletins = false;
if (!nf.Common.isEmpty(d.bulletins)) {
// format the bulletins
var bulletins = nf.Common.getFormattedBulletins(d.bulletins);
hasBulletins = bulletins.length > 0;

if (hasBulletins) {
// create the unordered list based off the formatted bulletins
var list = nf.Common.formatUnorderedList(bulletins);
}
}

// if there are bulletins show them, otherwise hide
if (hasBulletins) {
// update the tooltip
selection.select('text.bulletin-icon')
.each(function () {
Expand All @@ -494,16 +506,7 @@ nf.CanvasUtils = (function () {
})
.attr('class', 'tooltip nifi-tooltip')
.html(function () {
// format the bulletins
var bulletins = nf.Common.getFormattedBulletins(d.bulletins);

// create the unordered list based off the formatted bulletins
var list = nf.Common.formatUnorderedList(bulletins);
if (list === null || list.length === 0) {
return '';
} else {
return $('<div></div>').append(list).html();
}
return $('<div></div>').append(list).html();
});

// add the tooltip
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1217,25 +1217,27 @@ nf.Common = (function () {
getFormattedBulletins: function (bulletins) {
var formattedBulletins = [];
$.each(bulletins, function (j, bulletin) {
// format the node address
var nodeAddress = '';
if (nf.Common.isDefinedAndNotNull(bulletin.nodeAddress)) {
nodeAddress = '-&nbsp' + nf.Common.escapeHtml(bulletin.nodeAddress) + '&nbsp;-&nbsp;';
}
if (!nf.Common.isBlank(bulletin.level)) {
// format the node address
var nodeAddress = '';
if (nf.Common.isDefinedAndNotNull(bulletin.nodeAddress)) {
nodeAddress = '-&nbsp' + nf.Common.escapeHtml(bulletin.nodeAddress) + '&nbsp;-&nbsp;';
}

// set the bulletin message (treat as text)
var bulletinMessage = $('<pre></pre>').css({
'white-space': 'pre-wrap'
}).text(bulletin.message);
// set the bulletin message (treat as text)
var bulletinMessage = $('<pre></pre>').css({
'white-space': 'pre-wrap'
}).text(bulletin.message);

// create the bulletin message
var formattedBulletin = $('<div>' +
nf.Common.escapeHtml(bulletin.timestamp) + '&nbsp;' +
nodeAddress + '&nbsp;' +
'<b>' + nf.Common.escapeHtml(bulletin.level) + '</b>&nbsp;' +
'</div>').append(bulletinMessage);
// create the bulletin message
var formattedBulletin = $('<div>' +
nf.Common.escapeHtml(bulletin.timestamp) + '&nbsp;' +
nodeAddress + '&nbsp;' +
'<b>' + nf.Common.escapeHtml(bulletin.level) + '</b>&nbsp;' +
'</div>').append(bulletinMessage);

formattedBulletins.push(formattedBulletin);
formattedBulletins.push(formattedBulletin);
}
});
return formattedBulletins;
}
Expand Down