Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
fooker committed Dec 14, 2017
1 parent 711a360 commit 4c40c92
Show file tree
Hide file tree
Showing 22 changed files with 348 additions and 30 deletions.
1 change: 1 addition & 0 deletions container/features/pom.xml
Expand Up @@ -343,6 +343,7 @@
<feature>datachoices</feature>
<feature>eif-adapter</feature>
<feature>opennms-telemetry-collection</feature>
<feature>opennms-telemetry-flow</feature>
<feature>opennms-telemetry-jti</feature>
<feature>opennms-telemetry-nxos</feature>

Expand Down
2 changes: 1 addition & 1 deletion container/features/src/main/resources/features-minion.xml
Expand Up @@ -136,8 +136,8 @@
<bundle>mvn:org.opennms.features.telemetry.adapters/org.opennms.features.telemetry.adapters.api/${project.version}</bundle>
<bundle>mvn:org.opennms.features.telemetry/org.opennms.features.telemetry.common/${project.version}</bundle>
<bundle>mvn:org.opennms.features.telemetry.listeners/org.opennms.features.telemetry.listeners.api/${project.version}</bundle>
<bundle>mvn:org.opennms.features.telemetry.listeners/org.opennms.features.telemetry.listeners.udp/${project.version}</bundle>
<bundle>mvn:org.opennms.features.telemetry.listeners/org.opennms.features.telemetry.listeners.flow/${project.version}</bundle>
<bundle>mvn:org.opennms.features.telemetry.listeners/org.opennms.features.telemetry.listeners.udp/${project.version}</bundle>
<bundle>mvn:org.opennms.features.telemetry/org.opennms.features.telemetry.minion/${project.version}</bundle>
</feature>
</features>
6 changes: 6 additions & 0 deletions container/features/src/main/resources/features.xml
Expand Up @@ -1121,6 +1121,12 @@
<bundle>mvn:org.opennms.features.telemetry.adapters/org.opennms.features.telemetry.adapters.collection/${project.version}</bundle>
</feature>

<feature name="opennms-telemetry-flow" description="OpenNMS :: Telemetry :: Flow" version="${project.version}">
<feature>opennms-telemetry-collection</feature>
<bundle>mvn:org.mongodb/bson/${bsonVersion}</bundle>
<bundle>mvn:org.opennms.features.telemetry.adapters/org.opennms.features.telemetry.adapters.flow/${project.version}</bundle>
</feature>

<feature name="opennms-telemetry-jti" description="OpenNMS :: Telemetry :: JTI" version="${project.version}">
<feature>opennms-telemetry-collection</feature>
<bundle>mvn:com.google.protobuf/protobuf-java/${protobufVersion}</bundle>
Expand Down
Expand Up @@ -95,6 +95,7 @@ featuresBoot = ( \
opennms-icmp-commands, \
opennms-snmp-commands, \
opennms-telemetry-collection,\
opennms-telemetry-flow,\
opennms-telemetry-jti,\
opennms-telemetry-nxos,\
opennms-flows, \
Expand Down
5 changes: 5 additions & 0 deletions features/flows/feature/pom.xml
Expand Up @@ -67,5 +67,10 @@
<artifactId>org.opennms.features.telemetry.adapters.netflow</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.opennms.features.telemetry.adapters</groupId>
<artifactId>org.opennms.features.telemetry.adapters.flow</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
45 changes: 45 additions & 0 deletions features/telemetry/adapters/flow/pom.xml
@@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>org.opennms.features.telemetry</groupId>
<artifactId>org.opennms.features.telemetry.adapters</artifactId>
<version>22.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.opennms.features.telemetry.adapters</groupId>
<artifactId>org.opennms.features.telemetry.adapters.flow</artifactId>
<name>OpenNMS :: Features :: Telemetry :: Adapters :: Flow</name>
<packaging>bundle</packaging>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
<configuration>
<instructions>
<Bundle-RequiredExecutionEnvironment>JavaSE-1.8</Bundle-RequiredExecutionEnvironment>
<Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
<Bundle-Version>${project.version}</Bundle-Version>
</instructions>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.opennms.features.telemetry.adapters</groupId>
<artifactId>org.opennms.features.telemetry.adapters.collection</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>bson</artifactId>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,148 @@
/*******************************************************************************
* This file is part of OpenNMS(R).
*
* Copyright (C) 2017-2017 The OpenNMS Group, Inc.
* OpenNMS(R) is Copyright (C) 1999-2017 The OpenNMS Group, Inc.
*
* OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.
*
* OpenNMS(R) is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License,
* or (at your option) any later version.
*
* OpenNMS(R) is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with OpenNMS(R). If not, see:
* http://www.gnu.org/licenses/
*
* For more information contact:
* OpenNMS(R) Licensing <license@opennms.org>
* http://www.opennms.org/
* http://www.opennms.com/
*******************************************************************************/

package org.opennms.netmgt.telemetry.adapters.flow;

import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Optional;

import org.bson.RawBsonDocument;
import org.opennms.netmgt.collection.api.CollectionAgent;
import org.opennms.netmgt.collection.api.CollectionAgentFactory;
import org.opennms.netmgt.collection.api.CollectionSet;
import org.opennms.netmgt.dao.api.InterfaceToNodeCache;
import org.opennms.netmgt.dao.api.NodeDao;
import org.opennms.netmgt.telemetry.adapters.api.TelemetryMessage;
import org.opennms.netmgt.telemetry.adapters.api.TelemetryMessageLog;
import org.opennms.netmgt.telemetry.adapters.collection.AbstractPersistingAdapter;
import org.opennms.netmgt.telemetry.adapters.collection.CollectionSetWithAgent;
import org.opennms.netmgt.telemetry.adapters.collection.ScriptedCollectionSetBuilder;
import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.support.TransactionOperations;

public class FlowAdapter extends AbstractPersistingAdapter {

private static final Logger LOG = LoggerFactory.getLogger(FlowAdapter.class);

@Autowired
private CollectionAgentFactory collectionAgentFactory;

@Autowired
private InterfaceToNodeCache interfaceToNodeCache;

@Autowired
private NodeDao nodeDao;

@Autowired
private TransactionOperations transactionTemplate;

private BundleContext bundleContext;

private String script;

private final ThreadLocal<ScriptedCollectionSetBuilder> scriptedCollectionSetBuilders = new ThreadLocal<ScriptedCollectionSetBuilder>() {
@Override
protected ScriptedCollectionSetBuilder initialValue() {
try {
if (bundleContext != null) {
return new ScriptedCollectionSetBuilder(new File(script), bundleContext);
} else {
return new ScriptedCollectionSetBuilder(new File(script));
}
} catch (Exception e) {
LOG.error("Failed to create builder for script '{}'.", script, e);
return null;
}
}
};

public String getScript() {
return this.script;
}

public void setScript(String script) {
this.script = script;
}

@Override
public Optional<CollectionSetWithAgent> handleMessage(final TelemetryMessage message, final TelemetryMessageLog messageLog) throws Exception {
final RawBsonDocument flow = new RawBsonDocument(message.getByteArray());

LOG.warn("Flow: {}", flow.toJson());

CollectionAgent agent = null;
try {
final InetAddress inetAddress = InetAddress.getByName(messageLog.getSourceAddress());
final Optional<Integer> nodeId = this.interfaceToNodeCache.getFirstNodeId(messageLog.getLocation(), inetAddress);
if (nodeId.isPresent()) {
// NOTE: This will throw a IllegalArgumentException if the nodeId/inetAddress pair does not exist in the database
agent = this.collectionAgentFactory.createCollectionAgent(Integer.toString(nodeId.get()), inetAddress);
}
} catch (UnknownHostException e) {
LOG.debug("Could not convert source address: {}", messageLog.getSourceAddress());
}

if (agent == null) {
LOG.warn("Unable to find node for address: {}", messageLog.getSourceAddress());
return Optional.empty();
}

final ScriptedCollectionSetBuilder builder = this.scriptedCollectionSetBuilders.get();
if (builder == null) {
throw new Exception(String.format("Error compiling script '%s'. See logs for details.", script));
}
final CollectionSet collectionSet = builder.build(agent, flow);
return Optional.of(new CollectionSetWithAgent(agent, collectionSet));
}

public void setCollectionAgentFactory(CollectionAgentFactory collectionAgentFactory) {
this.collectionAgentFactory = collectionAgentFactory;
}

public void setInterfaceToNodeCache(InterfaceToNodeCache interfaceToNodeCache) {
this.interfaceToNodeCache = interfaceToNodeCache;
}

public void setNodeDao(NodeDao nodeDao) {
this.nodeDao = nodeDao;
}

public void setTransactionTemplate(TransactionOperations transactionTemplate) {
this.transactionTemplate = transactionTemplate;
}

public void setBundleContext(BundleContext bundleContext) {
this.bundleContext = bundleContext;
}

}
@@ -0,0 +1,68 @@
/*******************************************************************************
* This file is part of OpenNMS(R).
*
* Copyright (C) 2017-2017 The OpenNMS Group, Inc.
* OpenNMS(R) is Copyright (C) 1999-2017 The OpenNMS Group, Inc.
*
* OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.
*
* OpenNMS(R) is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License,
* or (at your option) any later version.
*
* OpenNMS(R) is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with OpenNMS(R). If not, see:
* http://www.gnu.org/licenses/
*
* For more information contact:
* OpenNMS(R) Licensing <license@opennms.org>
* http://www.opennms.org/
* http://www.opennms.com/
*******************************************************************************/

package org.opennms.netmgt.telemetry.adapters.flow;

import java.util.Map;

import org.opennms.netmgt.telemetry.adapters.api.Adapter;
import org.opennms.netmgt.telemetry.adapters.collection.AbstractCollectionAdapterFactory;
import org.opennms.netmgt.telemetry.config.api.Protocol;
import org.osgi.framework.BundleContext;
import org.springframework.beans.BeanWrapper;
import org.springframework.beans.PropertyAccessorFactory;

public class FlowAdapterFactory extends AbstractCollectionAdapterFactory {

public FlowAdapterFactory(BundleContext bundleContext) {
super(bundleContext);
}

@Override
public Class<? extends Adapter> getAdapterClass() {
return FlowAdapter.class;
}

@Override
public Adapter createAdapter(Protocol protocol, Map<String, String> properties) {
final FlowAdapter adapter = new FlowAdapter();
adapter.setProtocol(protocol);
adapter.setCollectionAgentFactory(getCollectionAgentFactory());
adapter.setInterfaceToNodeCache(getInterfaceToNodeCache());
adapter.setNodeDao(getNodeDao());
adapter.setTransactionTemplate(getTransactionTemplate());
adapter.setFilterDao(getFilterDao());
adapter.setPersisterFactory(getPersisterFactory());
adapter.setBundleContext(getBundleContext());

final BeanWrapper wrapper = PropertyAccessorFactory.forBeanPropertyAccess(adapter);
wrapper.setPropertyValues(properties);
return adapter;
}

}
@@ -0,0 +1,36 @@
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0" xmlns:ext="http://aries.apache.org/blueprint/xmlns/blueprint-ext/v1.1.0"
xsi:schemaLocation="
http://www.osgi.org/xmlns/blueprint/v1.0.0
http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd
http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0
http://aries.apache.org/schemas/blueprint-cm/blueprint-cm-1.1.0.xsd
http://aries.apache.org/blueprint/xmlns/blueprint-ext/v1.1.0
http://aries.apache.org/schemas/blueprint-ext/blueprint-ext-1.1.xsd
">

<bean id="flowAdapterFactory" class="org.opennms.netmgt.telemetry.adapters.flow.FlowAdapterFactory">
<argument ref="blueprintBundleContext" />
<property name="collectionAgentFactory" ref="collectionAgentFactory" />
<property name="interfaceToNodeCache" ref="interfaceToNodeCache" />
<property name="nodeDao" ref="nodeDao" />
<property name="transactionTemplate" ref="transactionTemplate" />
<property name="filterDao" ref="filterDao" />
<property name="persisterFactory" ref="persisterFactory" />
</bean>

<service id="flowFactoryService" ref="flowAdapterFactory" interface="org.opennms.netmgt.telemetry.adapters.api.AdapterFactory">
<service-properties>
<entry key="registration.export" value="true" />
<entry key="type" value="org.opennms.netmgt.telemetry.adapters.flow.FlowAdapter" />
</service-properties>
</service>

<reference id="collectionAgentFactory" interface="org.opennms.netmgt.collection.api.CollectionAgentFactory" />
<reference id="interfaceToNodeCache" interface="org.opennms.netmgt.dao.api.InterfaceToNodeCache" />
<reference id="nodeDao" interface="org.opennms.netmgt.dao.api.NodeDao" />
<reference id="filterDao" interface="org.opennms.netmgt.filter.api.FilterDao" />
<reference id="transactionTemplate" interface="org.springframework.transaction.support.TransactionOperations" />
<reference id="persisterFactory" interface="org.opennms.netmgt.collection.api.PersisterFactory" />

</blueprint>
1 change: 1 addition & 0 deletions features/telemetry/adapters/pom.xml
Expand Up @@ -13,6 +13,7 @@
<modules>
<module>api</module>
<module>collection</module>
<module>flow</module>
<module>jti</module>
<module>netflow</module>
<module>nxos</module>
Expand Down
1 change: 0 additions & 1 deletion features/telemetry/listeners/flow/pom.xml
Expand Up @@ -53,7 +53,6 @@
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>bson</artifactId>
<version>3.5.0</version>
</dependency>
</dependencies>
</project>
Expand Up @@ -41,6 +41,10 @@ public InvalidPacketException(final ByteBuffer buffer, final String message, fin
}

private static String appendPosition(final String message, final ByteBuffer buffer) {
return String.format("%s [0x%04X]", message, buffer.arrayOffset() + buffer.position());
if (buffer.hasArray()) {
return String.format("%s [0x%04X]", message, buffer.arrayOffset() + buffer.position());
} else {
return message;
}
}
}
Expand Up @@ -76,7 +76,7 @@ public PacketHandler(final Protocol protocol,

@Override
protected void channelRead0(final ChannelHandlerContext ctx, final DefaultAddressedEnvelope<RecordProvider, InetSocketAddress> packet) throws Exception {
LOG.info("Got packet: {}", packet);
LOG.trace("Got packet: {}", packet);

packet.content().getRecords().forEach(record -> {
final ByteBuffer buffer = serialize(this.protocol, record);
Expand Down
Expand Up @@ -85,7 +85,7 @@ protected void initChannel(final SocketChannel ch) throws Exception {
.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
LOG.warn("Invalid packet: {}", cause);
LOG.warn("Invalid packet: {}", cause.getMessage());
ctx.close();
}
});
Expand Down

0 comments on commit 4c40c92

Please sign in to comment.