From e118cfbee2e044d0e7c9ef92d79f13158883b755 Mon Sep 17 00:00:00 2001 From: Michael Miklavcic Date: Thu, 29 Nov 2018 17:42:25 -0700 Subject: [PATCH] Add adapter timestamp properties to unified enrichment topology parallel enricher --- .../enrichment/parallel/ParallelEnricher.java | 10 +- .../enrichment/utils/EnrichmentUtils.java | 13 +-- .../parallel/ParallelEnricherTest.java | 104 +++++++++++------- 3 files changed, 77 insertions(+), 50 deletions(-) diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java index b10c148746..1de89459ad 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java @@ -157,6 +157,7 @@ public EnrichmentResult apply( JSONObject message throw new IllegalStateException("Unable to find an adapter for " + task.getKey() + ", possible adapters are: " + Joiner.on(",").join(enrichmentsByType.keySet())); } + message.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".begin.ts", "" + System.currentTimeMillis()); for(JSONObject m : task.getValue()) { /* now for each unit of work (each of these only has one element in them) * the key is the field name and the value is value associated with that field. @@ -171,6 +172,7 @@ public EnrichmentResult apply( JSONObject message String field = (String) o; Object value = m.get(o); if(value == null) { + message.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".end.ts", "" + System.currentTimeMillis()); continue; } CacheKey cacheKey = new CacheKey(field, value, config); @@ -182,7 +184,10 @@ public EnrichmentResult apply( JSONObject message ret = new JSONObject(); } //each enrichment has their own unique prefix to use to adjust the keys for the enriched fields. - return EnrichmentUtils.adjustKeys(new JSONObject(), ret, cacheKey.getField(), prefix); + JSONObject adjustedKeys = EnrichmentUtils + .adjustKeys(new JSONObject(), ret, cacheKey.getField(), prefix); + adjustedKeys.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".end.ts", "" + System.currentTimeMillis()); + return adjustedKeys; } catch (Throwable e) { JSONObject errorMessage = new JSONObject(); errorMessage.putAll(m); @@ -197,11 +202,12 @@ public EnrichmentResult apply( JSONObject message } } if(taskList.isEmpty()) { + message.put(getClass().getSimpleName().toLowerCase() + ".enrich.end.ts", "" + System.currentTimeMillis()); return new EnrichmentResult(message, errors); } EnrichmentResult ret = new EnrichmentResult(all(taskList, message, (left, right) -> join(left, right)).get(), errors); - message.put(getClass().getSimpleName().toLowerCase() + ".enrich.end.ts", "" + System.currentTimeMillis()); + ret.getResult().put(getClass().getSimpleName().toLowerCase() + ".enrich.end.ts", "" + System.currentTimeMillis()); if(perfLog != null) { String key = message.get(Constants.GUID) + ""; perfLog.log("enrich", "key={}, elapsed time to enrich", key); diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java index 63d39c5407..9a36a87cb7 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java @@ -21,21 +21,18 @@ import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; +import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.metron.common.configuration.enrichment.EnrichmentConfig; -import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; +import org.apache.metron.enrichment.converter.EnrichmentKey; import org.apache.metron.enrichment.lookup.EnrichmentLookup; import org.apache.metron.enrichment.lookup.handler.KeyWithContext; import org.apache.metron.hbase.TableProvider; -import org.apache.metron.enrichment.converter.EnrichmentKey; import org.json.simple.JSONObject; -import sun.management.Sensor; - -import javax.annotation.Nullable; -import java.lang.reflect.InvocationTargetException; -import java.util.HashMap; -import java.util.Map; public class EnrichmentUtils { diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java index d4fcdf4210..a6832d6f6c 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java @@ -65,51 +65,57 @@ public class ParallelEnricherTest { private static Context stellarContext; private static AtomicInteger numAccesses = new AtomicInteger(0); private static Map> enrichmentsByType; - @BeforeClass - public static void setup() { - ConcurrencyContext infrastructure = new ConcurrencyContext(); - infrastructure.initialize(5, 100, 10, null, null, false); - stellarContext = new Context.Builder() - .build(); - StellarFunctions.initialize(stellarContext); - StellarAdapter adapter = new StellarAdapter(){ - @Override - public void logAccess(CacheKey value) { - numAccesses.incrementAndGet(); - } - }.ofType("ENRICHMENT"); - adapter.initializeAdapter(new HashMap<>()); - EnrichmentAdapter dummy = new EnrichmentAdapter() { - @Override - public void logAccess(CacheKey value) { + // Declaring explicit class bc getClass().getSimpleName() returns "" for anon classes + public static class DummyEnrichmentAdapter implements EnrichmentAdapter { + @Override + public void logAccess(CacheKey value) { - } + } - @Override - public JSONObject enrich(CacheKey value) { - return null; - } + @Override + public JSONObject enrich(CacheKey value) { + return null; + } - @Override - public boolean initializeAdapter(Map config) { - return false; - } + @Override + public boolean initializeAdapter(Map config) { + return false; + } - @Override - public void updateAdapter(Map config) { + @Override + public void updateAdapter(Map config) { - } + } - @Override - public void cleanup() { + @Override + public void cleanup() { - } + } - @Override - public String getOutputPrefix(CacheKey value) { - return null; - } - }; + @Override + public String getOutputPrefix(CacheKey value) { + return null; + } + } + + // Declaring explicit class bc getClass().getSimpleName() returns "" for anon classes + public static class AccessLoggingStellarAdapter extends StellarAdapter { + @Override + public void logAccess(CacheKey value) { + numAccesses.incrementAndGet(); + } + } + + @BeforeClass + public static void setup() { + ConcurrencyContext infrastructure = new ConcurrencyContext(); + infrastructure.initialize(5, 100, 10, null, null, false); + stellarContext = new Context.Builder() + .build(); + StellarFunctions.initialize(stellarContext); + StellarAdapter adapter = new AccessLoggingStellarAdapter().ofType("ENRICHMENT"); + adapter.initializeAdapter(new HashMap<>()); + EnrichmentAdapter dummy = new DummyEnrichmentAdapter(); enrichmentsByType = ImmutableMap.of("stellar", adapter, "dummy", dummy); enricher = new ParallelEnricher(enrichmentsByType, infrastructure, false); @@ -139,13 +145,19 @@ public void testGoodConfig() throws Exception { }}; ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null); JSONObject ret = result.getResult(); - Assert.assertEquals("Got the wrong result count: " + ret, 8, ret.size()); + Assert.assertEquals("Got the wrong result count: " + ret, 11, ret.size()); Assert.assertEquals(1, ret.get("map.blah")); Assert.assertEquals("test", ret.get("source.type")); Assert.assertEquals(1, ret.get("one")); Assert.assertEquals(2, ret.get("foo")); Assert.assertEquals("TEST", ret.get("ALL_CAPS")); Assert.assertEquals(0, result.getEnrichmentErrors().size()); + Assert.assertTrue(result.getResult().containsKey("adapter.accessloggingstellaradapter.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("adapter.accessloggingstellaradapter.end.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.end.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.end.ts")); } /** * { @@ -170,7 +182,13 @@ public void testNullEnrichment() throws Exception { }}; ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null); JSONObject ret = result.getResult(); - Assert.assertEquals("Got the wrong result count: " + ret, 4, ret.size()); + Assert.assertEquals("Got the wrong result count: " + ret, 7, ret.size()); + Assert.assertTrue(result.getResult().containsKey("adapter.dummyenrichmentadapter.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("adapter.dummyenrichmentadapter.end.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.end.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.end.ts")); } /** @@ -208,13 +226,19 @@ public void testBadConfig() throws Exception { }}; ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null); JSONObject ret = result.getResult(); - Assert.assertEquals(ret + " is not what I expected", 8, ret.size()); + Assert.assertEquals(ret + " is not what I expected", 11, ret.size()); Assert.assertEquals(1, ret.get("map.blah")); Assert.assertEquals("test", ret.get("source.type")); Assert.assertEquals(1, ret.get("one")); Assert.assertEquals(2, ret.get("foo")); Assert.assertEquals("TEST", ret.get("ALL_CAPS")); Assert.assertEquals(1, result.getEnrichmentErrors().size()); + Assert.assertTrue(result.getResult().containsKey("adapter.accessloggingstellaradapter.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("adapter.accessloggingstellaradapter.end.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.end.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.end.ts")); } /**