Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

METRON-936: Fixes to pcap for performance and testing #585

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
198 changes: 198 additions & 0 deletions metron-platform/metron-pcap-backend/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@ The purpose of the Metron PCAP backend is to create a storm topology
capable of ingesting rapidly raw packet capture data directly into HDFS
from Kafka.

* [Sensors](#the-sensors-feeding-kafka)
* [PCAP Topology](#the-pcap-topology)
* [HDFS Files](#the-files-on-hdfs)
* [Configuration](#configuration)
* [Starting the Topology](#starting-the-topology)
* [Utilities](#utilities)
* [Inspector Utility](#inspector-utility)
* [Query Filter Utility](#query-filter-utility)
* [Performance Tuning](#performance-tuning)

## The Sensors Feeding Kafka

This component must be fed by fast packet capture components upstream
Expand Down Expand Up @@ -150,3 +160,191 @@ The packet data will be exposed via the`packet` variable in Stellar.

The format of this regular expression is described [here](https://github.com/nishihatapalmer/byteseek/blob/master/sequencesyntax.md).

## Performance Tuning
The PCAP topology is extremely lightweight and functions as a Spout-only topology. In order to tune the topology, users currently must specify a combination of
properties in pcap.properties as well as configuration in the pcap remote.yaml flux file itself. Tuning the number of partitions in your Kafka topic
will have a dramatic impact on performance as well. We ran data into Kafka at 1.1 Gbps and our tests resulted in configuring 128 partitions for our kakfa topic
along with the following settings in pcap.properties and remote.yaml (unrelated properties for performance have been removed):

### pcap.properties file
```
spout.kafka.topic.pcap=pcap
storm.topology.workers=16
kafka.spout.parallelism=128
kafka.pcap.numPackets=1000000000
kafka.pcap.maxTimeMS=0
hdfs.replication=1
hdfs.sync.every=10000
```
You'll notice that the number of kakfa partitions equals the spout parallelism, and this is no coincidence. The ordering guarantees for a partition in Kafka enforces that you may have no more
consumers than 1 per topic. Any additional parallelism will leave you with dormant threads consuming resources but performing no additional work. For our cluster with 4 Storm Supervisors, we found 16 workers to
provide optimal throughput as well. We were largely IO bound rather than CPU bound with the incoming PCAP data.

### remote.yaml
In the flux file, we introduced the following configuration:

```
name: "pcap"
config:
topology.workers: ${storm.topology.workers}
topology.worker.childopts: ${topology.worker.childopts}
topology.auto-credentials: ${storm.auto.credentials}
topology.ackers.executors: 0
components:

# Any kafka props for the producer go here.
- id: "kafkaProps"
className: "java.util.HashMap"
configMethods:
- name: "put"
args:
- "value.deserializer"
- "org.apache.kafka.common.serialization.ByteArrayDeserializer"
- name: "put"
args:
- "key.deserializer"
- "org.apache.kafka.common.serialization.ByteArrayDeserializer"
- name: "put"
args:
- "group.id"
- "pcap"
- name: "put"
args:
- "security.protocol"
- "${kafka.security.protocol}"
- name: "put"
args:
- "poll.timeout.ms"
- 100
- name: "put"
args:
- "offset.commit.period.ms"
- 30000
- name: "put"
args:
- "session.timeout.ms"
- 30000
- name: "put"
args:
- "max.uncommitted.offsets"
- 200000000
- name: "put"
args:
- "max.poll.interval.ms"
- 10
- name: "put"
args:
- "max.poll.records"
- 200000
- name: "put"
args:
- "receive.buffer.bytes"
- 431072
- name: "put"
args:
- "max.partition.fetch.bytes"
- 8097152

- id: "hdfsProps"
className: "java.util.HashMap"
configMethods:
- name: "put"
args:
- "io.file.buffer.size"
- 1000000
- name: "put"
args:
- "dfs.blocksize"
- 1073741824

- id: "kafkaConfig"
className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
constructorArgs:
- ref: "kafkaProps"
# topic name
- "${spout.kafka.topic.pcap}"
- "${kafka.zk}"
configMethods:
- name: "setFirstPollOffsetStrategy"
args:
# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
- ${kafka.pcap.start}

- id: "writerConfig"
className: "org.apache.metron.spout.pcap.HDFSWriterConfig"
configMethods:
- name: "withOutputPath"
args:
- "${kafka.pcap.out}"
- name: "withNumPackets"
args:
- ${kafka.pcap.numPackets}
- name: "withMaxTimeMS"
args:
- ${kafka.pcap.maxTimeMS}
- name: "withZookeeperQuorum"
args:
- "${kafka.zk}"
- name: "withSyncEvery"
args:
- ${hdfs.sync.every}
- name: "withReplicationFactor"
args:
- ${hdfs.replication}
- name: "withHDFSConfig"
args:
- ref: "hdfsProps"
- name: "withDeserializer"
args:
- "${kafka.pcap.ts_scheme}"
- "${kafka.pcap.ts_granularity}"
spouts:
- id: "kafkaSpout"
className: "org.apache.metron.spout.pcap.KafkaToHDFSSpout"
parallelism: ${kafka.spout.parallelism}
constructorArgs:
- ref: "kafkaConfig"
- ref: "writerConfig"

```

#### Flux Changes Introduced

##### Topology Configuration

The only change here is `topology.ackers.executors: 0`, which disables Storm tuple acking for maximum throughput.

##### Kafka configuration

```
poll.timeout.ms
offset.commit.period.ms
session.timeout.ms
max.uncommitted.offsets
max.poll.interval.ms
max.poll.records
receive.buffer.bytes
max.partition.fetch.bytes
```

##### Writer Configuration

This is a combination of settings for the HDFSWriter (see pcap.properties values above) as well as HDFS.

__HDFS config__

Component config HashMap with the following properties:
```
io.file.buffer.size
dfs.blocksize
```

__Writer config__

References the HDFS props component specified above.
```
- name: "withHDFSConfig"
args:
- ref: "hdfsProps"
```

Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@
# limitations under the License.

spout.kafka.topic.pcap=pcap
storm.auto.credentials=[]
topology.worker.childopts=
topology.auto-credentials=[]
topology.workers=1
kafka.zk=node1:2181
hdfs.sync.every=1
hdfs.replication.factor=-1
kafka.security.protocol=PLAINTEXT
kafka.pcap.start=END
# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
kafka.pcap.start=UNCOMMITTED_EARLIEST
kafka.pcap.numPackets=1000
kafka.pcap.maxTimeMS=300000
kafka.pcap.ts_scheme=FROM_KEY
kafka.pcap.out=/apps/metron/pcap
kafka.pcap.ts_granularity=MICROSECONDS
kafka.spout.parallelism=1
14 changes: 11 additions & 3 deletions metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

name: "pcap"
config:
topology.workers: 1
topology.auto-credentials: ${storm.auto.credentials}
topology.workers: ${topology.workers}
topology.worker.childopts: ${topology.worker.childopts}
topology.auto-credentials: ${topology.auto-credentials}

components:

Expand Down Expand Up @@ -53,7 +54,7 @@ components:
- name: "setFirstPollOffsetStrategy"
args:
# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
- "UNCOMMITTED_EARLIEST"
- ${kafka.pcap.start}

- id: "writerConfig"
className: "org.apache.metron.spout.pcap.HDFSWriterConfig"
Expand All @@ -70,13 +71,20 @@ components:
- name: "withZookeeperQuorum"
args:
- "${kafka.zk}"
- name: "withSyncEvery"
args:
- ${hdfs.sync.every}
- name: "withReplicationFactor"
args:
- ${hdfs.replication.factor}
- name: "withDeserializer"
args:
- "${kafka.pcap.ts_scheme}"
- "${kafka.pcap.ts_granularity}"
spouts:
- id: "kafkaSpout"
className: "org.apache.metron.spout.pcap.KafkaToHDFSSpout"
parallelism: ${kafka.spout.parallelism}
constructorArgs:
- ref: "kafkaConfig"
- ref: "writerConfig"
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
package org.apache.metron.spout.pcap;

import com.google.common.base.Joiner;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.log4j.Logger;
import org.apache.metron.spout.pcap.deserializer.KeyValueDeserializer;
import org.apache.storm.kafka.Callback;
import org.apache.storm.kafka.EmitContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.xml.bind.DatatypeConverter;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -38,7 +37,7 @@
*/
public class HDFSWriterCallback implements Callback {
static final long serialVersionUID = 0xDEADBEEFL;
private static final Logger LOG = Logger.getLogger(HDFSWriterCallback.class);
private static final Logger LOG = LoggerFactory.getLogger(HDFSWriterCallback.class);

/**
* A topic+partition. We split the files up by topic+partition so the writers don't clobber each other
Expand Down Expand Up @@ -80,29 +79,12 @@ public String toString() {
}
}

/**
* This is a static container of threadlocal LongWritables and BytesWritables. This keeps us from having to create so
* many objects on the heap. The Deserializers update these for every packet.
*/
private static class KeyValue {
static ThreadLocal<LongWritable> key = new ThreadLocal<LongWritable> () {
@Override
protected LongWritable initialValue() {
return new LongWritable();
}
};
static ThreadLocal<BytesWritable> value = new ThreadLocal<BytesWritable> () {
@Override
protected BytesWritable initialValue() {
return new BytesWritable();
}
};
}
private HDFSWriterConfig config;
private EmitContext context;
private Map<Partition, PartitionHDFSWriter> writers = new HashMap<>();
private PartitionHDFSWriter lastWriter = null;
private String topic;
private boolean inited = false;
public HDFSWriterCallback() {
}

Expand All @@ -116,35 +98,43 @@ public HDFSWriterCallback withConfig(HDFSWriterConfig config) {
public List<Object> apply(List<Object> tuple, EmitContext context) {
byte[] key = (byte[]) tuple.get(0);
byte[] value = (byte[]) tuple.get(1);
if(!config.getDeserializer().deserializeKeyValue(key, value, KeyValue.key.get(), KeyValue.value.get())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KeyValue as a whole is unused now, right? Can we just delete the class entirely at this point?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks that way, good catch.

if(LOG.isDebugEnabled()) {
List<String> debugStatements = new ArrayList<>();
if(key != null) {
debugStatements.add("Key length: " + key.length);
debugStatements.add("Key: " + DatatypeConverter.printHexBinary(key));
}
else {
debugStatements.add("Key is null!");
}

if(value != null) {
debugStatements.add("Value length: " + value.length);
debugStatements.add("Value: " + DatatypeConverter.printHexBinary(value));
}
else {
debugStatements.add("Value is null!");
}
LOG.debug("Dropping malformed packet: " + Joiner.on(" / ").join(debugStatements));
long tsDeserializeStart = System.nanoTime();
KeyValueDeserializer.Result result = config.getDeserializer().deserializeKeyValue(key, value);
long tsDeserializeEnd = System.nanoTime();

if (LOG.isDebugEnabled() && !result.foundTimestamp) {
List<String> debugStatements = new ArrayList<>();
if (key != null) {
debugStatements.add("Key length: " + key.length);
debugStatements.add("Key: " + DatatypeConverter.printHexBinary(key));
} else {
debugStatements.add("Key is null!");
}

if (value != null) {
debugStatements.add("Value length: " + value.length);
debugStatements.add("Value: " + DatatypeConverter.printHexBinary(value));
} else {
debugStatements.add("Value is null!");
}
LOG.debug("Dropping malformed packet: " + Joiner.on(" / ").join(debugStatements));
}

long tsWriteStart = System.nanoTime();
try {
getWriter(new Partition( topic
, context.get(EmitContext.Type.PARTITION))
).handle(KeyValue.key.get(), KeyValue.value.get());
).handle(result.key, result.value);
} catch (IOException e) {
LOG.error(e.getMessage(), e);
//drop? not sure..
}
long tsWriteEnd = System.nanoTime();
if(LOG.isDebugEnabled() && (Math.random() < 0.001 || !inited)) {
LOG.debug("Deserialize time (ns): " + (tsDeserializeEnd - tsDeserializeStart));
LOG.debug("Write time (ns): " + (tsWriteEnd - tsWriteStart));
}
inited = true;
return tuple;
}

Expand Down