From 139e0f8170d741f3153bfd8478b2085d7c036b89 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 15 Dec 2018 16:41:02 -0500 Subject: [PATCH 01/25] Make the ingest-geoip databases even lazier to load Today we try to load the ingest-geoip databases lazily. Currently they are loaded as soon as any pipeline that uses an ingest-geoip processor is created. This is not lazy enough. For example, we could load the databases only the first time that they are actually used. This would ensure that we load the minimal set of data to support an in-use pipeline (instead of *all* of the data). This can come up in a couple of ways. One is when a subset of the databases is used (e.g., the city database versus the country database versus the ASN database). Another is when the plugins are installed on non-ingest nodes (e.g., master-only nodes); we would never use the databases in this case, yet they are currently being loaded, occupying ~60 MB of the heap. This commit makes the ingest-geoip databases as lazy as possible. 
--- .../ingest/geoip/GeoIpProcessor.java | 91 ++++++++++++------- .../ingest/geoip/GeoIpProcessorTests.java | 44 ++++++--- 2 files changed, 89 insertions(+), 46 deletions(-) diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index a0be7557a5a8a..5ac18b6e1e99f 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -38,6 +38,8 @@ import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache; +import java.io.IOException; +import java.io.UncheckedIOException; import java.net.InetAddress; import java.security.AccessController; import java.security.PrivilegedAction; @@ -49,6 +51,8 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty; @@ -64,13 +68,13 @@ public final class GeoIpProcessor extends AbstractProcessor { private final String field; private final String targetField; - private final DatabaseReader dbReader; - private final Set properties; + private final Supplier dbReader; + private final Supplier> properties; private final boolean ignoreMissing; private final GeoIpCache cache; - GeoIpProcessor(String tag, String field, DatabaseReader dbReader, String targetField, Set properties, boolean ignoreMissing, + GeoIpProcessor(String tag, String field, Supplier dbReader, String targetField, Supplier> properties, boolean ignoreMissing, GeoIpCache cache) { super(tag); this.field = field; @@ -98,7 +102,7 @@ public IngestDocument execute(IngestDocument ingestDocument) { final InetAddress 
ipAddress = InetAddresses.forString(ip); Map geoData; - String databaseType = dbReader.getMetadata().getDatabaseType(); + String databaseType = dbReader.get().getMetadata().getDatabaseType(); if (databaseType.endsWith(CITY_DB_SUFFIX)) { try { @@ -119,7 +123,7 @@ public IngestDocument execute(IngestDocument ingestDocument) { geoData = Collections.emptyMap(); } } else { - throw new ElasticsearchParseException("Unsupported database type [" + dbReader.getMetadata().getDatabaseType() + throw new ElasticsearchParseException("Unsupported database type [" + dbReader.get().getMetadata().getDatabaseType() + "]", new IllegalStateException()); } if (geoData.isEmpty() == false) { @@ -142,11 +146,11 @@ String getTargetField() { } DatabaseReader getDbReader() { - return dbReader; + return dbReader.get(); } Set getProperties() { - return properties; + return properties.get(); } private Map retrieveCityGeoData(InetAddress ipAddress) { @@ -154,7 +158,7 @@ private Map retrieveCityGeoData(InetAddress ipAddress) { CityResponse response = AccessController.doPrivileged((PrivilegedAction) () -> cache.putIfAbsent(ipAddress, CityResponse.class, ip -> { try { - return dbReader.city(ip); + return dbReader.get().city(ip); } catch (AddressNotFoundException e) { throw new AddressNotFoundRuntimeException(e); } catch (Exception e) { @@ -169,7 +173,7 @@ private Map retrieveCityGeoData(InetAddress ipAddress) { Subdivision subdivision = response.getMostSpecificSubdivision(); Map geoData = new HashMap<>(); - for (Property property : this.properties) { + for (Property property : this.properties.get()) { switch (property) { case IP: geoData.put("ip", NetworkAddress.format(ipAddress)); @@ -240,7 +244,7 @@ private Map retrieveCountryGeoData(InetAddress ipAddress) { CountryResponse response = AccessController.doPrivileged((PrivilegedAction) () -> cache.putIfAbsent(ipAddress, CountryResponse.class, ip -> { try { - return dbReader.country(ip); + return dbReader.get().country(ip); } catch 
(AddressNotFoundException e) { throw new AddressNotFoundRuntimeException(e); } catch (Exception e) { @@ -252,7 +256,7 @@ private Map retrieveCountryGeoData(InetAddress ipAddress) { Continent continent = response.getContinent(); Map geoData = new HashMap<>(); - for (Property property : this.properties) { + for (Property property : this.properties.get()) { switch (property) { case IP: geoData.put("ip", NetworkAddress.format(ipAddress)); @@ -285,7 +289,7 @@ private Map retrieveAsnGeoData(InetAddress ipAddress) { AsnResponse response = AccessController.doPrivileged((PrivilegedAction) () -> cache.putIfAbsent(ipAddress, AsnResponse.class, ip -> { try { - return dbReader.asn(ip); + return dbReader.get().asn(ip); } catch (AddressNotFoundException e) { throw new AddressNotFoundRuntimeException(e); } catch (Exception e) { @@ -297,7 +301,7 @@ private Map retrieveAsnGeoData(InetAddress ipAddress) { String organization_name = response.getAutonomousSystemOrganization(); Map geoData = new HashMap<>(); - for (Property property : this.properties) { + for (Property property : this.properties.get()) { switch (property) { case IP: geoData.put("ip", NetworkAddress.format(ipAddress)); @@ -352,33 +356,52 @@ public GeoIpProcessor create(Map registry, String pro "database_file", "database file [" + databaseFile + "] doesn't exist"); } - DatabaseReader databaseReader = lazyLoader.get(); - String databaseType = databaseReader.getMetadata().getDatabaseType(); + final Supplier databaseReader = () -> { + try { + return lazyLoader.get(); + } catch (final IOException e) { + throw new UncheckedIOException(e); + } + }; - final Set properties; + final Supplier> propertiesSupplier; if (propertyNames != null) { - properties = EnumSet.noneOf(Property.class); - for (String fieldName : propertyNames) { - try { - properties.add(Property.parseProperty(databaseType, fieldName)); - } catch (IllegalArgumentException e) { - throw newConfigurationException(TYPE, processorTag, "properties", e.getMessage()); + 
final AtomicBoolean set = new AtomicBoolean(); + final Set properties = EnumSet.noneOf(Property.class); + propertiesSupplier = () -> { + if (set.get() == false || set.compareAndSet(false, true)) { + for (String fieldName : propertyNames) { + try { + properties.add(Property.parseProperty(databaseReader.get().getMetadata().getDatabaseType(), fieldName)); + } catch (IllegalArgumentException e) { + throw newConfigurationException(TYPE, processorTag, "properties", e.getMessage()); + } + } } - } + return properties; + }; } else { - if (databaseType.endsWith(CITY_DB_SUFFIX)) { - properties = DEFAULT_CITY_PROPERTIES; - } else if (databaseType.endsWith(COUNTRY_DB_SUFFIX)) { - properties = DEFAULT_COUNTRY_PROPERTIES; - } else if (databaseType.endsWith(ASN_DB_SUFFIX)) { - properties = DEFAULT_ASN_PROPERTIES; - } else { - throw newConfigurationException(TYPE, processorTag, "database_file", "Unsupported database type [" - + databaseType + "]"); - } + final AtomicBoolean set = new AtomicBoolean(); + final Set properties = EnumSet.noneOf(Property.class); + propertiesSupplier = () -> { + if (set.get() == false || set.compareAndSet(false, true)) { + final String databaseType = databaseReader.get().getMetadata().getDatabaseType(); + if (databaseType.endsWith(CITY_DB_SUFFIX)) { + properties.addAll(DEFAULT_CITY_PROPERTIES); + } else if (databaseType.endsWith(COUNTRY_DB_SUFFIX)) { + properties.addAll(DEFAULT_COUNTRY_PROPERTIES); + } else if (databaseType.endsWith(ASN_DB_SUFFIX)) { + properties.addAll(DEFAULT_ASN_PROPERTIES); + } else { + throw newConfigurationException(TYPE, processorTag, "database_file", "Unsupported database type [" + + databaseType + "]"); + } + } + return properties; + }; } - return new GeoIpProcessor(processorTag, ipField, databaseReader, targetField, properties, ignoreMissing, cache); + return new GeoIpProcessor(processorTag, ipField, databaseReader, targetField, propertiesSupplier, ignoreMissing, cache); } } diff --git 
a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java index 4e09662ed4a28..9e25a90c4ff97 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java @@ -25,11 +25,16 @@ import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache; import org.elasticsearch.test.ESTestCase; +import java.io.IOException; import java.io.InputStream; +import java.io.UncheckedIOException; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.Matchers.containsString; @@ -38,10 +43,25 @@ public class GeoIpProcessorTests extends ESTestCase { + private final Supplier databaseReaderSupplier(final InputStream database) { + final AtomicBoolean set = new AtomicBoolean(); + final AtomicReference databaseReader = new AtomicReference<>(); + return () -> { + if (set.get() == false && set.compareAndSet(false, true)) { + try { + databaseReader.set(new DatabaseReader.Builder(database).build()); + } catch (final IOException e) { + throw new UncheckedIOException(e); + } + } + return databaseReader.get(); + }; + } + public void testCity() throws Exception { InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + databaseReaderSupplier(database), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), 
false, new GeoIpCache(1000)); Map document = new HashMap<>(); @@ -66,7 +86,7 @@ public void testCity() throws Exception { public void testNullValueWithIgnoreMissing() throws Exception { InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true, + databaseReaderSupplier(database), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), true, new GeoIpCache(1000)); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("source_field", null)); @@ -78,7 +98,7 @@ public void testNullValueWithIgnoreMissing() throws Exception { public void testNonExistentWithIgnoreMissing() throws Exception { InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true, + databaseReaderSupplier(database), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), true, new GeoIpCache(1000)); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); @@ -89,7 +109,7 @@ public void testNonExistentWithIgnoreMissing() throws Exception { public void testNullWithoutIgnoreMissing() throws Exception { InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + databaseReaderSupplier(database), "target_field", () -> 
EnumSet.allOf(GeoIpProcessor.Property.class), false, new GeoIpCache(1000)); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("source_field", null)); @@ -101,7 +121,7 @@ public void testNullWithoutIgnoreMissing() throws Exception { public void testNonExistentWithoutIgnoreMissing() throws Exception { InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + databaseReaderSupplier(database), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, new GeoIpCache(1000)); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); @@ -112,7 +132,7 @@ public void testNonExistentWithoutIgnoreMissing() throws Exception { public void testCity_withIpV6() throws Exception { InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + databaseReaderSupplier(database), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, new GeoIpCache(1000)); String address = "2602:306:33d3:8000::3257:9652"; @@ -142,7 +162,7 @@ public void testCity_withIpV6() throws Exception { public void testCityWithMissingLocation() throws Exception { InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + 
databaseReaderSupplier(database), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, new GeoIpCache(1000)); Map document = new HashMap<>(); @@ -160,7 +180,7 @@ public void testCityWithMissingLocation() throws Exception { public void testCountry() throws Exception { InputStream database = getDatabaseFileInputStream("/GeoLite2-Country.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + databaseReaderSupplier(database), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, new GeoIpCache(1000)); Map document = new HashMap<>(); @@ -181,7 +201,7 @@ public void testCountry() throws Exception { public void testCountryWithMissingLocation() throws Exception { InputStream database = getDatabaseFileInputStream("/GeoLite2-Country.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + databaseReaderSupplier(database), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, new GeoIpCache(1000)); Map document = new HashMap<>(); @@ -200,7 +220,7 @@ public void testAsn() throws Exception { String ip = "82.171.64.0"; InputStream database = getDatabaseFileInputStream("/GeoLite2-ASN.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + databaseReaderSupplier(database), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, new GeoIpCache(1000)); Map document = new HashMap<>(); @@ -220,7 +240,7 @@ public void testAsn() throws Exception { public void testAddressIsNotInTheDatabase() throws Exception { InputStream database = 
getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + databaseReaderSupplier(database), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, new GeoIpCache(1000)); Map document = new HashMap<>(); @@ -234,7 +254,7 @@ public void testAddressIsNotInTheDatabase() throws Exception { public void testInvalid() throws Exception { InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + databaseReaderSupplier(database), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, new GeoIpCache(1000)); Map document = new HashMap<>(); From 43d51760e124a2d58a41f2ce7cd3ecf05567cf21 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 15 Dec 2018 16:52:06 -0500 Subject: [PATCH 02/25] Fix silliness --- .../ingest/geoip/GeoIpProcessor.java | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index 5ac18b6e1e99f..4003ada7b44aa 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -73,9 +73,25 @@ public final class GeoIpProcessor extends AbstractProcessor { private final boolean ignoreMissing; private final GeoIpCache cache; - - GeoIpProcessor(String tag, String field, Supplier dbReader, String targetField, Supplier> properties, boolean ignoreMissing, - GeoIpCache cache) { + /** + * 
Construct a geo-IP processor. + * + * @param tag the processor tag + * @param field the source field to geo-IP map + * @param dbReader a supplier of a geo-IP database reader; ideally this is lazily-loaded once on first use + * @param targetField the target field + * @param properties the properties; ideally this is lazily-loaded once on first use + * @param ignoreMissing true if documents with a missing value for the field should be ignored + * @param cache a geo-IP cache + */ + GeoIpProcessor( + final String tag, + final String field, + final Supplier dbReader, + final String targetField, + final Supplier> properties, + final boolean ignoreMissing, + final GeoIpCache cache) { super(tag); this.field = field; this.targetField = targetField; @@ -369,7 +385,7 @@ public GeoIpProcessor create(Map registry, String pro final AtomicBoolean set = new AtomicBoolean(); final Set properties = EnumSet.noneOf(Property.class); propertiesSupplier = () -> { - if (set.get() == false || set.compareAndSet(false, true)) { + if (set.compareAndSet(false, true)) { for (String fieldName : propertyNames) { try { properties.add(Property.parseProperty(databaseReader.get().getMetadata().getDatabaseType(), fieldName)); @@ -384,7 +400,7 @@ public GeoIpProcessor create(Map registry, String pro final AtomicBoolean set = new AtomicBoolean(); final Set properties = EnumSet.noneOf(Property.class); propertiesSupplier = () -> { - if (set.get() == false || set.compareAndSet(false, true)) { + if (set.compareAndSet(false, true)) { final String databaseType = databaseReader.get().getMetadata().getDatabaseType(); if (databaseType.endsWith(CITY_DB_SUFFIX)) { properties.addAll(DEFAULT_CITY_PROPERTIES); From f762a08c2ed12191567f0a332284aa295635bea5 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 15 Dec 2018 16:52:17 -0500 Subject: [PATCH 03/25] Fix silliness --- .../org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java index 9e25a90c4ff97..9080f99b8da24 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java @@ -47,7 +47,7 @@ private final Supplier databaseReaderSupplier(final InputStream final AtomicBoolean set = new AtomicBoolean(); final AtomicReference databaseReader = new AtomicReference<>(); return () -> { - if (set.get() == false && set.compareAndSet(false, true)) { + if (set.compareAndSet(false, true)) { try { databaseReader.set(new DatabaseReader.Builder(database).build()); } catch (final IOException e) { From d9477a4f156cf46922cd3df0a773dfc26fbf3834 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 15 Dec 2018 17:04:12 -0500 Subject: [PATCH 04/25] Checkstyle --- .../org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java index 9080f99b8da24..6f564606a6841 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java @@ -43,7 +43,7 @@ public class GeoIpProcessorTests extends ESTestCase { - private final Supplier databaseReaderSupplier(final InputStream database) { + private Supplier databaseReaderSupplier(final InputStream database) { final AtomicBoolean set = new AtomicBoolean(); final AtomicReference databaseReader = new AtomicReference<>(); return () -> { From 316912188eaa469373cc90442393c374747d61c9 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sun, 16 
Dec 2018 07:59:49 -0500 Subject: [PATCH 05/25] wip --- .../ingest/geoip/GeoIpProcessor.java | 54 +++++------- .../ingest/geoip/GeoIpProcessorTests.java | 83 ++++++------------- 2 files changed, 49 insertions(+), 88 deletions(-) diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index 4003ada7b44aa..34857696a754e 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -31,6 +31,7 @@ import com.maxmind.geoip2.record.Subdivision; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.SpecialPermission; +import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.network.InetAddresses; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.ingest.AbstractProcessor; @@ -39,7 +40,6 @@ import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache; import java.io.IOException; -import java.io.UncheckedIOException; import java.net.InetAddress; import java.security.AccessController; import java.security.PrivilegedAction; @@ -52,7 +52,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty; @@ -68,8 +68,8 @@ public final class GeoIpProcessor extends AbstractProcessor { private final String field; private final String targetField; - private final Supplier dbReader; - private final Supplier> properties; + private final DatabaseReaderLazyLoader dbReader; + private final CheckedSupplier, IOException> properties; private final boolean 
ignoreMissing; private final GeoIpCache cache; @@ -87,9 +87,9 @@ public final class GeoIpProcessor extends AbstractProcessor { GeoIpProcessor( final String tag, final String field, - final Supplier dbReader, + final DatabaseReaderLazyLoader dbReader, final String targetField, - final Supplier> properties, + final CheckedSupplier, IOException> properties, final boolean ignoreMissing, final GeoIpCache cache) { super(tag); @@ -106,7 +106,7 @@ boolean isIgnoreMissing() { } @Override - public IngestDocument execute(IngestDocument ingestDocument) { + public IngestDocument execute(IngestDocument ingestDocument) throws IOException { String ip = ingestDocument.getFieldValue(field, String.class, ignoreMissing); if (ip == null && ignoreMissing) { @@ -161,15 +161,15 @@ String getTargetField() { return targetField; } - DatabaseReader getDbReader() { + DatabaseReader getDbReader() throws IOException { return dbReader.get(); } - Set getProperties() { + Set getProperties() throws IOException { return properties.get(); } - private Map retrieveCityGeoData(InetAddress ipAddress) { + private Map retrieveCityGeoData(InetAddress ipAddress) throws IOException { SpecialPermission.check(); CityResponse response = AccessController.doPrivileged((PrivilegedAction) () -> cache.putIfAbsent(ipAddress, CityResponse.class, ip -> { @@ -255,7 +255,7 @@ private Map retrieveCityGeoData(InetAddress ipAddress) { return geoData; } - private Map retrieveCountryGeoData(InetAddress ipAddress) { + private Map retrieveCountryGeoData(InetAddress ipAddress) throws IOException { SpecialPermission.check(); CountryResponse response = AccessController.doPrivileged((PrivilegedAction) () -> cache.putIfAbsent(ipAddress, CountryResponse.class, ip -> { @@ -300,7 +300,7 @@ private Map retrieveCountryGeoData(InetAddress ipAddress) { return geoData; } - private Map retrieveAsnGeoData(InetAddress ipAddress) { + private Map retrieveAsnGeoData(InetAddress ipAddress) throws IOException { SpecialPermission.check(); AsnResponse 
response = AccessController.doPrivileged((PrivilegedAction) () -> cache.putIfAbsent(ipAddress, AsnResponse.class, ip -> { @@ -358,8 +358,8 @@ public Factory(Map databaseReaders, GeoIpCache } @Override - public GeoIpProcessor create(Map registry, String processorTag, - Map config) throws Exception { + public GeoIpProcessor create( + final Map registry, final String processorTag, final Map config) { String ipField = readStringProperty(TYPE, processorTag, config, "field"); String targetField = readStringProperty(TYPE, processorTag, config, "target_field", "geoip"); String databaseFile = readStringProperty(TYPE, processorTag, config, "database_file", "GeoLite2-City.mmdb"); @@ -372,15 +372,7 @@ public GeoIpProcessor create(Map registry, String pro "database_file", "database file [" + databaseFile + "] doesn't exist"); } - final Supplier databaseReader = () -> { - try { - return lazyLoader.get(); - } catch (final IOException e) { - throw new UncheckedIOException(e); - } - }; - - final Supplier> propertiesSupplier; + final CheckedSupplier, IOException> propertiesSupplier; if (propertyNames != null) { final AtomicBoolean set = new AtomicBoolean(); final Set properties = EnumSet.noneOf(Property.class); @@ -388,7 +380,7 @@ public GeoIpProcessor create(Map registry, String pro if (set.compareAndSet(false, true)) { for (String fieldName : propertyNames) { try { - properties.add(Property.parseProperty(databaseReader.get().getMetadata().getDatabaseType(), fieldName)); + properties.add(Property.parseProperty(lazyLoader.get().getMetadata().getDatabaseType(), fieldName)); } catch (IllegalArgumentException e) { throw newConfigurationException(TYPE, processorTag, "properties", e.getMessage()); } @@ -398,26 +390,26 @@ public GeoIpProcessor create(Map registry, String pro }; } else { final AtomicBoolean set = new AtomicBoolean(); - final Set properties = EnumSet.noneOf(Property.class); + final AtomicReference> properties = new AtomicReference<>(); propertiesSupplier = () -> { if 
(set.compareAndSet(false, true)) { - final String databaseType = databaseReader.get().getMetadata().getDatabaseType(); + final String databaseType = lazyLoader.get().getMetadata().getDatabaseType(); if (databaseType.endsWith(CITY_DB_SUFFIX)) { - properties.addAll(DEFAULT_CITY_PROPERTIES); + properties.set(DEFAULT_CITY_PROPERTIES); } else if (databaseType.endsWith(COUNTRY_DB_SUFFIX)) { - properties.addAll(DEFAULT_COUNTRY_PROPERTIES); + properties.set(DEFAULT_COUNTRY_PROPERTIES); } else if (databaseType.endsWith(ASN_DB_SUFFIX)) { - properties.addAll(DEFAULT_ASN_PROPERTIES); + properties.set(DEFAULT_ASN_PROPERTIES); } else { throw newConfigurationException(TYPE, processorTag, "database_file", "Unsupported database type [" + databaseType + "]"); } } - return properties; + return properties.get(); }; } - return new GeoIpProcessor(processorTag, ipField, databaseReader, targetField, propertiesSupplier, ignoreMissing, cache); + return new GeoIpProcessor(processorTag, ipField, lazyLoader, targetField, propertiesSupplier, ignoreMissing, cache); } } diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java index 6f564606a6841..5584673270d65 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java @@ -25,16 +25,11 @@ import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache; import org.elasticsearch.test.ESTestCase; -import java.io.IOException; import java.io.InputStream; -import java.io.UncheckedIOException; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; import static 
org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.Matchers.containsString; @@ -43,26 +38,10 @@ public class GeoIpProcessorTests extends ESTestCase { - private Supplier databaseReaderSupplier(final InputStream database) { - final AtomicBoolean set = new AtomicBoolean(); - final AtomicReference databaseReader = new AtomicReference<>(); - return () -> { - if (set.compareAndSet(false, true)) { - try { - databaseReader.set(new DatabaseReader.Builder(database).build()); - } catch (final IOException e) { - throw new UncheckedIOException(e); - } - } - return databaseReader.get(); - }; - } - public void testCity() throws Exception { - InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - databaseReaderSupplier(database), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + loader("/GeoLite2-City.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000)); Map document = new HashMap<>(); document.put("source_field", "8.8.8.8"); @@ -84,10 +63,9 @@ public void testCity() throws Exception { } public void testNullValueWithIgnoreMissing() throws Exception { - InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - databaseReaderSupplier(database), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), true, - new GeoIpCache(1000)); + loader("/GeoLite2-City.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), true, + new GeoIpCache(1000)); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("source_field", null)); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); @@ -96,10 +74,9 @@ public void 
testNullValueWithIgnoreMissing() throws Exception { } public void testNonExistentWithIgnoreMissing() throws Exception { - InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - databaseReaderSupplier(database), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), true, - new GeoIpCache(1000)); + loader("/GeoLite2-City.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), true, + new GeoIpCache(1000)); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); processor.execute(ingestDocument); @@ -107,10 +84,9 @@ public void testNonExistentWithIgnoreMissing() throws Exception { } public void testNullWithoutIgnoreMissing() throws Exception { - InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - databaseReaderSupplier(database), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + loader("/GeoLite2-City.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000)); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("source_field", null)); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); @@ -119,9 +95,8 @@ public void testNullWithoutIgnoreMissing() throws Exception { } public void testNonExistentWithoutIgnoreMissing() throws Exception { - InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - databaseReaderSupplier(database), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), 
false, + loader("/GeoLite2-City.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, new GeoIpCache(1000)); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); @@ -130,10 +105,9 @@ public void testNonExistentWithoutIgnoreMissing() throws Exception { } public void testCity_withIpV6() throws Exception { - InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - databaseReaderSupplier(database), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + loader("/GeoLite2-City.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000)); String address = "2602:306:33d3:8000::3257:9652"; Map document = new HashMap<>(); @@ -160,10 +134,9 @@ public void testCity_withIpV6() throws Exception { } public void testCityWithMissingLocation() throws Exception { - InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - databaseReaderSupplier(database), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + loader("/GeoLite2-City.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000)); Map document = new HashMap<>(); document.put("source_field", "80.231.5.0"); @@ -178,10 +151,9 @@ public void testCityWithMissingLocation() throws Exception { } public void testCountry() throws Exception { - InputStream database = getDatabaseFileInputStream("/GeoLite2-Country.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - databaseReaderSupplier(database), "target_field", () -> 
EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + loader("/GeoLite2-Country.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000)); Map document = new HashMap<>(); document.put("source_field", "82.170.213.79"); @@ -199,10 +171,9 @@ public void testCountry() throws Exception { } public void testCountryWithMissingLocation() throws Exception { - InputStream database = getDatabaseFileInputStream("/GeoLite2-Country.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - databaseReaderSupplier(database), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + loader("/GeoLite2-Country.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000)); Map document = new HashMap<>(); document.put("source_field", "80.231.5.0"); @@ -218,10 +189,9 @@ public void testCountryWithMissingLocation() throws Exception { public void testAsn() throws Exception { String ip = "82.171.64.0"; - InputStream database = getDatabaseFileInputStream("/GeoLite2-ASN.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - databaseReaderSupplier(database), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + loader("/GeoLite2-ASN.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000)); Map document = new HashMap<>(); document.put("source_field", ip); @@ -238,10 +208,9 @@ public void testAsn() throws Exception { } public void testAddressIsNotInTheDatabase() throws Exception { - InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - databaseReaderSupplier(database), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, - new 
GeoIpCache(1000)); + loader("/GeoLite2-City.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000)); Map document = new HashMap<>(); document.put("source_field", "127.0.0.1"); @@ -252,10 +221,9 @@ public void testAddressIsNotInTheDatabase() throws Exception { /** Don't silently do DNS lookups or anything trappy on bogus data */ public void testInvalid() throws Exception { - InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb"); GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - databaseReaderSupplier(database), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + loader("/GeoLite2-City.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000)); Map document = new HashMap<>(); document.put("source_field", "www.google.com"); @@ -264,8 +232,9 @@ public void testInvalid() throws Exception { assertThat(e.getMessage(), containsString("not an IP string literal")); } - private static InputStream getDatabaseFileInputStream(String path) { - return GeoIpProcessor.class.getResourceAsStream(path); + private DatabaseReaderLazyLoader loader(final String path) { + final InputStream is = GeoIpProcessor.class.getResourceAsStream(path); + return new DatabaseReaderLazyLoader(path, () -> new DatabaseReader.Builder(is).build()); } } From 457004fe478866fbdc7b5a48d3ef065bac2f0d70 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 17 Dec 2018 14:44:24 -0500 Subject: [PATCH 06/25] WIP --- .../geoip/DatabaseReaderLazyLoader.java | 67 +++++++++++++++-- .../ingest/geoip/GeoIpProcessor.java | 73 ++++++++----------- .../ingest/geoip/IngestGeoIpPlugin.java | 21 +++--- .../geoip/GeoIpProcessorFactoryTests.java | 29 ++++++-- .../ingest/geoip/GeoIpProcessorTests.java | 33 +++++---- 5 files changed, 143 insertions(+), 80 deletions(-) diff --git 
a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java index a185e038582e0..f1ac29a4626f4 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.elasticsearch.ingest.geoip; import com.maxmind.geoip2.DatabaseReader; @@ -27,30 +28,79 @@ import java.io.Closeable; import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Objects; /** * Facilitates lazy loading of the database reader, so that when the geoip plugin is installed, but not used, * no memory is being wasted on the database reader. 
*/ -final class DatabaseReaderLazyLoader implements Closeable { +class DatabaseReaderLazyLoader implements Closeable { private static final Logger LOGGER = LogManager.getLogger(DatabaseReaderLazyLoader.class); - private final String databaseFileName; + private final Path databasePath; private final CheckedSupplier loader; - // package protected for testing only: final SetOnce databaseReader; - DatabaseReaderLazyLoader(String databaseFileName, CheckedSupplier loader) { - this.databaseFileName = databaseFileName; - this.loader = loader; + DatabaseReaderLazyLoader(final Path databasePath, final CheckedSupplier loader) { + this.databasePath = Objects.requireNonNull(databasePath); + this.loader = Objects.requireNonNull(loader); this.databaseReader = new SetOnce<>(); } + final String getDatabaseType() throws IOException { + final long fileSize = Files.size(databasePath); + final int[] DATABASE_TYPE_MARKER = {'d', 'a', 't', 'a', 'b', 'a', 's', 'e', '_', 't', 'y', 'p', 'e'}; + try (InputStream in = databaseInputStream()) { + // read last 512 bytes + in.skip(fileSize - 512); + byte[] tail = new byte[512]; + in.read(tail); + + // find the database_type header + int metadataOffset = -1; + int markerOffset = 0; + for (int i = 0; i < tail.length; i++) { + byte b = tail[i]; + + if (b == DATABASE_TYPE_MARKER[markerOffset]) { + markerOffset++; + } else { + markerOffset = 0; + } + if (markerOffset == DATABASE_TYPE_MARKER.length) { + metadataOffset = i + 1; + break; + } + } + + // read the database type + final int offsetByte = tail[metadataOffset] & 0xFF; + final int type = offsetByte >>> 5; + if (type != 2) { + throw new RuntimeException("type must be UTF8_STRING"); + } + int size = offsetByte & 0x1f; + return new String(tail, metadataOffset + 1, size, StandardCharsets.UTF_8); + } + } + + InputStream databaseInputStream() throws IOException { + return Files.newInputStream(databasePath); + } + synchronized DatabaseReader get() throws IOException { if (databaseReader.get() == null) 
{ - databaseReader.set(loader.get()); - LOGGER.debug("Loaded [{}] geoip database", databaseFileName); + synchronized (databaseReader) { + if (databaseReader.get() == null) { + databaseReader.set(loader.get()); + LOGGER.debug("Loaded [{}] geo-IP database", databasePath); + } + } } return databaseReader.get(); } @@ -59,4 +109,5 @@ synchronized DatabaseReader get() throws IOException { public synchronized void close() throws IOException { IOUtils.close(databaseReader.get()); } + } diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index 34857696a754e..5fc08c70073fa 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -31,7 +31,6 @@ import com.maxmind.geoip2.record.Subdivision; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.SpecialPermission; -import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.network.InetAddresses; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.ingest.AbstractProcessor; @@ -51,8 +50,6 @@ import java.util.Locale; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty; @@ -69,7 +66,7 @@ public final class GeoIpProcessor extends AbstractProcessor { private final String field; private final String targetField; private final DatabaseReaderLazyLoader dbReader; - private final CheckedSupplier, IOException> properties; + private final Set properties; private final boolean ignoreMissing; private final GeoIpCache cache; @@ -89,7 +86,7 @@ public 
final class GeoIpProcessor extends AbstractProcessor { final String field, final DatabaseReaderLazyLoader dbReader, final String targetField, - final CheckedSupplier, IOException> properties, + final Set properties, final boolean ignoreMissing, final GeoIpCache cache) { super(tag); @@ -166,7 +163,7 @@ DatabaseReader getDbReader() throws IOException { } Set getProperties() throws IOException { - return properties.get(); + return properties; } private Map retrieveCityGeoData(InetAddress ipAddress) throws IOException { @@ -189,7 +186,7 @@ private Map retrieveCityGeoData(InetAddress ipAddress) throws IO Subdivision subdivision = response.getMostSpecificSubdivision(); Map geoData = new HashMap<>(); - for (Property property : this.properties.get()) { + for (Property property : this.properties) { switch (property) { case IP: geoData.put("ip", NetworkAddress.format(ipAddress)); @@ -272,7 +269,7 @@ private Map retrieveCountryGeoData(InetAddress ipAddress) throws Continent continent = response.getContinent(); Map geoData = new HashMap<>(); - for (Property property : this.properties.get()) { + for (Property property : this.properties) { switch (property) { case IP: geoData.put("ip", NetworkAddress.format(ipAddress)); @@ -317,7 +314,7 @@ private Map retrieveAsnGeoData(InetAddress ipAddress) throws IOE String organization_name = response.getAutonomousSystemOrganization(); Map geoData = new HashMap<>(); - for (Property property : this.properties.get()) { + for (Property property : this.properties) { switch (property) { case IP: geoData.put("ip", NetworkAddress.format(ipAddress)); @@ -359,7 +356,9 @@ public Factory(Map databaseReaders, GeoIpCache @Override public GeoIpProcessor create( - final Map registry, final String processorTag, final Map config) { + final Map registry, + final String processorTag, + final Map config) throws IOException { String ipField = readStringProperty(TYPE, processorTag, config, "field"); String targetField = readStringProperty(TYPE, processorTag, 
config, "target_field", "geoip"); String databaseFile = readStringProperty(TYPE, processorTag, config, "database_file", "GeoLite2-City.mmdb"); @@ -372,44 +371,32 @@ public GeoIpProcessor create( "database_file", "database file [" + databaseFile + "] doesn't exist"); } - final CheckedSupplier, IOException> propertiesSupplier; + final String databaseType = lazyLoader.getDatabaseType(); + + final Set properties; if (propertyNames != null) { - final AtomicBoolean set = new AtomicBoolean(); - final Set properties = EnumSet.noneOf(Property.class); - propertiesSupplier = () -> { - if (set.compareAndSet(false, true)) { - for (String fieldName : propertyNames) { - try { - properties.add(Property.parseProperty(lazyLoader.get().getMetadata().getDatabaseType(), fieldName)); - } catch (IllegalArgumentException e) { - throw newConfigurationException(TYPE, processorTag, "properties", e.getMessage()); - } - } + properties = EnumSet.noneOf(Property.class); + for (String fieldName : propertyNames) { + try { + properties.add(Property.parseProperty(databaseType, fieldName)); + } catch (IllegalArgumentException e) { + throw newConfigurationException(TYPE, processorTag, "properties", e.getMessage()); } - return properties; - }; + } } else { - final AtomicBoolean set = new AtomicBoolean(); - final AtomicReference> properties = new AtomicReference<>(); - propertiesSupplier = () -> { - if (set.compareAndSet(false, true)) { - final String databaseType = lazyLoader.get().getMetadata().getDatabaseType(); - if (databaseType.endsWith(CITY_DB_SUFFIX)) { - properties.set(DEFAULT_CITY_PROPERTIES); - } else if (databaseType.endsWith(COUNTRY_DB_SUFFIX)) { - properties.set(DEFAULT_COUNTRY_PROPERTIES); - } else if (databaseType.endsWith(ASN_DB_SUFFIX)) { - properties.set(DEFAULT_ASN_PROPERTIES); - } else { - throw newConfigurationException(TYPE, processorTag, "database_file", "Unsupported database type [" - + databaseType + "]"); - } - } - return properties.get(); - }; + if 
(databaseType.endsWith(CITY_DB_SUFFIX)) { + properties = DEFAULT_CITY_PROPERTIES; + } else if (databaseType.endsWith(COUNTRY_DB_SUFFIX)) { + properties = DEFAULT_COUNTRY_PROPERTIES; + } else if (databaseType.endsWith(ASN_DB_SUFFIX)) { + properties = DEFAULT_ASN_PROPERTIES; + } else { + throw newConfigurationException(TYPE, processorTag, "database_file", "Unsupported database type [" + + databaseType + "]"); + } } - return new GeoIpProcessor(processorTag, ipField, lazyLoader, targetField, propertiesSupplier, ignoreMissing, cache); + return new GeoIpProcessor(processorTag, ipField, lazyLoader, targetField, properties, ignoreMissing, cache); } } diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java index 95e20f340b5ae..a45b7d5e96657 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java @@ -90,16 +90,17 @@ static Map loadDatabaseReaders(Path geoIpConfi Path databasePath = iterator.next(); if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) { String databaseFileName = databasePath.getFileName().toString(); - DatabaseReaderLazyLoader holder = new DatabaseReaderLazyLoader(databaseFileName, - () -> { - DatabaseReader.Builder builder = createDatabaseBuilder(databasePath).withCache(NoCache.getInstance()); - if (loadDatabaseOnHeap) { - builder.fileMode(Reader.FileMode.MEMORY); - } else { - builder.fileMode(Reader.FileMode.MEMORY_MAPPED); - } - return builder.build(); - }); + DatabaseReaderLazyLoader holder = new DatabaseReaderLazyLoader( + databasePath, + () -> { + DatabaseReader.Builder builder = createDatabaseBuilder(databasePath).withCache(NoCache.getInstance()); + if (loadDatabaseOnHeap) { + builder.fileMode(Reader.FileMode.MEMORY); + } else { + 
builder.fileMode(Reader.FileMode.MEMORY_MAPPED); + } + return builder.build(); + }); databaseReaders.put(databaseFileName, holder); } } diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java index 316cfbc152c57..e76243222cf2d 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java @@ -23,6 +23,8 @@ import org.apache.lucene.util.Constants; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.Randomness; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.StreamsUtils; @@ -315,21 +317,36 @@ public void testLazyLoading() throws Exception { assertNull(lazyLoader.databaseReader.get()); } + final IngestDocument document = + new IngestDocument("index", "type", "id", "routing", 1L, VersionType.EXTERNAL, Collections.singletonMap("_field", "1.1.1.1")); + Map config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-City.mmdb"); - factory.create(null, "_tag", config); + final GeoIpProcessor city = factory.create(null, "_tag", config); + + // these are lazy loaded until first use so we expect null here + assertNull(databaseReaders.get("GeoLite2-City.mmdb").databaseReader.get()); + city.execute(document); + assertNotNull(databaseReaders.get("GeoLite2-City.mmdb").databaseReader.get()); + config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-Country.mmdb"); - factory.create(null, "_tag", config); + final GeoIpProcessor country = factory.create(null, "_tag", config); + + 
assertNull(databaseReaders.get("GeoLite2-Country.mmdb").databaseReader.get()); + country.execute(document); + assertNotNull(databaseReaders.get("GeoLite2-Country.mmdb").databaseReader.get()); + config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-ASN.mmdb"); - factory.create(null, "_tag", config); + final GeoIpProcessor asn = factory.create(null, "_tag", config); - for (DatabaseReaderLazyLoader lazyLoader : databaseReaders.values()) { - assertNotNull(lazyLoader.databaseReader.get()); - } + assertNull(databaseReaders.get("GeoLite2-ASN.mmdb").databaseReader.get()); + asn.execute(document); + assertNotNull(databaseReaders.get("GeoLite2-ASN.mmdb").databaseReader.get()); } + } diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java index 5584673270d65..3b528f70334a6 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java @@ -20,11 +20,13 @@ package org.elasticsearch.ingest.geoip; import com.maxmind.geoip2.DatabaseReader; +import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache; import org.elasticsearch.test.ESTestCase; +import java.io.IOException; import java.io.InputStream; import java.util.Collections; import java.util.EnumSet; @@ -40,7 +42,7 @@ public class GeoIpProcessorTests extends ESTestCase { public void testCity() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, + loader("/GeoLite2-City.mmdb"), "target_field", 
EnumSet.allOf(GeoIpProcessor.Property.class), false, new GeoIpCache(1000)); Map document = new HashMap<>(); @@ -64,7 +66,7 @@ public void testCity() throws Exception { public void testNullValueWithIgnoreMissing() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), true, + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true, new GeoIpCache(1000)); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("source_field", null)); @@ -75,7 +77,7 @@ public void testNullValueWithIgnoreMissing() throws Exception { public void testNonExistentWithIgnoreMissing() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), true, + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true, new GeoIpCache(1000)); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); @@ -85,7 +87,7 @@ public void testNonExistentWithIgnoreMissing() throws Exception { public void testNullWithoutIgnoreMissing() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, new GeoIpCache(1000)); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("source_field", null)); @@ -96,7 +98,7 @@ public void 
testNullWithoutIgnoreMissing() throws Exception { public void testNonExistentWithoutIgnoreMissing() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, new GeoIpCache(1000)); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); @@ -106,7 +108,7 @@ public void testNonExistentWithoutIgnoreMissing() throws Exception { public void testCity_withIpV6() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, new GeoIpCache(1000)); String address = "2602:306:33d3:8000::3257:9652"; @@ -135,7 +137,7 @@ public void testCity_withIpV6() throws Exception { public void testCityWithMissingLocation() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, new GeoIpCache(1000)); Map document = new HashMap<>(); @@ -152,7 +154,7 @@ public void testCityWithMissingLocation() throws Exception { public void testCountry() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - loader("/GeoLite2-Country.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, + loader("/GeoLite2-Country.mmdb"), 
"target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, new GeoIpCache(1000)); Map document = new HashMap<>(); @@ -172,7 +174,7 @@ public void testCountry() throws Exception { public void testCountryWithMissingLocation() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - loader("/GeoLite2-Country.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, + loader("/GeoLite2-Country.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, new GeoIpCache(1000)); Map document = new HashMap<>(); @@ -190,7 +192,7 @@ public void testCountryWithMissingLocation() throws Exception { public void testAsn() throws Exception { String ip = "82.171.64.0"; GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - loader("/GeoLite2-ASN.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, + loader("/GeoLite2-ASN.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, new GeoIpCache(1000)); Map document = new HashMap<>(); @@ -209,7 +211,7 @@ public void testAsn() throws Exception { public void testAddressIsNotInTheDatabase() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, new GeoIpCache(1000)); Map document = new HashMap<>(); @@ -222,7 +224,7 @@ public void testAddressIsNotInTheDatabase() throws Exception { /** Don't silently do DNS lookups or anything trappy on bogus data */ public void testInvalid() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", () -> EnumSet.allOf(GeoIpProcessor.Property.class), false, + 
loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, new GeoIpCache(1000)); Map document = new HashMap<>(); @@ -234,7 +236,12 @@ public void testInvalid() throws Exception { private DatabaseReaderLazyLoader loader(final String path) { final InputStream is = GeoIpProcessor.class.getResourceAsStream(path); - return new DatabaseReaderLazyLoader(path, () -> new DatabaseReader.Builder(is).build()); + return new DatabaseReaderLazyLoader(PathUtils.get(path), () -> new DatabaseReader.Builder(is).build()) { + @Override + InputStream databaseInputStream() throws IOException { + return GeoIpProcessor.class.getResourceAsStream(path); + } + }; } } From 9a37260b4a92094cc933f8ae9912d6dc6cb6818a Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 17 Dec 2018 14:44:58 -0500 Subject: [PATCH 07/25] Fix comments --- .../ingest/geoip/GeoIpProcessorFactoryTests.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java index e76243222cf2d..2005294c5a45e 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java @@ -328,6 +328,7 @@ public void testLazyLoading() throws Exception { // these are lazy loaded until first use so we expect null here assertNull(databaseReaders.get("GeoLite2-City.mmdb").databaseReader.get()); city.execute(document); + // the first ingest should trigger a database load assertNotNull(databaseReaders.get("GeoLite2-City.mmdb").databaseReader.get()); config = new HashMap<>(); @@ -335,8 +336,10 @@ public void testLazyLoading() throws Exception { config.put("database_file", "GeoLite2-Country.mmdb"); final GeoIpProcessor country = factory.create(null, "_tag", config); + // 
these are lazy loaded until first use so we expect null here assertNull(databaseReaders.get("GeoLite2-Country.mmdb").databaseReader.get()); country.execute(document); + // the first ingest should trigger a database load assertNotNull(databaseReaders.get("GeoLite2-Country.mmdb").databaseReader.get()); config = new HashMap<>(); @@ -344,8 +347,10 @@ public void testLazyLoading() throws Exception { config.put("database_file", "GeoLite2-ASN.mmdb"); final GeoIpProcessor asn = factory.create(null, "_tag", config); + // these are lazy loaded until first use so we expect null here assertNull(databaseReaders.get("GeoLite2-ASN.mmdb").databaseReader.get()); asn.execute(document); + // the first ingest should trigger a database load assertNotNull(databaseReaders.get("GeoLite2-ASN.mmdb").databaseReader.get()); } From 9315409d2ebc8fa329d354df4ae8d256553618c5 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 17 Dec 2018 17:11:10 -0500 Subject: [PATCH 08/25] Fix checkstyle --- .../elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java index 2005294c5a45e..4406b717f3685 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java @@ -318,7 +318,8 @@ public void testLazyLoading() throws Exception { } final IngestDocument document = - new IngestDocument("index", "type", "id", "routing", 1L, VersionType.EXTERNAL, Collections.singletonMap("_field", "1.1.1.1")); + new IngestDocument( + "index", "type", "id", "routing", 1L, VersionType.EXTERNAL, Collections.singletonMap("_field", "1.1.1.1")); Map config = new HashMap<>(); config.put("field", "_field"); From 
39344df32a87fd43e6706cd3965d936791e5fab7 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 17 Dec 2018 17:21:11 -0500 Subject: [PATCH 09/25] Some cleanup --- .../ingest/geoip/DatabaseReaderLazyLoader.java | 8 ++------ .../elasticsearch/ingest/geoip/GeoIpProcessorTests.java | 7 +------ 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java index f1ac29a4626f4..81a2c16e394a7 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java @@ -55,7 +55,7 @@ class DatabaseReaderLazyLoader implements Closeable { final String getDatabaseType() throws IOException { final long fileSize = Files.size(databasePath); final int[] DATABASE_TYPE_MARKER = {'d', 'a', 't', 'a', 'b', 'a', 's', 'e', '_', 't', 'y', 'p', 'e'}; - try (InputStream in = databaseInputStream()) { + try (InputStream in = Files.newInputStream(databasePath)) { // read last 512 bytes in.skip(fileSize - 512); byte[] tail = new byte[512]; @@ -89,11 +89,7 @@ final String getDatabaseType() throws IOException { } } - InputStream databaseInputStream() throws IOException { - return Files.newInputStream(databasePath); - } - - synchronized DatabaseReader get() throws IOException { + DatabaseReader get() throws IOException { if (databaseReader.get() == null) { synchronized (databaseReader) { if (databaseReader.get() == null) { diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java index 3b528f70334a6..dcb0fb871dc46 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java +++ 
b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java @@ -236,12 +236,7 @@ public void testInvalid() throws Exception { private DatabaseReaderLazyLoader loader(final String path) { final InputStream is = GeoIpProcessor.class.getResourceAsStream(path); - return new DatabaseReaderLazyLoader(PathUtils.get(path), () -> new DatabaseReader.Builder(is).build()) { - @Override - InputStream databaseInputStream() throws IOException { - return GeoIpProcessor.class.getResourceAsStream(path); - } - }; + return new DatabaseReaderLazyLoader(PathUtils.get(path), () -> new DatabaseReader.Builder(is).build()); } } From f6240d83bffe4cff9ad1e6e38199cc5df4575b37 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 17 Dec 2018 17:25:04 -0500 Subject: [PATCH 10/25] Error handling --- .../geoip/DatabaseReaderLazyLoader.java | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java index 81a2c16e394a7..70f8d04b2e83f 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java @@ -54,12 +54,25 @@ class DatabaseReaderLazyLoader implements Closeable { final String getDatabaseType() throws IOException { final long fileSize = Files.size(databasePath); + if (fileSize <= 512) { + throw new IllegalStateException("unexpected file length [" + fileSize + "] for [" + databasePath + "]"); + } final int[] DATABASE_TYPE_MARKER = {'d', 'a', 't', 'a', 'b', 'a', 's', 'e', '_', 't', 'y', 'p', 'e'}; try (InputStream in = Files.newInputStream(databasePath)) { // read last 512 bytes - in.skip(fileSize - 512); - byte[] tail = new byte[512]; - in.read(tail); + final long skipped = in.skip(fileSize - 512); + 
if (skipped != fileSize - 512) { + throw new IllegalStateException("failed to skip [" + (fileSize - 512) + "] bytes while reading [" + databasePath + "]"); + } + final byte[] tail = new byte[512]; + int read = 0; + do { + final int actualBytesRead = in.read(tail, read, 512 - read); + if (actualBytesRead == -1) { + throw new IllegalStateException("unexpected end of stream [" + databasePath + "] after reading [" + read + "] bytes"); + } + read += actualBytesRead; + } while (read != 512); // find the database_type header int metadataOffset = -1; From a89d2abcb7788dab07a18f0ed0ccc7dc914681b1 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 17 Dec 2018 17:27:41 -0500 Subject: [PATCH 11/25] Javadocs --- .../ingest/geoip/DatabaseReaderLazyLoader.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java index 70f8d04b2e83f..ba2f9eab9b22a 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java @@ -52,24 +52,33 @@ class DatabaseReaderLazyLoader implements Closeable { this.databaseReader = new SetOnce<>(); } + /** + * Read the database type from the database. We do this manually instead of relying on the built-in mechanism to avoid reading the + * entire database into memory merely to read the type. This is especially important to maintain on master nodes where pipelines are + * validated. If we read the entire database into memory, we could potentially run into low-memory constraints on such nodes where + * loading this data would otherwise be wasteful if they are not also ingest nodes. 
+ * + * @return the database type + * @throws IOException if an I/O exception occurs reading the database type + */ final String getDatabaseType() throws IOException { final long fileSize = Files.size(databasePath); if (fileSize <= 512) { - throw new IllegalStateException("unexpected file length [" + fileSize + "] for [" + databasePath + "]"); + throw new IOException("unexpected file length [" + fileSize + "] for [" + databasePath + "]"); } final int[] DATABASE_TYPE_MARKER = {'d', 'a', 't', 'a', 'b', 'a', 's', 'e', '_', 't', 'y', 'p', 'e'}; try (InputStream in = Files.newInputStream(databasePath)) { // read last 512 bytes final long skipped = in.skip(fileSize - 512); if (skipped != fileSize - 512) { - throw new IllegalStateException("failed to skip [" + (fileSize - 512) + "] bytes while reading [" + databasePath + "]"); + throw new IOException("failed to skip [" + (fileSize - 512) + "] bytes while reading [" + databasePath + "]"); } final byte[] tail = new byte[512]; int read = 0; do { final int actualBytesRead = in.read(tail, read, 512 - read); if (actualBytesRead == -1) { - throw new IllegalStateException("unexpected end of stream [" + databasePath + "] after reading [" + read + "] bytes"); + throw new IOException("unexpected end of stream [" + databasePath + "] after reading [" + read + "] bytes"); } read += actualBytesRead; } while (read != 512); From 719fff16cae23da305e7e716ae35f618dad459cf Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 17 Dec 2018 17:28:16 -0500 Subject: [PATCH 12/25] Better exception --- .../elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java index ba2f9eab9b22a..509bf4016b465 100644 --- 
a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java @@ -104,7 +104,7 @@ final String getDatabaseType() throws IOException { final int offsetByte = tail[metadataOffset] & 0xFF; final int type = offsetByte >>> 5; if (type != 2) { - throw new RuntimeException("type must be UTF8_STRING"); + throw new IOException("type must be UTF-8 string"); } int size = offsetByte & 0x1f; return new String(tail, metadataOffset + 1, size, StandardCharsets.UTF_8); From 23a47c6e056240ff7d9a0bb6c6c0f4bb67eb0168 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 17 Dec 2018 17:29:10 -0500 Subject: [PATCH 13/25] More cleanup --- .../ingest/geoip/DatabaseReaderLazyLoader.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java index 509bf4016b465..702221f545913 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java @@ -66,9 +66,9 @@ final String getDatabaseType() throws IOException { if (fileSize <= 512) { throw new IOException("unexpected file length [" + fileSize + "] for [" + databasePath + "]"); } - final int[] DATABASE_TYPE_MARKER = {'d', 'a', 't', 'a', 'b', 'a', 's', 'e', '_', 't', 'y', 'p', 'e'}; + final int[] databaseTypeMarker = {'d', 'a', 't', 'a', 'b', 'a', 's', 'e', '_', 't', 'y', 'p', 'e'}; try (InputStream in = Files.newInputStream(databasePath)) { - // read last 512 bytes + // read the last 512 bytes final long skipped = in.skip(fileSize - 512); if (skipped != fileSize - 512) { throw new IOException("failed to skip [" + (fileSize - 512) + "] bytes while reading [" 
+ databasePath + "]"); @@ -89,12 +89,12 @@ final String getDatabaseType() throws IOException { for (int i = 0; i < tail.length; i++) { byte b = tail[i]; - if (b == DATABASE_TYPE_MARKER[markerOffset]) { + if (b == databaseTypeMarker[markerOffset]) { markerOffset++; } else { markerOffset = 0; } - if (markerOffset == DATABASE_TYPE_MARKER.length) { + if (markerOffset == databaseTypeMarker.length) { metadataOffset = i + 1; break; } From 8014754000b06950ba52fd2ae57dd745546dc50f Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 17 Dec 2018 17:36:32 -0500 Subject: [PATCH 14/25] Another cleanup --- .../elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java index 702221f545913..a6565b28106f7 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java @@ -116,7 +116,7 @@ DatabaseReader get() throws IOException { synchronized (databaseReader) { if (databaseReader.get() == null) { databaseReader.set(loader.get()); - LOGGER.debug("Loaded [{}] geo-IP database", databasePath); + LOGGER.debug("loaded [{}] geo-IP database", databasePath); } } } From 0d300c65b16d2a3f71754cc8ac4cf5b3c0f501f5 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 17 Dec 2018 17:54:58 -0500 Subject: [PATCH 15/25] Remove unneeded throws --- .../org/elasticsearch/ingest/geoip/GeoIpProcessor.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index 5fc08c70073fa..95aade9d6a2ac 100644 --- 
a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -162,11 +162,11 @@ DatabaseReader getDbReader() throws IOException { return dbReader.get(); } - Set getProperties() throws IOException { + Set getProperties() { return properties; } - private Map retrieveCityGeoData(InetAddress ipAddress) throws IOException { + private Map retrieveCityGeoData(InetAddress ipAddress) { SpecialPermission.check(); CityResponse response = AccessController.doPrivileged((PrivilegedAction) () -> cache.putIfAbsent(ipAddress, CityResponse.class, ip -> { @@ -252,7 +252,7 @@ private Map retrieveCityGeoData(InetAddress ipAddress) throws IO return geoData; } - private Map retrieveCountryGeoData(InetAddress ipAddress) throws IOException { + private Map retrieveCountryGeoData(InetAddress ipAddress) { SpecialPermission.check(); CountryResponse response = AccessController.doPrivileged((PrivilegedAction) () -> cache.putIfAbsent(ipAddress, CountryResponse.class, ip -> { @@ -297,7 +297,7 @@ private Map retrieveCountryGeoData(InetAddress ipAddress) throws return geoData; } - private Map retrieveAsnGeoData(InetAddress ipAddress) throws IOException { + private Map retrieveAsnGeoData(InetAddress ipAddress) { SpecialPermission.check(); AsnResponse response = AccessController.doPrivileged((PrivilegedAction) () -> cache.putIfAbsent(ipAddress, AsnResponse.class, ip -> { From 2eca461eab3bda27602967dfdad960fa9559d81d Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 17 Dec 2018 18:07:21 -0500 Subject: [PATCH 16/25] Fix imports --- .../java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java index dcb0fb871dc46..b4cc0e7430e04 100644 --- 
a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java @@ -26,7 +26,6 @@ import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache; import org.elasticsearch.test.ESTestCase; -import java.io.IOException; import java.io.InputStream; import java.util.Collections; import java.util.EnumSet; From f3289aba38dd8519a6bb69cd97249ef2dd2037d5 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 18 Dec 2018 08:42:28 -0500 Subject: [PATCH 17/25] Add test showing no loading on non-ingest --- .../ingest/geoip/GeoIpProcessor.java | 6 + .../geoip/GeoIpProcessorNonIngestNodeIT.java | 162 ++++++++++++++++++ 2 files changed, 168 insertions(+) create mode 100644 plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index 95aade9d6a2ac..f1a5256d774d6 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -43,6 +43,7 @@ import java.security.AccessController; import java.security.PrivilegedAction; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -347,6 +348,11 @@ public static final class Factory implements Processor.Factory { ); private final Map databaseReaders; + + Map databaseReaders() { + return Collections.unmodifiableMap(databaseReaders); + } + private final GeoIpCache cache; public Factory(Map databaseReaders, GeoIpCache cache) { diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java 
b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java new file mode 100644 index 0000000000000..155c68fdda3fa --- /dev/null +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java @@ -0,0 +1,162 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.ingest.geoip; + +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.StreamsUtils; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; + +public class GeoIpProcessorNonIngestNodeIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Collections.singleton(IngestGeoIpPlugin.class); + } + + @Override + protected Settings nodeSettings(final int nodeOrdinal) { + return Settings.builder().put("node.ingest", false).put(super.nodeSettings(nodeOrdinal)).build(); + } + + @Override + protected Path nodeConfigPath(final int nodeOrdinal) { + final Path configPath = createTempDir(); + try { + final Path databasePath = configPath.resolve("ingest-geoip"); + Files.createDirectories(databasePath); + Files.copy( + new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")), + databasePath.resolve("GeoLite2-City.mmdb")); + Files.copy( + new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")), + 
databasePath.resolve("GeoLite2-Country.mmdb")); + Files.copy( + new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb")), + databasePath.resolve("GeoLite2-ASN.mmdb")); + } catch (final IOException e) { + throw new UncheckedIOException(e); + } + return configPath; + } + + public void testLazyLoading() throws IOException { + final BytesReference bytes; + try (XContentBuilder builder = JsonXContent.contentBuilder()) { + builder.startObject(); + { + builder.field("description", "test"); + builder.startArray("processors"); + { + builder.startObject(); + { + builder.startObject("geoip"); + { + builder.field("field", "ip"); + builder.field("target_field", "ip-city"); + builder.field("database_file", "GeoLite2-City.mmdb"); + } + builder.endObject(); + } + builder.endObject(); + builder.startObject(); + { + builder.startObject("geoip"); + { + builder.field("field", "ip"); + builder.field("target_field", "ip-country"); + builder.field("database_file", "GeoLite2-Country.mmdb"); + } + builder.endObject(); + } + builder.endObject(); + builder.startObject(); + { + builder.startObject("geoip"); + { + builder.field("field", "ip"); + builder.field("target_field", "ip-asn"); + builder.field("database_file", "GeoLite2-ASN.mmdb"); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endArray(); + } + builder.endObject(); + bytes = BytesReference.bytes(builder); + } + assertAcked(client().admin().cluster().putPipeline(new PutPipelineRequest("geoip", bytes, XContentType.JSON)).actionGet()); + // the geo-IP databases should not be loaded on any nodes as they are all non-ingest nodes + Arrays.stream(internalCluster().getNodeNames()).forEach(node -> assertDatabaseLoadStatus(node, false)); + + // start an ingest node + final String ingestNode = internalCluster().startNode(Settings.builder().put("node.ingest", true).build()); + internalCluster().getInstance(IngestService.class, ingestNode); + // the geo-IP database should not be loaded yet as we 
have not indexed any documents using a pipeline that has a geo-IP processor + assertDatabaseLoadStatus(ingestNode, false); + final IndexRequest indexRequest = new IndexRequest("index", "_doc"); + indexRequest.setPipeline("geoip"); + indexRequest.source(Collections.singletonMap("ip", "1.1.1.1")); + final IndexResponse indexResponse = client().index(indexRequest).actionGet(); + assertThat(indexResponse.status(), equalTo(RestStatus.CREATED)); + // now the geo-IP database should be loaded on the ingest node + assertDatabaseLoadStatus(ingestNode, true); + // the geo-IP database should still not be loaded on the non-ingest nodes + Arrays.stream(internalCluster().getNodeNames()) + .filter(node -> node.equals(ingestNode) == false) + .forEach(node -> assertDatabaseLoadStatus(node, false)); + } + + private void assertDatabaseLoadStatus(final String node, final boolean loaded) { + final IngestService ingestService = internalCluster().getInstance(IngestService.class, node); + final GeoIpProcessor.Factory factory = (GeoIpProcessor.Factory)ingestService.getProcessorFactories().get("geoip"); + for (final DatabaseReaderLazyLoader loader : factory.databaseReaders().values()) { + if (loaded) { + assertNotNull(loader.databaseReader.get()); + } else { + assertNull(loader.databaseReader.get()); + } + } + } + +} \ No newline at end of file From b823514264f2ee1a67dd22c26583121cacb2f471 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 18 Dec 2018 08:43:33 -0500 Subject: [PATCH 18/25] Add Javadocs --- .../ingest/geoip/GeoIpProcessorNonIngestNodeIT.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java index 155c68fdda3fa..0825de648c802 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java +++ 
b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java @@ -78,6 +78,11 @@ protected Path nodeConfigPath(final int nodeOrdinal) { return configPath; } + /** + * This test shows that we do not load the geo-IP databases on non-ingest nodes, and only load on ingest nodes on first use. + * + * @throws IOException if an I/O exception occurs building the JSON + */ public void testLazyLoading() throws IOException { final BytesReference bytes; try (XContentBuilder builder = JsonXContent.contentBuilder()) { From 4bd2926c6da25acf6197c62b4af0f7fdfc8e6ad5 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 18 Dec 2018 08:43:51 -0500 Subject: [PATCH 19/25] Make class final --- .../elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java index a6565b28106f7..ef6a21f80475a 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java @@ -38,7 +38,7 @@ * Facilitates lazy loading of the database reader, so that when the geoip plugin is installed, but not used, * no memory is being wasted on the database reader. 
*/ -class DatabaseReaderLazyLoader implements Closeable { +final class DatabaseReaderLazyLoader implements Closeable { private static final Logger LOGGER = LogManager.getLogger(DatabaseReaderLazyLoader.class); From bea6de972fb75cf02a6177b8c495bb1c6a177869 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 18 Dec 2018 08:44:59 -0500 Subject: [PATCH 20/25] A little more cleanup --- .../ingest/geoip/GeoIpProcessor.java | 25 +++++++++---------- .../geoip/GeoIpProcessorFactoryTests.java | 10 ++++---- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index f1a5256d774d6..c33ba803af6cc 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -43,7 +43,6 @@ import java.security.AccessController; import java.security.PrivilegedAction; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -66,7 +65,7 @@ public final class GeoIpProcessor extends AbstractProcessor { private final String field; private final String targetField; - private final DatabaseReaderLazyLoader dbReader; + private final DatabaseReaderLazyLoader lazyLoader; private final Set properties; private final boolean ignoreMissing; private final GeoIpCache cache; @@ -76,7 +75,7 @@ public final class GeoIpProcessor extends AbstractProcessor { * * @param tag the processor tag * @param field the source field to geo-IP map - * @param dbReader a supplier of a geo-IP database reader; ideally this is lazily-loaded once on first use + * @param lazyLoader a supplier of a geo-IP database reader; ideally this is lazily-loaded once on first use * @param targetField the target field * @param properties the properties; 
ideally this is lazily-loaded once on first use * @param ignoreMissing true if documents with a missing value for the field should be ignored @@ -85,7 +84,7 @@ public final class GeoIpProcessor extends AbstractProcessor { GeoIpProcessor( final String tag, final String field, - final DatabaseReaderLazyLoader dbReader, + final DatabaseReaderLazyLoader lazyLoader, final String targetField, final Set properties, final boolean ignoreMissing, @@ -93,8 +92,8 @@ public final class GeoIpProcessor extends AbstractProcessor { super(tag); this.field = field; this.targetField = targetField; - this.dbReader = dbReader; - this.properties = properties; + this.lazyLoader = lazyLoader; + this.properties = Collections.unmodifiableSet(properties); this.ignoreMissing = ignoreMissing; this.cache = cache; } @@ -116,7 +115,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws IOException final InetAddress ipAddress = InetAddresses.forString(ip); Map geoData; - String databaseType = dbReader.get().getMetadata().getDatabaseType(); + String databaseType = lazyLoader.get().getMetadata().getDatabaseType(); if (databaseType.endsWith(CITY_DB_SUFFIX)) { try { @@ -137,7 +136,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws IOException geoData = Collections.emptyMap(); } } else { - throw new ElasticsearchParseException("Unsupported database type [" + dbReader.get().getMetadata().getDatabaseType() + throw new ElasticsearchParseException("Unsupported database type [" + lazyLoader.get().getMetadata().getDatabaseType() + "]", new IllegalStateException()); } if (geoData.isEmpty() == false) { @@ -159,8 +158,8 @@ String getTargetField() { return targetField; } - DatabaseReader getDbReader() throws IOException { - return dbReader.get(); + DatabaseReader getDatabaseReader() throws IOException { + return lazyLoader.get(); } Set getProperties() { @@ -172,7 +171,7 @@ private Map retrieveCityGeoData(InetAddress ipAddress) { CityResponse response = 
AccessController.doPrivileged((PrivilegedAction) () -> cache.putIfAbsent(ipAddress, CityResponse.class, ip -> { try { - return dbReader.get().city(ip); + return lazyLoader.get().city(ip); } catch (AddressNotFoundException e) { throw new AddressNotFoundRuntimeException(e); } catch (Exception e) { @@ -258,7 +257,7 @@ private Map retrieveCountryGeoData(InetAddress ipAddress) { CountryResponse response = AccessController.doPrivileged((PrivilegedAction) () -> cache.putIfAbsent(ipAddress, CountryResponse.class, ip -> { try { - return dbReader.get().country(ip); + return lazyLoader.get().country(ip); } catch (AddressNotFoundException e) { throw new AddressNotFoundRuntimeException(e); } catch (Exception e) { @@ -303,7 +302,7 @@ private Map retrieveAsnGeoData(InetAddress ipAddress) { AsnResponse response = AccessController.doPrivileged((PrivilegedAction) () -> cache.putIfAbsent(ipAddress, AsnResponse.class, ip -> { try { - return dbReader.get().asn(ip); + return lazyLoader.get().asn(ip); } catch (AddressNotFoundException e) { throw new AddressNotFoundRuntimeException(e); } catch (Exception e) { diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java index 4406b717f3685..fc1874728187f 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java @@ -102,7 +102,7 @@ public void testBuildDefaults() throws Exception { assertThat(processor.getTag(), equalTo(processorTag)); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getTargetField(), equalTo("geoip")); - assertThat(processor.getDbReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-City")); + assertThat(processor.getDatabaseReader().getMetadata().getDatabaseType(), 
equalTo("GeoLite2-City")); assertThat(processor.getProperties(), sameInstance(GeoIpProcessor.Factory.DEFAULT_CITY_PROPERTIES)); assertFalse(processor.isIgnoreMissing()); } @@ -122,7 +122,7 @@ public void testSetIgnoreMissing() throws Exception { assertThat(processor.getTag(), equalTo(processorTag)); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getTargetField(), equalTo("geoip")); - assertThat(processor.getDbReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-City")); + assertThat(processor.getDatabaseReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-City")); assertThat(processor.getProperties(), sameInstance(GeoIpProcessor.Factory.DEFAULT_CITY_PROPERTIES)); assertTrue(processor.isIgnoreMissing()); } @@ -143,7 +143,7 @@ public void testCountryBuildDefaults() throws Exception { assertThat(processor.getTag(), equalTo(processorTag)); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getTargetField(), equalTo("geoip")); - assertThat(processor.getDbReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-Country")); + assertThat(processor.getDatabaseReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-Country")); assertThat(processor.getProperties(), sameInstance(GeoIpProcessor.Factory.DEFAULT_COUNTRY_PROPERTIES)); assertFalse(processor.isIgnoreMissing()); } @@ -164,7 +164,7 @@ public void testAsnBuildDefaults() throws Exception { assertThat(processor.getTag(), equalTo(processorTag)); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getTargetField(), equalTo("geoip")); - assertThat(processor.getDbReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-ASN")); + assertThat(processor.getDatabaseReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-ASN")); assertThat(processor.getProperties(), sameInstance(GeoIpProcessor.Factory.DEFAULT_ASN_PROPERTIES)); assertFalse(processor.isIgnoreMissing()); } @@ -194,7 +194,7 @@ public void testBuildDbFile() throws 
Exception { GeoIpProcessor processor = factory.create(null, null, config); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getTargetField(), equalTo("geoip")); - assertThat(processor.getDbReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-Country")); + assertThat(processor.getDatabaseReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-Country")); assertThat(processor.getProperties(), sameInstance(GeoIpProcessor.Factory.DEFAULT_COUNTRY_PROPERTIES)); assertFalse(processor.isIgnoreMissing()); } From e37e8388dcd00fb41c4afd562c8572839be50f1b Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 18 Dec 2018 10:24:13 -0500 Subject: [PATCH 21/25] Use lazy loader database type --- .../geoip/DatabaseReaderLazyLoader.java | 109 ++++++++++-------- .../ingest/geoip/GeoIpProcessor.java | 27 ++--- .../geoip/GeoIpProcessorFactoryTests.java | 10 +- .../ingest/geoip/GeoIpProcessorTests.java | 30 ++++- 4 files changed, 111 insertions(+), 65 deletions(-) diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java index ef6a21f80475a..d8cf65c06ab2f 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java @@ -38,7 +38,7 @@ * Facilitates lazy loading of the database reader, so that when the geoip plugin is installed, but not used, * no memory is being wasted on the database reader. 
*/ -final class DatabaseReaderLazyLoader implements Closeable { +class DatabaseReaderLazyLoader implements Closeable { private static final Logger LOGGER = LogManager.getLogger(DatabaseReaderLazyLoader.class); @@ -46,10 +46,14 @@ final class DatabaseReaderLazyLoader implements Closeable { private final CheckedSupplier loader; final SetOnce databaseReader; + // cache the database type so that we do not re-read it on every pipeline execution + final SetOnce databaseType; + DatabaseReaderLazyLoader(final Path databasePath, final CheckedSupplier loader) { this.databasePath = Objects.requireNonNull(databasePath); this.loader = Objects.requireNonNull(loader); this.databaseReader = new SetOnce<>(); + this.databaseType = new SetOnce<>(); } /** @@ -62,53 +66,68 @@ final class DatabaseReaderLazyLoader implements Closeable { * @throws IOException if an I/O exception occurs reading the database type */ final String getDatabaseType() throws IOException { - final long fileSize = Files.size(databasePath); - if (fileSize <= 512) { - throw new IOException("unexpected file length [" + fileSize + "] for [" + databasePath + "]"); - } - final int[] databaseTypeMarker = {'d', 'a', 't', 'a', 'b', 'a', 's', 'e', '_', 't', 'y', 'p', 'e'}; - try (InputStream in = Files.newInputStream(databasePath)) { - // read the last 512 bytes - final long skipped = in.skip(fileSize - 512); - if (skipped != fileSize - 512) { - throw new IOException("failed to skip [" + (fileSize - 512) + "] bytes while reading [" + databasePath + "]"); - } - final byte[] tail = new byte[512]; - int read = 0; - do { - final int actualBytesRead = in.read(tail, read, 512 - read); - if (actualBytesRead == -1) { - throw new IOException("unexpected end of stream [" + databasePath + "] after reading [" + read + "] bytes"); - } - read += actualBytesRead; - } while (read != 512); - - // find the database_type header - int metadataOffset = -1; - int markerOffset = 0; - for (int i = 0; i < tail.length; i++) { - byte b = tail[i]; - - 
if (b == databaseTypeMarker[markerOffset]) { - markerOffset++; - } else { - markerOffset = 0; - } - if (markerOffset == databaseTypeMarker.length) { - metadataOffset = i + 1; - break; - } - } + if (databaseType.get() == null) { + synchronized (databaseType) { + if (databaseType.get() == null) { + final long fileSize = databaseFileSize(); + if (fileSize <= 512) { + throw new IOException("unexpected file length [" + fileSize + "] for [" + databasePath + "]"); + } + final int[] databaseTypeMarker = {'d', 'a', 't', 'a', 'b', 'a', 's', 'e', '_', 't', 'y', 'p', 'e'}; + try (InputStream in = databaseInputStream()) { + // read the last 512 bytes + final long skipped = in.skip(fileSize - 512); + if (skipped != fileSize - 512) { + throw new IOException("failed to skip [" + (fileSize - 512) + "] bytes while reading [" + databasePath + "]"); + } + final byte[] tail = new byte[512]; + int read = 0; + do { + final int actualBytesRead = in.read(tail, read, 512 - read); + if (actualBytesRead == -1) { + throw new IOException("unexpected end of stream [" + databasePath + "] after reading [" + read + "] bytes"); + } + read += actualBytesRead; + } while (read != 512); - // read the database type - final int offsetByte = tail[metadataOffset] & 0xFF; - final int type = offsetByte >>> 5; - if (type != 2) { - throw new IOException("type must be UTF-8 string"); + // find the database_type header + int metadataOffset = -1; + int markerOffset = 0; + for (int i = 0; i < tail.length; i++) { + byte b = tail[i]; + + if (b == databaseTypeMarker[markerOffset]) { + markerOffset++; + } else { + markerOffset = 0; + } + if (markerOffset == databaseTypeMarker.length) { + metadataOffset = i + 1; + break; + } + } + + // read the database type + final int offsetByte = tail[metadataOffset] & 0xFF; + final int type = offsetByte >>> 5; + if (type != 2) { + throw new IOException("type must be UTF-8 string"); + } + int size = offsetByte & 0x1f; + databaseType.set(new String(tail, metadataOffset + 1, size, 
StandardCharsets.UTF_8)); + } + } } - int size = offsetByte & 0x1f; - return new String(tail, metadataOffset + 1, size, StandardCharsets.UTF_8); } + return databaseType.get(); + } + + long databaseFileSize() throws IOException { + return Files.size(databasePath); + } + + InputStream databaseInputStream() throws IOException { + return Files.newInputStream(databasePath); } DatabaseReader get() throws IOException { diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index c33ba803af6cc..5b3fa1242ee08 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -93,7 +93,7 @@ public final class GeoIpProcessor extends AbstractProcessor { this.field = field; this.targetField = targetField; this.lazyLoader = lazyLoader; - this.properties = Collections.unmodifiableSet(properties); + this.properties = properties; this.ignoreMissing = ignoreMissing; this.cache = cache; } @@ -115,7 +115,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws IOException final InetAddress ipAddress = InetAddresses.forString(ip); Map geoData; - String databaseType = lazyLoader.get().getMetadata().getDatabaseType(); + String databaseType = lazyLoader.getDatabaseType(); if (databaseType.endsWith(CITY_DB_SUFFIX)) { try { @@ -136,7 +136,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws IOException geoData = Collections.emptyMap(); } } else { - throw new ElasticsearchParseException("Unsupported database type [" + lazyLoader.get().getMetadata().getDatabaseType() + throw new ElasticsearchParseException("Unsupported database type [" + lazyLoader.getDatabaseType() + "]", new IllegalStateException()); } if (geoData.isEmpty() == false) { @@ -158,8 +158,8 @@ String getTargetField() { return targetField; 
} - DatabaseReader getDatabaseReader() throws IOException { - return lazyLoader.get(); + String getDatabaseType() throws IOException { + return lazyLoader.getDatabaseType(); } Set getProperties() { @@ -335,16 +335,16 @@ private Map retrieveAsnGeoData(InetAddress ipAddress) { } public static final class Factory implements Processor.Factory { - static final Set DEFAULT_CITY_PROPERTIES = EnumSet.of( + static final Set DEFAULT_CITY_PROPERTIES = Collections.unmodifiableSet(EnumSet.of( Property.CONTINENT_NAME, Property.COUNTRY_ISO_CODE, Property.REGION_ISO_CODE, Property.REGION_NAME, Property.CITY_NAME, Property.LOCATION - ); - static final Set DEFAULT_COUNTRY_PROPERTIES = EnumSet.of( + )); + static final Set DEFAULT_COUNTRY_PROPERTIES = Collections.unmodifiableSet(EnumSet.of( Property.CONTINENT_NAME, Property.COUNTRY_ISO_CODE - ); - static final Set DEFAULT_ASN_PROPERTIES = EnumSet.of( + )); + static final Set DEFAULT_ASN_PROPERTIES = Collections.unmodifiableSet(EnumSet.of( Property.IP, Property.ASN, Property.ORGANIZATION_NAME - ); + )); private final Map databaseReaders; @@ -380,14 +380,15 @@ public GeoIpProcessor create( final Set properties; if (propertyNames != null) { - properties = EnumSet.noneOf(Property.class); + Set modifiableProperties = EnumSet.noneOf(Property.class); for (String fieldName : propertyNames) { try { - properties.add(Property.parseProperty(databaseType, fieldName)); + modifiableProperties.add(Property.parseProperty(databaseType, fieldName)); } catch (IllegalArgumentException e) { throw newConfigurationException(TYPE, processorTag, "properties", e.getMessage()); } } + properties = Collections.unmodifiableSet(modifiableProperties); } else { if (databaseType.endsWith(CITY_DB_SUFFIX)) { properties = DEFAULT_CITY_PROPERTIES; diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java index 
fc1874728187f..3db6b78bba518 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java @@ -102,7 +102,7 @@ public void testBuildDefaults() throws Exception { assertThat(processor.getTag(), equalTo(processorTag)); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getTargetField(), equalTo("geoip")); - assertThat(processor.getDatabaseReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-City")); + assertThat(processor.getDatabaseType(), equalTo("GeoLite2-City")); assertThat(processor.getProperties(), sameInstance(GeoIpProcessor.Factory.DEFAULT_CITY_PROPERTIES)); assertFalse(processor.isIgnoreMissing()); } @@ -122,7 +122,7 @@ public void testSetIgnoreMissing() throws Exception { assertThat(processor.getTag(), equalTo(processorTag)); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getTargetField(), equalTo("geoip")); - assertThat(processor.getDatabaseReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-City")); + assertThat(processor.getDatabaseType(), equalTo("GeoLite2-City")); assertThat(processor.getProperties(), sameInstance(GeoIpProcessor.Factory.DEFAULT_CITY_PROPERTIES)); assertTrue(processor.isIgnoreMissing()); } @@ -143,7 +143,7 @@ public void testCountryBuildDefaults() throws Exception { assertThat(processor.getTag(), equalTo(processorTag)); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getTargetField(), equalTo("geoip")); - assertThat(processor.getDatabaseReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-Country")); + assertThat(processor.getDatabaseType(), equalTo("GeoLite2-Country")); assertThat(processor.getProperties(), sameInstance(GeoIpProcessor.Factory.DEFAULT_COUNTRY_PROPERTIES)); assertFalse(processor.isIgnoreMissing()); } @@ -164,7 +164,7 @@ public void testAsnBuildDefaults() throws Exception { 
assertThat(processor.getTag(), equalTo(processorTag)); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getTargetField(), equalTo("geoip")); - assertThat(processor.getDatabaseReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-ASN")); + assertThat(processor.getDatabaseType(), equalTo("GeoLite2-ASN")); assertThat(processor.getProperties(), sameInstance(GeoIpProcessor.Factory.DEFAULT_ASN_PROPERTIES)); assertFalse(processor.isIgnoreMissing()); } @@ -194,7 +194,7 @@ public void testBuildDbFile() throws Exception { GeoIpProcessor processor = factory.create(null, null, config); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getTargetField(), equalTo("geoip")); - assertThat(processor.getDatabaseReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-Country")); + assertThat(processor.getDatabaseType(), equalTo("GeoLite2-Country")); assertThat(processor.getProperties(), sameInstance(GeoIpProcessor.Factory.DEFAULT_COUNTRY_PROPERTIES)); assertFalse(processor.isIgnoreMissing()); } diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java index b4cc0e7430e04..e3fe54d2f1127 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java @@ -20,17 +20,20 @@ package org.elasticsearch.ingest.geoip; import com.maxmind.geoip2.DatabaseReader; +import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache; import org.elasticsearch.test.ESTestCase; +import java.io.IOException; import java.io.InputStream; import java.util.Collections; import 
java.util.EnumSet; import java.util.HashMap; import java.util.Map; +import java.util.function.Supplier; import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.Matchers.containsString; @@ -234,8 +237,31 @@ public void testInvalid() throws Exception { } private DatabaseReaderLazyLoader loader(final String path) { - final InputStream is = GeoIpProcessor.class.getResourceAsStream(path); - return new DatabaseReaderLazyLoader(PathUtils.get(path), () -> new DatabaseReader.Builder(is).build()); + final Supplier databaseInputStreamSupplier = () -> GeoIpProcessor.class.getResourceAsStream(path); + final CheckedSupplier loader = + () -> new DatabaseReader.Builder(databaseInputStreamSupplier.get()).build(); + return new DatabaseReaderLazyLoader(PathUtils.get(path), loader) { + + @Override + long databaseFileSize() throws IOException { + try (InputStream is = databaseInputStreamSupplier.get()) { + long bytesRead = 0; + do { + final byte[] bytes = new byte[1 << 10]; + final int read = is.read(bytes); + if (read == -1) break; + bytesRead += read; + } while (true); + return bytesRead; + } + } + + @Override + InputStream databaseInputStream() throws IOException { + return databaseInputStreamSupplier.get(); + } + + }; } } From e6dc88a13065e1ab066b22dd7b4c56fe261f6168 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 18 Dec 2018 10:25:32 -0500 Subject: [PATCH 22/25] Fix imports --- .../main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java | 1 - 1 file changed, 1 deletion(-) diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index 5b3fa1242ee08..5c82c68d93032 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -19,7 +19,6 @@ package 
org.elasticsearch.ingest.geoip; -import com.maxmind.geoip2.DatabaseReader; import com.maxmind.geoip2.exception.AddressNotFoundException; import com.maxmind.geoip2.model.AsnResponse; import com.maxmind.geoip2.model.CityResponse; From c0870a389baf92616e2acd31dcc7d7fcb0ae97c7 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 18 Dec 2018 10:47:42 -0500 Subject: [PATCH 23/25] Missing newline --- .../ingest/geoip/GeoIpProcessorNonIngestNodeIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java index 0825de648c802..d74838076b8f5 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java @@ -164,4 +164,4 @@ private void assertDatabaseLoadStatus(final String node, final boolean loaded) { } } -} \ No newline at end of file +} From 5a0ee8fbf54549f6bc75e1903326eb5c4ea8f032 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 18 Dec 2018 11:19:35 -0500 Subject: [PATCH 24/25] Add exception handling --- .../elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java index d8cf65c06ab2f..34e3b456b3e17 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java @@ -107,6 +107,10 @@ final String getDatabaseType() throws IOException { } } + if (metadataOffset == -1) { + throw new IOException("database type marker 
not found"); + } + // read the database type final int offsetByte = tail[metadataOffset] & 0xFF; final int type = offsetByte >>> 5; From 3a131b1ee4203fc78729f0ed028985066f93757b Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 18 Dec 2018 13:28:01 -0500 Subject: [PATCH 25/25] Fix test name --- ...nIngestNodeIT.java => GeoIpProcessorNonIngestNodeTests.java} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/{GeoIpProcessorNonIngestNodeIT.java => GeoIpProcessorNonIngestNodeTests.java} (99%) diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeTests.java similarity index 99% rename from plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java rename to plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeTests.java index d74838076b8f5..92fceab9eeb4c 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeTests.java @@ -45,7 +45,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; -public class GeoIpProcessorNonIngestNodeIT extends ESIntegTestCase { +public class GeoIpProcessorNonIngestNodeTests extends ESIntegTestCase { @Override protected Collection> nodePlugins() {