From 6fafdcf66df4a1d1df21672f2644fd00953e75f9 Mon Sep 17 00:00:00 2001 From: cstella Date: Tue, 24 May 2016 18:26:37 -0400 Subject: [PATCH 1/4] Making the bolts and adapters slightly more robust so that failures for a single enrichment do not exception out and kill the topology. --- .../enrichment/adapters/geo/GeoAdapter.java | 6 +++ .../enrichment/adapters/jdbc/JdbcAdapter.java | 40 ++++++++++++++++--- .../simplehbase/SimpleHBaseAdapter.java | 14 +++++-- .../threatintel/ThreatIntelAdapter.java | 15 +++++-- .../bolt/GenericEnrichmentBolt.java | 27 +++++++++---- .../adapters/geo/GeoAdapterTest.java | 7 +++- .../threatintel/ThreatIntelAdapterTest.java | 4 +- .../bolt/GenericEnrichmentBoltTest.java | 3 +- 8 files changed, 94 insertions(+), 22 deletions(-) diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java index 5d12a29f09..dabc36d99c 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java @@ -24,6 +24,7 @@ import java.net.InetAddress; import java.sql.ResultSet; +import java.sql.SQLException; public class GeoAdapter extends JdbcAdapter { @@ -34,10 +35,15 @@ public void logAccess(CacheKey value) { } + @SuppressWarnings("unchecked") @Override public JSONObject enrich(CacheKey value) { JSONObject enriched = new JSONObject(); + if(!resetConnectionIfNecessary()) { + _LOG.error("Enrichment failure, cannot maintain a connection to JDBC. Please check connection. In the meantime, I'm not enriching."); + return enriched; + } try { InetAddress addr = InetAddress.getByName(value.getValue()); if (addr.isAnyLocalAddress() || addr.isLoopbackAddress() diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java index 9233059571..af6cda5ede 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java @@ -19,6 +19,7 @@ import org.apache.metron.enrichment.bolt.CacheKey; import org.apache.metron.enrichment.interfaces.EnrichmentAdapter; +import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +39,27 @@ public abstract class JdbcAdapter implements EnrichmentAdapter, private JdbcConfig config; private String host; + protected boolean isConnectionClosed() { + boolean isClosed = statement == null || connection == null; + if(!isClosed) { + try { + isClosed = statement.isClosed() || connection.isClosed(); + } catch (SQLException e) { + _LOG.error("Unable to maintain open JDBC connection: " + e.getMessage(), e); + isClosed = true; + } + } + return isClosed; + } + + protected boolean resetConnectionIfNecessary() { + if(isConnectionClosed()) + { + this.cleanup(); + return this.initializeAdapter(); + } + return true; + } public void setStatement(Statement statement) { this.statement = statement; } @@ -48,6 +70,7 @@ public JdbcAdapter withJdbcConfig(JdbcConfig config) { return this; } + @Override public boolean initializeAdapter() { try { @@ -64,9 +87,7 @@ public boolean initializeAdapter() { ResultSet.CONCUR_READ_ONLY); return true; } catch (Exception e) { - e.printStackTrace(); _LOG.error("[Metron] JDBC connection failed....", e); - return false; } } @@ -74,10 +95,19 @@ public boolean initializeAdapter() { @Override public void cleanup() { try { - if (statement != null) statement.close(); - if (connection != null) connection.close(); + if (statement != null) { + statement.close(); + } } catch (SQLException e) { - e.printStackTrace(); + _LOG.error("[Metron] JDBC statement close failed....", e); + } + try { + if (connection != null) { + connection.close(); + } + } + catch(SQLException e) { + _LOG.error("[Metron] JDBC connection close failed....", e); } } } diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java index 22629a492b..190ed5a3c8 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java @@ -60,13 +60,19 @@ public void logAccess(CacheKey value) { } + public boolean isInitialized() { + return lookup != null && lookup.getTable() != null; + } @Override public JSONObject enrich(CacheKey value) { JSONObject enriched = new JSONObject(); + if(!isInitialized()) { + initializeAdapter(); + } List enrichmentTypes = value.getConfig() .getEnrichment().getFieldToTypeMap() .get(EnrichmentUtils.toTopLevelField(value.getField())); - if(enrichmentTypes != null && value.getValue() != null) { + if(isInitialized() && enrichmentTypes != null && value.getValue() != null) { try { for (LookupKV kv : lookup.get(Iterables.transform(enrichmentTypes @@ -87,6 +93,7 @@ public JSONObject enrich(CacheKey value) { } catch (IOException e) { _LOG.error("Unable to retrieve value: " + e.getMessage(), e); + initializeAdapter(); throw new RuntimeException("Unable to retrieve value: " + e.getMessage(), e); } } @@ -103,7 +110,8 @@ public boolean initializeAdapter() { , new NoopAccessTracker() ); } catch (IOException e) { - throw new RuntimeException("Unable to initialize adapter: " + e.getMessage(), e); + _LOG.error("Unable to initialize adapter: " + e.getMessage(), e); + return false; } return true; } @@ -113,7 +121,7 @@ public void cleanup() { try { lookup.close(); } catch (Exception e) { - throw new RuntimeException("Unable to cleanup access tracker", e); + _LOG.error("Unable to cleanup access tracker", e); } } } diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java index ee5636bc17..603f93421f 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java @@ -65,11 +65,14 @@ public void logAccess(CacheKey value) { @Override public JSONObject enrich(CacheKey value) { + if(!isInitialized()) { + initializeAdapter(); + } JSONObject enriched = new JSONObject(); List enrichmentTypes = value.getConfig() .getThreatIntel().getFieldToTypeMap() .get(EnrichmentUtils.toTopLevelField(value.getField())); - if(enrichmentTypes != null) { + if(isInitialized() && enrichmentTypes != null) { int i = 0; try { for (Boolean isThreat : @@ -89,13 +92,18 @@ public JSONObject enrich(CacheKey value) { } } catch(IOException e) { + _LOG.error("Unable to retrieve value: " + e.getMessage(), e); + initializeAdapter(); throw new RuntimeException("Unable to retrieve value", e); } } - //throw new RuntimeException("Unable to retrieve value " + value); return enriched; } + public boolean isInitialized() { + return lookup != null && lookup.getTable() != null; + } + @Override public boolean initializeAdapter() { PersistentAccessTracker accessTracker; @@ -117,7 +125,8 @@ public boolean initializeAdapter() { ); lookup = new EnrichmentLookup(config.getProvider().getTable(hbaseConfig, hbaseTable), config.getHBaseCF(), accessTracker); } catch (IOException e) { - throw new IllegalStateException("Unable to initialize ThreatIntelAdapter", e); + _LOG.error("Unable to initialize ThreatIntelAdapter", e); + return false; } return true; diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java index 234c7951c1..3a4b67dd7a 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java @@ -176,6 +176,7 @@ public void execute(Tuple tuple) { else { throw new RuntimeException("Source type is missing from enrichment fragment: " + rawMessage.toJSONString()); } + boolean error = false; for (Object o : rawMessage.keySet()) { String field = (String) o; String value = (String) rawMessage.get(field); @@ -186,14 +187,23 @@ public void execute(Tuple tuple) { if (value != null && value.length() != 0) { SensorEnrichmentConfig config = configurations.getSensorEnrichmentConfig(sourceType); if(config == null) { - throw new RuntimeException("Unable to find " + config); + LOG.error("Unable to find " + config); + error = true; + continue; } CacheKey cacheKey= new CacheKey(field, value, config); - adapter.logAccess(cacheKey); - enrichedField = cache.getUnchecked(cacheKey); - if (enrichedField == null) - throw new Exception("[Metron] Could not enrich string: " - + value); + try { + adapter.logAccess(cacheKey); + enrichedField = cache.getUnchecked(cacheKey); + if (enrichedField == null) + throw new Exception("[Metron] Could not enrich string: " + + value); + } + catch(Exception e) { + LOG.error(e.getMessage(), e); + error = true; + continue; + } } if (!enrichedField.isEmpty()) { for (Object enrichedKey : enrichedField.keySet()) { @@ -206,7 +216,10 @@ public void execute(Tuple tuple) { } enrichedMessage.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".end.ts", "" + System.currentTimeMillis()); - if (!enrichedMessage.isEmpty()) { + if(error) { + throw new Exception("Unable to enrich " + enrichedMessage + " check logs for specifics."); + } + if (enrichedMessage != null && !enrichedMessage.isEmpty()) { collector.emit(enrichmentType, new Values(key, enrichedMessage)); } } catch (Exception e) { diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoAdapterTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoAdapterTest.java index ec90c49578..8812ad397b 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoAdapterTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoAdapterTest.java @@ -83,7 +83,12 @@ public void setup() throws Exception { @Test public void testEnrich() throws Exception { - GeoAdapter geo = new GeoAdapter(); + GeoAdapter geo = new GeoAdapter() { + @Override + public boolean initializeAdapter() { + return true; + } + }; geo.setStatement(statetment); JSONObject actualMessage = geo.enrich(new CacheKey("dummy", ip, null)); Assert.assertNotNull(actualMessage.get("locID")); diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java index 2afeb5bbf1..f420b01775 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java @@ -122,7 +122,7 @@ public void testEnrich() throws Exception { Assert.assertEquals(expectedMessage, actualMessage); } - @Test(expected = IllegalStateException.class) + @Test public void testInitializeAdapter() { String cf = "cf"; @@ -145,7 +145,7 @@ public void testInitializeAdapter() { ThreatIntelAdapter tia = new ThreatIntelAdapter(config); tia.initializeAdapter(); - + Assert.assertFalse(tia.isInitialized()); } diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java index a9a16373aa..31168ffb7e 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java @@ -18,6 +18,7 @@ package org.apache.metron.enrichment.bolt; import backtype.storm.tuple.Values; +import com.google.common.collect.ImmutableMap; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.TestConstants; import org.apache.metron.test.bolt.BaseEnrichmentBoltTest; @@ -177,7 +178,7 @@ public void test() throws IOException { when(tuple.getStringByField("key")).thenReturn(key); when(tuple.getValueByField("message")).thenReturn(originalMessage); genericEnrichmentBolt.execute(tuple); - verify(outputCollector, times(1)).emit(eq(enrichmentType), argThat(new EnrichedMessageMatcher(key, new JSONObject()))); + verify(outputCollector, times(1)).emit(eq(enrichmentType), argThat(new EnrichedMessageMatcher(key, new JSONObject(ImmutableMap.of("source.type", "test"))))); reset(enrichmentAdapter); SensorEnrichmentConfig sensorEnrichmentConfig = SensorEnrichmentConfig. From 865cbc942ceff92e47ca83e664ae6bc9c3f3cd5a Mon Sep 17 00:00:00 2001 From: cstella Date: Tue, 24 May 2016 20:34:25 -0400 Subject: [PATCH 2/4] Continued robustness improvements. --- .../org/apache/metron/enrichment/adapters/geo/GeoAdapter.java | 2 +- .../apache/metron/enrichment/adapters/geo/GeoAdapterTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java index dabc36d99c..111dbff196 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java @@ -51,7 +51,7 @@ public JSONObject enrich(CacheKey value) { || !ipvalidator.isValidInet4Address(value.getValue())) { return new JSONObject(); } - String locidQuery = "select IPTOLOCID(\"" + value + String locidQuery = "select IPTOLOCID(\"" + value.getValue() + "\") as ANS"; ResultSet resultSet = statement.executeQuery(locidQuery); String locid = null; diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoAdapterTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoAdapterTest.java index 8812ad397b..ac5addedc5 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoAdapterTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoAdapterTest.java @@ -66,7 +66,7 @@ public void setup() throws Exception { JSONParser jsonParser = new JSONParser(); expectedMessage = (JSONObject) jsonParser.parse(expectedMessageString); MockitoAnnotations.initMocks(this); - when(statetment.executeQuery("select IPTOLOCID(\"CacheKey{field='dummy', value='72.163.4.161'}\") as ANS")).thenReturn(resultSet); + when(statetment.executeQuery("select IPTOLOCID(\"" + ip + "\") as ANS")).thenReturn(resultSet); when(statetment.executeQuery("select * from location where locID = 1")).thenReturn(resultSet1); when(resultSet.next()).thenReturn(Boolean.TRUE, Boolean.FALSE); when(resultSet.getString("ANS")).thenReturn("1"); From 87c46228853d5aa0854133e481bb387bd0cbc230 Mon Sep 17 00:00:00 2001 From: cstella Date: Thu, 26 May 2016 12:18:17 -0400 Subject: [PATCH 3/4] Made JdbcAdapter conform better to K&R style bracing. --- .../apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java index af6cda5ede..6348ac2e89 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java @@ -53,8 +53,7 @@ protected boolean isConnectionClosed() { } protected boolean resetConnectionIfNecessary() { - if(isConnectionClosed()) - { + if(isConnectionClosed()) { this.cleanup(); return this.initializeAdapter(); } @@ -80,8 +79,9 @@ public boolean initializeAdapter() { Class.forName(this.config.getClassName()); connection = DriverManager.getConnection(this.config.getJdbcUrl()); connection.setReadOnly(true); - if (!connection.isValid(0)) + if (!connection.isValid(0)) { throw new Exception("Invalid connection string...."); + } statement = connection.createStatement( ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY); From 2833c6967e8d0d879a81fe171ef51ab09230bc7b Mon Sep 17 00:00:00 2001 From: cstella Date: Thu, 26 May 2016 12:20:19 -0400 Subject: [PATCH 4/4] Made JdbcAdapter conform better to K&R style bracing. --- .../apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java index 6348ac2e89..708eb67dfc 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java @@ -19,7 +19,6 @@ import org.apache.metron.enrichment.bolt.CacheKey; import org.apache.metron.enrichment.interfaces.EnrichmentAdapter; -import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,8 +26,7 @@ import java.net.InetAddress; import java.sql.*; -public abstract class JdbcAdapter implements EnrichmentAdapter, - Serializable { +public abstract class JdbcAdapter implements EnrichmentAdapter, Serializable { protected static final Logger _LOG = LoggerFactory .getLogger(JdbcAdapter.class);