
Commit

Merge branch 'release-25.2.1' into develop
christianpape committed Feb 12, 2020
2 parents 7ec3f9e + 863492b commit 7af8b1c
Showing 26 changed files with 474 additions and 75 deletions.
28 changes: 22 additions & 6 deletions .circleci/scripts/publish-cloudsmith.sh
@@ -29,15 +29,31 @@ esac

find target -type f | sort -u


publishPackage() {
local _tmpdir;
_tmpdir="$(mktemp -d 2>/dev/null || mktemp -d -t 'publish_cloudsmith_')"
echo "$@"
"$@" >"${_tmpdir}/publish.log" 2>&1
ret="$?"
cat "${_tmpdir}/publish.log"
if [ "$(grep -c "This package duplicates the attributes of another package" < "${_tmpdir}/publish.log")" -gt 0 ]; then
echo "Duplicate upload... skipping."
return 0
fi
rm "${_tmpdir}/publish.log"
rmdir "${_tmpdir}" || :
return "$ret"
}

for FILE in target/rpm/RPMS/noarch/*.rpm; do
  # give it 3 tries then die
  publishPackage cloudsmith push rpm --no-wait-for-sync "${PROJECT}/$REPO/any-distro/any-version" "$FILE" ||
    publishPackage cloudsmith push rpm --no-wait-for-sync "${PROJECT}/$REPO/any-distro/any-version" "$FILE" ||
    publishPackage cloudsmith push rpm --no-wait-for-sync "${PROJECT}/$REPO/any-distro/any-version" "$FILE" || exit 1
done

for FILE in target/debs/*.deb; do
  # give it 3 tries then die
  publishPackage cloudsmith push deb --no-wait-for-sync "${PROJECT}/$REPO/any-distro/any-version" "$FILE" ||
    publishPackage cloudsmith push deb --no-wait-for-sync "${PROJECT}/$REPO/any-distro/any-version" "$FILE" ||
    publishPackage cloudsmith push deb --no-wait-for-sync "${PROJECT}/$REPO/any-distro/any-version" "$FILE" || exit 1
done
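The new `publishPackage` wrapper above treats Cloudsmith's "duplicate package" response as a success, so pipeline re-runs don't fail on packages that were already uploaded. A minimal Java sketch of that retry pattern, where the `Attempt` class and the `Supplier` stand in for one `cloudsmith push ... --no-wait-for-sync` invocation (these names are illustrative, not part of this commit):

```java
import java.util.function.Supplier;

public final class PublishRetry {

    /** One upload attempt: exit status plus the captured output, like publish.log above. */
    public static final class Attempt {
        final int status;
        final String log;
        Attempt(int status, String log) { this.status = status; this.log = log; }
    }

    /**
     * Up to maxAttempts tries; output that reports a duplicate package counts
     * as success, mirroring publishPackage() in the script.
     */
    public static boolean publish(Supplier<Attempt> upload, int maxAttempts) {
        for (int i = 0; i < maxAttempts; i++) {
            final Attempt attempt = upload.get();
            if (attempt.status == 0) {
                return true;
            }
            if (attempt.log.contains("This package duplicates the attributes of another package")) {
                return true; // duplicate upload: skip instead of failing the build
            }
        }
        return false; // caller aborts, like `|| exit 1`
    }
}
```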
3 changes: 3 additions & 0 deletions container/features/src/main/resources/features-sentinel.xml
@@ -67,6 +67,9 @@
        <!-- Core Sentinel -->
        <bundle>mvn:org.opennms.features.distributed/shell/${project.version}</bundle>
        <bundle>mvn:org.opennms.features.sentinel/core/${project.version}</bundle>

<!-- SystemProperties -->
<bundle>mvn:org.opennms.core/org.opennms.core.sysprops/${project.version}</bundle>
    </feature>

    <feature name="sentinel-events-forwarder" description="OpenNMS :: Sentinel :: Events Forwarder" version="${project.version}">
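The added bundle exposes the shared system-properties helpers to Sentinel features. As a rough illustration of what such a helper does — these names are hypothetical, not the bundle's actual API — it parses a JVM system property and falls back to a default on absence or bad input:

```java
public final class SyspropsExample {

    // Hypothetical helper; shown only to illustrate the kind of utility the
    // org.opennms.core.sysprops bundle makes available.
    static int getInteger(String key, int defaultValue) {
        final String raw = System.getProperty(key);
        if (raw == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return defaultValue; // fall back instead of failing bundle start
        }
    }

    public static void main(String[] args) {
        // e.g. a timeout a feature might read at startup
        int timeout = getInteger("org.opennms.example.timeout", 30);
        System.out.println("timeout = " + timeout);
    }
}
```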
143 changes: 143 additions & 0 deletions core/schema/src/main/liquibase/25.2.1/changelog.xml
@@ -0,0 +1,143 @@
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog
xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-2.0.xsd" >

<changeSet author="cpape" id="25.2.1-has-flows-snmpinterface">
<preConditions onFail="MARK_RAN">
<columnExists tableName="snmpinterface" columnName="hasflows" />
</preConditions>

<dropNotNullConstraint tableName="snmpinterface" columnName="hasflows" />
<dropColumn tableName="snmpinterface" columnName="hasflows" />

<addColumn tableName="snmpinterface">
<column name="last_ingress_flow" type="TIMESTAMP WITH TIME ZONE" />
<column name="last_egress_flow" type="TIMESTAMP WITH TIME ZONE" />
</addColumn>
</changeSet>

<changeSet author="cpape" id="25.2.1-has-flows-node">
<preConditions onFail="MARK_RAN">
<columnExists tableName="node" columnName="hasflows" />
</preConditions>

<!-- we have to drop these views since the node table will be changed -->
<dropView viewName="node_outages"/>
<dropView viewName="node_ip_services"/>
<dropView viewName="node_categories"/>

<dropNotNullConstraint tableName="node" columnName="hasflows" />
<dropColumn tableName="node" columnName="hasflows" />
<addColumn tableName="node">
<column name="last_ingress_flow" type="TIMESTAMP WITH TIME ZONE" />
<column name="last_egress_flow" type="TIMESTAMP WITH TIME ZONE" />
</addColumn>

<!-- we have to recreate the views since the node table has been changed -->
<sql>
CREATE VIEW node_categories AS (
SELECT
n.*,
COALESCE(s_cat.categoryname, 'no category') AS categoryname
FROM
node n
LEFT JOIN
category_node cn
ON
n.nodeid = cn.nodeid
LEFT JOIN
categories s_cat
ON
cn.categoryid = s_cat.categoryid
);
</sql>

<sql>
CREATE VIEW node_outages AS (
SELECT
outages.outageid,
outages.svclosteventid,
outages.svcregainedeventid,
outages.iflostservice,
outages.ifregainedservice,
outages.ifserviceid,
e.eventuei AS svclosteventuei,
e.eventsource,
e.alarmid,
e.eventseverity,
(ifregainedservice NOTNULL) AS resolved,
s.servicename,
i.serviceid,
ipif.ipaddr,
COALESCE(outages.ifregainedservice - outages.iflostservice, now() - outages.iflostservice) AS duration,
nos.max_outage_severity,
nc.*
FROM
outages
JOIN
events e
ON
outages.svclosteventid = e.eventid
JOIN
ifservices i
ON
outages.ifserviceid = i.id
JOIN
service s
ON
i.serviceid = s.serviceid
JOIN
ipinterface ipif
ON
i.ipinterfaceid = ipif.id
JOIN
node_categories nc
ON
nc.nodeid = e.nodeid
JOIN
node_outage_status nos
ON
nc.nodeid = nos.nodeid
);
</sql>

<sql>
CREATE VIEW node_ip_services AS (
SELECT
n.*,
ip_if.id AS ip_if_id,
ip_if.ipaddr,
ip_if.iphostname,
ip_if.ismanaged,
ip_if.ipstatus,
ip_if.iplastcapsdpoll,
ip_if.issnmpprimary,
ip_if.snmpinterfaceid,
ip_if.netmask,
svc.serviceid,
svc.servicename,
if_svc.id AS if_svc_id,
if_svc.ifindex AS if_svc_ifindex,
if_svc.status AS if_svc_status
FROM
node_categories n
LEFT JOIN
ipinterface ip_if
ON
ip_if.nodeid = n.nodeid
LEFT JOIN
ifservices if_svc
ON
ip_if.id = if_svc.ipinterfaceid
LEFT JOIN
service svc
ON
if_svc.serviceid = svc.serviceid
);
</sql>
</changeSet>

</databaseChangeLog>
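This migration drops the boolean `hasflows` columns in favour of `last_ingress_flow`/`last_egress_flow` timestamps, which lets callers distinguish ingress from egress and age out stale markers. A hedged sketch of how a consumer might derive the old boolean from the new columns — the one-hour cutoff is an assumption for illustration, not something this changelog defines:

```java
import java.time.Duration;
import java.time.Instant;

public final class FlowMarker {

    /**
     * With the hasflows column gone, "does this node/interface have flows?"
     * becomes a question about how recently a flow was seen in either direction.
     */
    static boolean hasRecentFlows(Instant lastIngressFlow, Instant lastEgressFlow, Duration maxAge) {
        final Instant cutoff = Instant.now().minus(maxAge);
        return (lastIngressFlow != null && lastIngressFlow.isAfter(cutoff))
            || (lastEgressFlow != null && lastEgressFlow.isAfter(cutoff));
    }

    public static void main(String[] args) {
        Instant tenMinutesAgo = Instant.now().minus(Duration.ofMinutes(10));
        System.out.println(hasRecentFlows(tenMinutesAgo, null, Duration.ofHours(1))); // true
    }
}
```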
1 change: 1 addition & 0 deletions core/schema/src/main/liquibase/changelog.xml
@@ -89,6 +89,7 @@
    <include file="24.0.0/changelog.xml"/>
    <include file="24.1.0/changelog.xml"/>
    <include file="25.0.0/changelog.xml"/>
    <include file="25.2.1/changelog.xml"/>
    <include file="26.0.0/changelog.xml"/>


<include file="stored-procedures/getManagePercentAvailIntfWindow.xml" /> <include file="stored-procedures/getManagePercentAvailIntfWindow.xml" />
Expand Down
@@ -164,7 +164,7 @@ angular.module('onms-interfaces', [

    $scope.updateFlowUrlsForSnmpInterfaces = function() {
      angular.forEach($scope.snmpInterfaces, function(intf) {
        if (!intf.hasIngressFlows && !intf.hasEgressFlows) {
          // No flows - nothing to do
          return;
        }
@@ -81,7 +81,11 @@
              <td>{{ intf.ifName || 'N/A' }}</td>
              <td>{{ intf.ifAlias || 'N/A' }}</td>
              <td>{{ intf.ifSpeed || 'N/A' }}</td>
              <td>
                <a ng-if="intf.hasIngressFlows && intf.hasEgressFlows" ng-href="{{intf.flowGraphUrl}}" target="_blank" title="Open flow graphs"><span class="badge badge-secondary" title="Flows: ingress/egress flow data available"><i class="fa fa-exchange"></i></span></a>
                <a ng-if="intf.hasIngressFlows && !intf.hasEgressFlows" ng-href="{{intf.flowGraphUrl}}" target="_blank" title="Open flow graphs"><span class="badge badge-secondary" title="Flows: ingress flow data available"><i class="fa fa-long-arrow-left"></i></span></a>
                <a ng-if="!intf.hasIngressFlows && intf.hasEgressFlows" ng-href="{{intf.flowGraphUrl}}" target="_blank" title="Open flow graphs"><span class="badge badge-secondary" title="Flows: egress flow data available"><i class="fa fa-long-arrow-right"></i></span></a>
              </td>
            </tr>
          </tbody>
        </table>
@@ -114,7 +114,9 @@ angular.module('onms-resources', [
            typeLabel: obj.typeLabel,
            checked: false,
            ifIndex: parseInt(obj.externalValueAttributes.ifIndex, 10), // will return NaN if not set
            hasFlows: typeof obj.externalValueAttributes.hasFlows === 'undefined' ? false : JSON.parse(obj.externalValueAttributes.hasFlows),
            hasIngressFlows: typeof obj.externalValueAttributes.hasIngressFlows === 'undefined' ? false : JSON.parse(obj.externalValueAttributes.hasIngressFlows),
            hasEgressFlows: typeof obj.externalValueAttributes.hasEgressFlows === 'undefined' ? false : JSON.parse(obj.externalValueAttributes.hasEgressFlows)
          };
          $scope.updateFlowUrlForResource(nodeCriteria, resource);
          return resource;

@@ -138,7 +140,7 @@ angular.module('onms-resources', [
    };

    $scope.updateFlowUrlForResource = function(nodeCriteria, resource) {
      if ((!resource.hasIngressFlows && !resource.hasEgressFlows) || isNaN(resource.ifIndex)) {
        // No flows, or not an interface, nothing to do
        return;
      }
1 change: 1 addition & 0 deletions features/distributed/dao/impl/pom.xml
@@ -84,6 +84,7 @@
              org.opennms.core.soa,
              org.opennms.core.collections,
              org.opennms.core.spring,
              org.opennms.core.sysprops,

              org.opennms.features.reporting.model,
              org.opennms.netmgt.collection.api,
@@ -40,7 +40,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
@@ -79,6 +79,8 @@
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -167,7 +169,7 @@ public class ElasticFlowRepository implements FlowRepository {
     *
     * This maps a node ID to a set of snmpInterface IDs.
     */
    private final Map<Direction, Cache<Integer, Set<Integer>>> markerCache = Maps.newEnumMap(Direction.class);

    public ElasticFlowRepository(MetricRegistry metricRegistry, JestClient jestClient, IndexStrategy indexStrategy,
                                 DocumentEnricher documentEnricher, ClassificationEngine classificationEngine,
@@ -193,11 +195,25 @@ public ElasticFlowRepository(MetricRegistry metricRegistry, JestClient jestClien
        logMarkingTimer = metricRegistry.timer("logMarking");
        flowsPerLog = metricRegistry.histogram("flowsPerLog");


        this.markerCache.put(Direction.INGRESS, CacheBuilder.newBuilder()
                .expireAfterWrite(1, TimeUnit.HOURS)
                .build());

        this.markerCache.put(Direction.EGRESS, CacheBuilder.newBuilder()
                .expireAfterWrite(1, TimeUnit.HOURS)
                .build());

        this.sessionUtils.withTransaction(() -> {
            for (final OnmsNode node : this.nodeDao.findAllHavingIngressFlows()) {
                this.markerCache.get(Direction.INGRESS).put(node.getId(),
                        this.snmpInterfaceDao.findAllHavingIngressFlows(node.getId()).stream()
                                .map(OnmsSnmpInterface::getIfIndex)
                                .collect(Collectors.toCollection(Sets::newConcurrentHashSet)));
            }

            for (final OnmsNode node : this.nodeDao.findAllHavingEgressFlows()) {
                this.markerCache.get(Direction.EGRESS).put(node.getId(),
                        this.snmpInterfaceDao.findAllHavingEgressFlows(node.getId()).stream()
                                .map(OnmsSnmpInterface::getIfIndex)
                                .collect(Collectors.toCollection(Sets::newConcurrentHashSet)));
            }
@@ -255,42 +271,56 @@ public void persist(final Collection<Flow> flows, final FlowSource source) throw

        // Mark nodes and interfaces as having associated flows
        try (final Timer.Context ctx = logMarkingTimer.time()) {
            final Map<Direction, List<Integer>> nodesToUpdate = Maps.newEnumMap(Direction.class);
            final Map<Direction, Map<Integer, List<Integer>>> interfacesToUpdate = Maps.newEnumMap(Direction.class);

            nodesToUpdate.put(Direction.INGRESS, Lists.newArrayListWithExpectedSize(flowDocuments.size()));
            nodesToUpdate.put(Direction.EGRESS, Lists.newArrayListWithExpectedSize(flowDocuments.size()));
            interfacesToUpdate.put(Direction.INGRESS, Maps.newHashMap());
            interfacesToUpdate.put(Direction.EGRESS, Maps.newHashMap());

            for (final FlowDocument flow : flowDocuments) {
                if (flow.getNodeExporter() == null) continue;
                if (flow.getNodeExporter().getNodeId() == null) continue;

                final Integer nodeId = flow.getNodeExporter().getNodeId();

                Set<Integer> ifaceMarkerCache = this.markerCache.get(flow.getDirection()).getIfPresent(nodeId);

                if (ifaceMarkerCache == null) {
                    this.markerCache.get(flow.getDirection()).put(nodeId, ifaceMarkerCache = Sets.newConcurrentHashSet());
                    nodesToUpdate.get(flow.getDirection()).add(nodeId);
                }

                if (flow.getInputSnmp() != null &&
                    flow.getInputSnmp() != 0 &&
                    !ifaceMarkerCache.contains(flow.getInputSnmp())) {
                    ifaceMarkerCache.add(flow.getInputSnmp());
                    interfacesToUpdate.get(flow.getDirection()).computeIfAbsent(nodeId, k -> Lists.newArrayList()).add(flow.getInputSnmp());
                }
                if (flow.getOutputSnmp() != null &&
                    flow.getOutputSnmp() != 0 &&
                    !ifaceMarkerCache.contains(flow.getOutputSnmp())) {
                    ifaceMarkerCache.add(flow.getOutputSnmp());
                    interfacesToUpdate.get(flow.getDirection()).computeIfAbsent(nodeId, k -> Lists.newArrayList()).add(flow.getOutputSnmp());
                }
            }

            if (!nodesToUpdate.get(Direction.INGRESS).isEmpty() ||
                !interfacesToUpdate.get(Direction.INGRESS).isEmpty() ||
                !nodesToUpdate.get(Direction.EGRESS).isEmpty() ||
                !interfacesToUpdate.get(Direction.EGRESS).isEmpty()) {
                sessionUtils.withTransaction(() -> {
                    if (!nodesToUpdate.get(Direction.INGRESS).isEmpty() || !nodesToUpdate.get(Direction.EGRESS).isEmpty()) {
                        this.nodeDao.markHavingFlows(nodesToUpdate.get(Direction.INGRESS), nodesToUpdate.get(Direction.EGRESS));
                    }

                    for (final Map.Entry<Integer, List<Integer>> e : interfacesToUpdate.get(Direction.INGRESS).entrySet()) {
                        this.snmpInterfaceDao.markHavingIngressFlows(e.getKey(), e.getValue());
                    }

                    for (final Map.Entry<Integer, List<Integer>> e : interfacesToUpdate.get(Direction.EGRESS).entrySet()) {
                        this.snmpInterfaceDao.markHavingEgressFlows(e.getKey(), e.getValue());
                    }
                    return null;
                });
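The core data-structure change in this file, condensed: the single `ConcurrentMap` marker cache becomes one expiring Guava cache per flow direction, so markers age out after an hour and get re-persisted instead of living forever in memory. A self-contained sketch of that pattern (class and method names here are illustrative, not the repository's API; Guava on the classpath):

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

public final class MarkerCacheSketch {
    enum Direction { INGRESS, EGRESS }

    // One expiring cache per direction, as in the constructor above: entries
    // vanish an hour after they were written.
    private final Map<Direction, Cache<Integer, Set<Integer>>> markerCache = Maps.newEnumMap(Direction.class);

    MarkerCacheSketch() {
        for (Direction d : Direction.values()) {
            markerCache.put(d, CacheBuilder.newBuilder()
                    .expireAfterWrite(1, TimeUnit.HOURS)
                    .build());
        }
    }

    /** Returns true when (nodeId, ifIndex) had not been marked yet for that direction. */
    boolean mark(Direction direction, int nodeId, int ifIndex) {
        Set<Integer> ifaces = markerCache.get(direction).getIfPresent(nodeId);
        if (ifaces == null) {
            ifaces = Sets.newConcurrentHashSet();
            markerCache.get(direction).put(nodeId, ifaces);
        }
        return ifaces.add(ifIndex); // Set.add is false if already present
    }
}
```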
6 changes: 4 additions & 2 deletions opennms-base-assembly/src/main/filtered/etc/create.sql
@@ -486,7 +486,8 @@ create table node (
    foreignSource      varchar(64),
    foreignId          varchar(64),
    location           text not null,
    last_ingress_flow  timestamp with time zone,
    last_egress_flow   timestamp with time zone,

    constraint pk_nodeID primary key (nodeID),
    constraint fk_node_location foreign key (location) references monitoringlocations (id) ON UPDATE CASCADE
@@ -555,7 +556,8 @@ create table snmpInterface (
    snmpCollect        varchar(2) default 'N',
    snmpPoll           varchar(1) default 'N',
    snmpLastSnmpPoll   timestamp with time zone,
    last_ingress_flow  timestamp with time zone,
    last_egress_flow   timestamp with time zone,

    CONSTRAINT snmpinterface_pkey primary key (id),
    constraint fk_nodeID2 foreign key (nodeID) references node ON DELETE CASCADE
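The schema change is easy to observe from the database side: the new timestamp columns replace the boolean flag in both tables. A hedged JDBC sketch that lists nodes with ingress flows (connection details are placeholders; column names other than the new ones follow the usual node table layout):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public final class LastFlowQuery {

    /** Nodes that have exported ingress flows, newest first. */
    public static void main(String[] args) throws SQLException {
        try (Connection c = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/opennms", "opennms", "opennms");
             PreparedStatement ps = c.prepareStatement(
                "SELECT nodeid, nodelabel, last_ingress_flow FROM node " +
                "WHERE last_ingress_flow IS NOT NULL ORDER BY last_ingress_flow DESC");
             ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                System.out.printf("%d %s %s%n",
                        rs.getInt("nodeid"),
                        rs.getString("nodelabel"),
                        rs.getTimestamp("last_ingress_flow"));
            }
        }
    }
}
```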
