Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
danhermann committed Jun 16, 2021
1 parent 4551bf2 commit 9c49328
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 164 deletions.
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Supplier;

import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
Expand All @@ -33,18 +34,14 @@ public final class CommunityIdProcessor extends AbstractProcessor {

public static final String TYPE = "community_id";

private static final MessageDigest MESSAGE_DIGEST;

static {
MessageDigest temp;
private static final ThreadLocal<MessageDigest> MESSAGE_DIGEST = ThreadLocal.withInitial(() -> {
try {
temp = MessageDigest.getInstance("SHA-1");
return MessageDigest.getInstance("SHA-1");
} catch (NoSuchAlgorithmException e) {
// do not fail class initialization
temp = null;
// should never happen, SHA-1 must be available in all JDKs
throw new IllegalStateException(e);
}
MESSAGE_DIGEST = temp;
}
});

private final String sourceIpField;
private final String sourcePortField;
Expand All @@ -55,7 +52,6 @@ public final class CommunityIdProcessor extends AbstractProcessor {
private final String icmpTypeField;
private final String icmpCodeField;
private final String targetField;
private final ThreadLocal<MessageDigest> messageDigest;
private final byte[] seed;
private final boolean ignoreMissing;

Expand All @@ -71,7 +67,6 @@ public final class CommunityIdProcessor extends AbstractProcessor {
String icmpTypeField,
String icmpCodeField,
String targetField,
ThreadLocal<MessageDigest> messageDigest,
byte[] seed,
boolean ignoreMissing
) {
Expand All @@ -85,7 +80,6 @@ public final class CommunityIdProcessor extends AbstractProcessor {
this.icmpTypeField = icmpTypeField;
this.icmpCodeField = icmpCodeField;
this.targetField = targetField;
this.messageDigest = messageDigest;
this.seed = seed;
this.ignoreMissing = ignoreMissing;
}
Expand Down Expand Up @@ -126,10 +120,6 @@ public String getTargetField() {
return targetField;
}

public MessageDigest getMessageDigest() {
return messageDigest.get();
}

public byte[] getSeed() {
return seed;
}
Expand All @@ -140,18 +130,22 @@ public boolean getIgnoreMissing() {

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
Flow flow = buildFlow(new IngestDocumentFlowValueSupplier(ingestDocument));
String sourceIp = ingestDocument.getFieldValue(sourceIpField, String.class, ignoreMissing);
String destinationIp = ingestDocument.getFieldValue(destinationIpField, String.class, ignoreMissing);
Object ianaNumber = ingestDocument.getFieldValue(ianaNumberField, Object.class, true);
Supplier<Object> transport = () -> ingestDocument.getFieldValue(transportField, Object.class, ignoreMissing);
Supplier<Object> sourcePort = () -> ingestDocument.getFieldValue(sourcePortField, Object.class, ignoreMissing);
Supplier<Object> destinationPort = () -> ingestDocument.getFieldValue(destinationPortField, Object.class, ignoreMissing);
Object icmpType = ingestDocument.getFieldValue(icmpTypeField, Object.class, true);
Object icmpCode = ingestDocument.getFieldValue(icmpCodeField, Object.class, true);
Flow flow = buildFlow(sourceIp, destinationIp, ianaNumber, transport, sourcePort, destinationPort, icmpType, icmpCode);
if (flow == null) {
if (ignoreMissing) {
return ingestDocument;
} else {
throw new IllegalArgumentException("unable to construct flow from document");
}
}

MessageDigest md = messageDigest.get();
md.reset();
ingestDocument.setFieldValue(targetField, flow.toCommunityId(md, seed));
ingestDocument.setFieldValue(targetField, flow.toCommunityId(seed));
return ingestDocument;
}

Expand All @@ -166,59 +160,13 @@ public static String apply(
Object icmpCode,
int seed) {

Flow flow = buildFlow(new FlowValueSupplier(){

@Override
public String getSourceIpAddrString() {
return sourceIpAddrString;
}

@Override
public String getDestIpAddrString() {
return destIpAddrString;
}

@Override
public Object getIanaNumber() {
return ianaNumber;
}

@Override
public Object getTransport() {
return transport;
}

@Override
public Object getSourcePort() {
return sourcePort;
}

@Override
public Object getDestinationPort() {
return destinationPort;
}

@Override
public Object getIcmpType() {
return icmpType;
}

@Override
public Object getIcmpCode() {
return icmpCode;
}
});
Flow flow = buildFlow(sourceIpAddrString, destIpAddrString, ianaNumber, () -> transport, () -> sourcePort, () -> destinationPort,
icmpType, icmpCode);

if (flow == null) {
throw new IllegalArgumentException("unable to construct flow from document");
} else if (MESSAGE_DIGEST != null) {
byte[] seedBytes = toUint16(seed);
synchronized (MESSAGE_DIGEST) {
MESSAGE_DIGEST.reset();
return flow.toCommunityId(MESSAGE_DIGEST, seedBytes);
}
} else {
throw new IllegalStateException("unable to obtain SHA-1 hasher");
return flow.toCommunityId(toUint16(seed));
}
}

Expand All @@ -234,13 +182,13 @@ public static String apply(
return apply(sourceIpAddrString, destIpAddrString, ianaNumber, transport, sourcePort, destinationPort, icmpType, icmpCode, 0);
}

private static Flow buildFlow(FlowValueSupplier s) {
String sourceIpAddrString = s.getSourceIpAddrString();
private static Flow buildFlow(String sourceIpAddrString, String destIpAddrString, Object ianaNumber,
Supplier<Object> transport, Supplier<Object> sourcePort, Supplier<Object> destinationPort,
Object icmpType, Object icmpCode) {
if (sourceIpAddrString == null) {
return null;
}

String destIpAddrString = s.getDestIpAddrString();
if (destIpAddrString == null) {
return null;
}
Expand All @@ -249,9 +197,9 @@ private static Flow buildFlow(FlowValueSupplier s) {
flow.source = InetAddresses.forString(sourceIpAddrString);
flow.destination = InetAddresses.forString(destIpAddrString);

Object protocol = s.getIanaNumber();
Object protocol = ianaNumber;
if (protocol == null) {
protocol = s.getTransport();
protocol = transport.get();
if (protocol == null) {
return null;
}
Expand All @@ -262,23 +210,21 @@ private static Flow buildFlow(FlowValueSupplier s) {
case Tcp:
case Udp:
case Sctp:
Object sourcePortValue = s.getSourcePort();
flow.sourcePort = parseIntFromObjectOrString(sourcePortValue, "source port");
flow.sourcePort = parseIntFromObjectOrString(sourcePort.get(), "source port");
if (flow.sourcePort < 1 || flow.sourcePort > 65535) {
throw new IllegalArgumentException("invalid source port [" + sourcePortValue + "]");
throw new IllegalArgumentException("invalid source port [" + sourcePort.get() + "]");
}

Object destinationPortValue = s.getDestinationPort();
flow.destinationPort = parseIntFromObjectOrString(destinationPortValue, "destination port");
flow.destinationPort = parseIntFromObjectOrString(destinationPort.get(), "destination port");
if (flow.destinationPort < 1 || flow.destinationPort > 65535) {
throw new IllegalArgumentException("invalid destination port [" + destinationPortValue + "]");
throw new IllegalArgumentException("invalid destination port [" + destinationPort.get() + "]");
}
break;
case Icmp:
case IcmpIpV6:
// tolerate missing or invalid ICMP types and codes
flow.icmpType = parseIntFromObjectOrString(s.getIcmpType(), "icmp type");
flow.icmpCode = parseIntFromObjectOrString(s.getIcmpCode(), "icmp code");
flow.icmpType = parseIntFromObjectOrString(icmpType, "icmp type");
flow.icmpCode = parseIntFromObjectOrString(icmpCode, "icmp code");
break;
}

Expand Down Expand Up @@ -350,13 +296,6 @@ public CommunityIdProcessor create(
if (seedInt < 0 || seedInt > 65535) {
throw newConfigurationException(TYPE, processorTag, "seed", "must be a value between 0 and 65535");
}
ThreadLocal<MessageDigest> messageDigest = ThreadLocal.withInitial(() -> {
try {
return MessageDigest.getInstance("SHA-1");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException("unable to obtain SHA-1 hasher", e);
}
});

boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", true);
return new CommunityIdProcessor(
Expand All @@ -371,7 +310,6 @@ public CommunityIdProcessor create(
icmpTypeField,
icmpCodeField,
targetField,
messageDigest,
toUint16(seedInt),
ignoreMissing
);
Expand Down Expand Up @@ -435,7 +373,9 @@ byte[] toBytes() {
return bb.array();
}

String toCommunityId(MessageDigest md, byte[] seed) {
String toCommunityId(byte[] seed) {
MessageDigest md = MESSAGE_DIGEST.get();
md.reset();
md.update(seed);
byte[] encodedBytes = Base64.getEncoder().encode(md.digest(toBytes()));
return "1:" + new String(encodedBytes, StandardCharsets.UTF_8);
Expand Down Expand Up @@ -655,64 +595,4 @@ public static Integer codeEquivalent(int icmpType, boolean isIpV6) {
return isIpV6 ? ICMP_V6_CODE_EQUIVALENTS.get(icmpType) : ICMP_V4_CODE_EQUIVALENTS.get(icmpType);
}
}

interface FlowValueSupplier {
String getSourceIpAddrString();
String getDestIpAddrString();
Object getIanaNumber();
Object getTransport();
Object getSourcePort();
Object getDestinationPort();
Object getIcmpType();
Object getIcmpCode();
}

private class IngestDocumentFlowValueSupplier implements FlowValueSupplier {

private final IngestDocument document;

IngestDocumentFlowValueSupplier(IngestDocument document) {
this.document = document;
}

@Override
public String getSourceIpAddrString() {
return document.getFieldValue(sourceIpField, String.class, ignoreMissing);
}

@Override
public String getDestIpAddrString() {
return document.getFieldValue(destinationIpField, String.class, ignoreMissing);
}

@Override
public Object getIanaNumber() {
return document.getFieldValue(ianaNumberField, Object.class, true);
}

@Override
public Object getTransport() {
return document.getFieldValue(transportField, Object.class, ignoreMissing);
}

@Override
public Object getSourcePort() {
return document.getFieldValue(sourcePortField, Object.class, ignoreMissing);
}

@Override
public Object getDestinationPort() {
return document.getFieldValue(destinationPortField, Object.class, ignoreMissing);
}

@Override
public Object getIcmpType() {
return document.getFieldValue(icmpTypeField, Object.class, true);
}

@Override
public Object getIcmpCode() {
return document.getFieldValue(icmpCodeField, Object.class, true);
}
}
}
Expand Up @@ -12,8 +12,6 @@
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -38,17 +36,9 @@ public class CommunityIdProcessorTests extends ESTestCase {
// https://github.com/elastic/beats/blob/master/libbeat/processors/communityid/communityid_test.go

private Map<String, Object> event;
private ThreadLocal<MessageDigest> messageDigest;

@Before
public void setup() throws Exception {
messageDigest = ThreadLocal.withInitial(() -> {
try {
return MessageDigest.getInstance("SHA-1");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException("unable to obtain SHA-1 hasher", e);
}
});
event = buildEvent();
}

Expand Down Expand Up @@ -323,6 +313,19 @@ public void testIgnoreMissing() throws Exception {
testCommunityIdProcessor(event, 0, null, true);
}

public void testIgnoreMissingIsFalse() throws Exception {
@SuppressWarnings("unchecked")
var source = (Map<String, Object>) event.get("source");
source.remove("ip");

IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> testCommunityIdProcessor(event, 0, null, false)
);

assertThat(e.getMessage(), containsString("field [ip] not present as part of path [source.ip]"));
}

private void testCommunityIdProcessor(Map<String, Object> source, String expectedHash) throws Exception {
testCommunityIdProcessor(source, 0, expectedHash);
}
Expand All @@ -346,7 +349,6 @@ private void testCommunityIdProcessor(Map<String, Object> source, int seed, Stri
DEFAULT_ICMP_TYPE,
DEFAULT_ICMP_CODE,
DEFAULT_TARGET,
messageDigest,
CommunityIdProcessor.toUint16(seed),
ignoreMissing
);
Expand Down

0 comments on commit 9c49328

Please sign in to comment.