Skip to content

Commit

Permalink
Expose Community ID processor in Painless (#73963)
Browse files Browse the repository at this point in the history
  • Loading branch information
danhermann committed Jun 17, 2021
1 parent 1338a11 commit bad29e1
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 44 deletions.
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Supplier;

import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
Expand All @@ -33,6 +34,15 @@ public final class CommunityIdProcessor extends AbstractProcessor {

public static final String TYPE = "community_id";

private static final ThreadLocal<MessageDigest> MESSAGE_DIGEST = ThreadLocal.withInitial(() -> {
try {
return MessageDigest.getInstance("SHA-1");
} catch (NoSuchAlgorithmException e) {
// should never happen, SHA-1 must be available in all JDKs
throw new IllegalStateException(e);
}
});

private final String sourceIpField;
private final String sourcePortField;
private final String destinationIpField;
Expand All @@ -42,7 +52,6 @@ public final class CommunityIdProcessor extends AbstractProcessor {
private final String icmpTypeField;
private final String icmpCodeField;
private final String targetField;
private final ThreadLocal<MessageDigest> messageDigest;
private final byte[] seed;
private final boolean ignoreMissing;

Expand All @@ -58,7 +67,6 @@ public final class CommunityIdProcessor extends AbstractProcessor {
String icmpTypeField,
String icmpCodeField,
String targetField,
ThreadLocal<MessageDigest> messageDigest,
byte[] seed,
boolean ignoreMissing
) {
Expand All @@ -72,7 +80,6 @@ public final class CommunityIdProcessor extends AbstractProcessor {
this.icmpTypeField = icmpTypeField;
this.icmpCodeField = icmpCodeField;
this.targetField = targetField;
this.messageDigest = messageDigest;
this.seed = seed;
this.ignoreMissing = ignoreMissing;
}
Expand Down Expand Up @@ -113,10 +120,6 @@ public String getTargetField() {
return targetField;
}

public MessageDigest getMessageDigest() {
return messageDigest.get();
}

public byte[] getSeed() {
return seed;
}
Expand All @@ -127,7 +130,15 @@ public boolean getIgnoreMissing() {

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
Flow flow = buildFlow(ingestDocument);
String sourceIp = ingestDocument.getFieldValue(sourceIpField, String.class, ignoreMissing);
String destinationIp = ingestDocument.getFieldValue(destinationIpField, String.class, ignoreMissing);
Object ianaNumber = ingestDocument.getFieldValue(ianaNumberField, Object.class, true);
Supplier<Object> transport = () -> ingestDocument.getFieldValue(transportField, Object.class, ignoreMissing);
Supplier<Object> sourcePort = () -> ingestDocument.getFieldValue(sourcePortField, Object.class, ignoreMissing);
Supplier<Object> destinationPort = () -> ingestDocument.getFieldValue(destinationPortField, Object.class, ignoreMissing);
Object icmpType = ingestDocument.getFieldValue(icmpTypeField, Object.class, true);
Object icmpCode = ingestDocument.getFieldValue(icmpCodeField, Object.class, true);
Flow flow = buildFlow(sourceIp, destinationIp, ianaNumber, transport, sourcePort, destinationPort, icmpType, icmpCode);
if (flow == null) {
if (ignoreMissing) {
return ingestDocument;
Expand All @@ -136,19 +147,50 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
}
}

MessageDigest md = messageDigest.get();
md.reset();
ingestDocument.setFieldValue(targetField, flow.toCommunityId(md, seed));
ingestDocument.setFieldValue(targetField, flow.toCommunityId(seed));
return ingestDocument;
}

private Flow buildFlow(IngestDocument d) {
String sourceIpAddrString = d.getFieldValue(sourceIpField, String.class, ignoreMissing);
public static String apply(
String sourceIpAddrString,
String destIpAddrString,
Object ianaNumber,
Object transport,
Object sourcePort,
Object destinationPort,
Object icmpType,
Object icmpCode,
int seed) {

Flow flow = buildFlow(sourceIpAddrString, destIpAddrString, ianaNumber, () -> transport, () -> sourcePort, () -> destinationPort,
icmpType, icmpCode);

if (flow == null) {
throw new IllegalArgumentException("unable to construct flow from document");
} else {
return flow.toCommunityId(toUint16(seed));
}
}

public static String apply(
String sourceIpAddrString,
String destIpAddrString,
Object ianaNumber,
Object transport,
Object sourcePort,
Object destinationPort,
Object icmpType,
Object icmpCode) {
return apply(sourceIpAddrString, destIpAddrString, ianaNumber, transport, sourcePort, destinationPort, icmpType, icmpCode, 0);
}

private static Flow buildFlow(String sourceIpAddrString, String destIpAddrString, Object ianaNumber,
Supplier<Object> transport, Supplier<Object> sourcePort, Supplier<Object> destinationPort,
Object icmpType, Object icmpCode) {
if (sourceIpAddrString == null) {
return null;
}

String destIpAddrString = d.getFieldValue(destinationIpField, String.class, ignoreMissing);
if (destIpAddrString == null) {
return null;
}
Expand All @@ -157,9 +199,9 @@ private Flow buildFlow(IngestDocument d) {
flow.source = InetAddresses.forString(sourceIpAddrString);
flow.destination = InetAddresses.forString(destIpAddrString);

Object protocol = d.getFieldValue(ianaNumberField, Object.class, true);
Object protocol = ianaNumber;
if (protocol == null) {
protocol = d.getFieldValue(transportField, Object.class, ignoreMissing);
protocol = transport.get();
if (protocol == null) {
return null;
}
Expand All @@ -170,23 +212,21 @@ private Flow buildFlow(IngestDocument d) {
case Tcp:
case Udp:
case Sctp:
Object sourcePortValue = d.getFieldValue(sourcePortField, Object.class, ignoreMissing);
flow.sourcePort = parseIntFromObjectOrString(sourcePortValue, "source port");
flow.sourcePort = parseIntFromObjectOrString(sourcePort.get(), "source port");
if (flow.sourcePort < 1 || flow.sourcePort > 65535) {
throw new IllegalArgumentException("invalid source port [" + sourcePortValue + "]");
throw new IllegalArgumentException("invalid source port [" + sourcePort.get() + "]");
}

Object destinationPortValue = d.getFieldValue(destinationPortField, Object.class, ignoreMissing);
flow.destinationPort = parseIntFromObjectOrString(destinationPortValue, "destination port");
flow.destinationPort = parseIntFromObjectOrString(destinationPort.get(), "destination port");
if (flow.destinationPort < 1 || flow.destinationPort > 65535) {
throw new IllegalArgumentException("invalid destination port [" + destinationPortValue + "]");
throw new IllegalArgumentException("invalid destination port [" + destinationPort.get() + "]");
}
break;
case Icmp:
case IcmpIpV6:
// tolerate missing or invalid ICMP types and codes
flow.icmpType = parseIntFromObjectOrString(d.getFieldValue(icmpTypeField, Object.class, true), "icmp type");
flow.icmpCode = parseIntFromObjectOrString(d.getFieldValue(icmpCodeField, Object.class, true), "icmp code");
flow.icmpType = parseIntFromObjectOrString(icmpType, "icmp type");
flow.icmpCode = parseIntFromObjectOrString(icmpCode, "icmp code");
break;
}

Expand Down Expand Up @@ -258,13 +298,6 @@ public CommunityIdProcessor create(
if (seedInt < 0 || seedInt > 65535) {
throw newConfigurationException(TYPE, processorTag, "seed", "must be a value between 0 and 65535");
}
ThreadLocal<MessageDigest> messageDigest = ThreadLocal.withInitial(() -> {
try {
return MessageDigest.getInstance("SHA-1");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException("unable to obtain SHA-1 hasher", e);
}
});

boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", true);
return new CommunityIdProcessor(
Expand All @@ -279,7 +312,6 @@ public CommunityIdProcessor create(
icmpTypeField,
icmpCodeField,
targetField,
messageDigest,
toUint16(seedInt),
ignoreMissing
);
Expand Down Expand Up @@ -343,7 +375,9 @@ byte[] toBytes() {
return bb.array();
}

String toCommunityId(MessageDigest md, byte[] seed) {
String toCommunityId(byte[] seed) {
MessageDigest md = MESSAGE_DIGEST.get();
md.reset();
md.update(seed);
byte[] encodedBytes = Base64.getEncoder().encode(md.digest(toBytes()));
return "1:" + new String(encodedBytes, StandardCharsets.UTF_8);
Expand Down
Expand Up @@ -86,6 +86,75 @@ public static String urlDecode(String value) {
}

/**
* Uses {@link CommunityIdProcessor} to compute community ID for network flow data.
*
* @param sourceIpAddrString source IP address
* @param destIpAddrString destination IP address
* @param ianaNumber IANA number
* @param transport transport protocol
* @param sourcePort source port
* @param destinationPort destination port
* @param icmpType ICMP type
* @param icmpCode ICMP code
* @param seed hash seed (must be between 0 and 65535)
* @return Community ID
*/
public static String communityId(
String sourceIpAddrString,
String destIpAddrString,
Object ianaNumber,
Object transport,
Object sourcePort,
Object destinationPort,
Object icmpType,
Object icmpCode,
int seed) {
return CommunityIdProcessor.apply(
sourceIpAddrString,
destIpAddrString,
ianaNumber,
transport,
sourcePort,
destinationPort,
icmpType,
icmpCode,
seed
);
}

/**
* Uses {@link CommunityIdProcessor} to compute community ID for network flow data.
*
* @param sourceIpAddrString source IP address
* @param destIpAddrString destination IP address
* @param ianaNumber IANA number
* @param transport transport protocol
* @param sourcePort source port
* @param destinationPort destination port
* @param icmpType ICMP type
* @param icmpCode ICMP code
* @return Community ID
*/
public static String communityId(
String sourceIpAddrString,
String destIpAddrString,
Object ianaNumber,
Object transport,
Object sourcePort,
Object destinationPort,
Object icmpType,
Object icmpCode) {
return CommunityIdProcessor.apply(sourceIpAddrString,
destIpAddrString,
ianaNumber,
transport,
sourcePort,
destinationPort,
icmpType,
icmpCode);
}

/*
* Uses {@link UriPartsProcessor} to decompose an URI into its constituent parts.
*
* @param uri string to decode
Expand Down
Expand Up @@ -15,5 +15,7 @@ class org.elasticsearch.ingest.common.Processors {
Object json(Object)
void json(Map, String)
String urlDecode(String)
String communityId(String, String, Object, Object, Object, Object, Object, Object, int)
String communityId(String, String, Object, Object, Object, Object, Object, Object)
Map uriParts(String)
}
Expand Up @@ -12,8 +12,6 @@
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -38,17 +36,9 @@ public class CommunityIdProcessorTests extends ESTestCase {
// https://github.com/elastic/beats/blob/master/libbeat/processors/communityid/communityid_test.go

private Map<String, Object> event;
private ThreadLocal<MessageDigest> messageDigest;

@Before
public void setup() throws Exception {
messageDigest = ThreadLocal.withInitial(() -> {
try {
return MessageDigest.getInstance("SHA-1");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException("unable to obtain SHA-1 hasher", e);
}
});
event = buildEvent();
}

Expand Down Expand Up @@ -323,6 +313,19 @@ public void testIgnoreMissing() throws Exception {
testCommunityIdProcessor(event, 0, null, true);
}

public void testIgnoreMissingIsFalse() throws Exception {
@SuppressWarnings("unchecked")
var source = (Map<String, Object>) event.get("source");
source.remove("ip");

IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> testCommunityIdProcessor(event, 0, null, false)
);

assertThat(e.getMessage(), containsString("field [ip] not present as part of path [source.ip]"));
}

private void testCommunityIdProcessor(Map<String, Object> source, String expectedHash) throws Exception {
testCommunityIdProcessor(source, 0, expectedHash);
}
Expand All @@ -346,7 +349,6 @@ private void testCommunityIdProcessor(Map<String, Object> source, int seed, Stri
DEFAULT_ICMP_TYPE,
DEFAULT_ICMP_CODE,
DEFAULT_TARGET,
messageDigest,
CommunityIdProcessor.toUint16(seed),
ignoreMissing
);
Expand Down
Expand Up @@ -203,6 +203,46 @@ teardown:
- match: { _source.source_field: "foo%20bar" }
- match: { _source.target_field: "foo bar" }

---
"Test invoke community_id processor":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"script" : {
"lang": "painless",
"source" : "ctx.target_field1 = Processors.communityId('128.232.110.120', '66.35.250.204', null, 'TCP', 34855, 80, null, null, 123);"
}
},
{
"script" : {
"lang": "painless",
"source" : "ctx.target_field2 = Processors.communityId('128.232.110.120', '66.35.250.204', null, 'TCP', 34855, 80, null, null);"
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: 1
pipeline: "my_pipeline"
body: {source_field: "foo"}

- do:
get:
index: test
id: 1
- match: { _source.source_field: "foo" }
- match: { _source.target_field1: "1:hTSGlFQnR58UCk+NfKRZzA32dPg=" }
- match: { _source.target_field2: "1:LQU9qZlK+B5F3KDmev6m5PMibrg=" }

---
"Test invoke uri_parts processor":
- do:
Expand Down

0 comments on commit bad29e1

Please sign in to comment.