diff --git a/docs/build.gradle b/docs/build.gradle index 5ecaf7dc6faff..b4e9b052c50d0 100644 --- a/docs/build.gradle +++ b/docs/build.gradle @@ -63,8 +63,6 @@ testClusters.matching { it.name == "integTest"}.configureEach { if (singleNode().testDistribution == DEFAULT) { setting 'xpack.license.self_generated.type', 'trial' setting 'indices.lifecycle.history_index_enabled', 'false' - setting 'ingest.geoip.downloader.enabled', 'false' - systemProperty 'es.geoip_v2_feature_flag_enabled', 'true' keystorePassword 'keystore-password' } @@ -89,6 +87,9 @@ testClusters.matching { it.name == "integTest"}.configureEach { // Whitelist reindexing from the local node so we can test it. setting 'reindex.remote.whitelist', '127.0.0.1:*' + extraConfigFile 'ingest-geoip/GeoLite2-City.mmdb', file("${project.projectDir}/src/test/resources/GeoLite2-City.mmdb") + extraConfigFile 'ingest-geoip/GeoLite2-Country.mmdb', file("${project.projectDir}/src/test/resources/GeoLite2-Country.mmdb") + // TODO: remove this once cname is prepended to transport.publish_address by default in 8.0 systemProperty 'es.transport.cname_in_publish_address', 'true' @@ -114,6 +115,10 @@ tasks.named("integTest").configure { } } +tasks.named("forbiddenPatterns").configure { + exclude '**/*.mmdb' +} + tasks.named("buildRestTests").configure { docs = docsFileTree } diff --git a/docs/reference/ingest/common-log-format-example.asciidoc b/docs/reference/ingest/common-log-format-example.asciidoc index ce6b4a5e734d9..09086d52e3784 100644 --- a/docs/reference/ingest/common-log-format-example.asciidoc +++ b/docs/reference/ingest/common-log-format-example.asciidoc @@ -175,7 +175,7 @@ PUT _index_template/my-data-stream-template ---- POST my-data-stream/_doc?pipeline=my-pipeline { - "message": "212.87.37.154 - - [05/May/2099:16:21:15 +0000] \"GET /favicon.ico HTTP/1.1\" 200 3638 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\"" + "message": "89.160.20.128 - - [05/May/2099:16:21:15 +0000] \"GET /favicon.ico HTTP/1.1\" 200 3638 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\"" } ---- // TEST[s/my-pipeline/my-pipeline&refresh=wait_for/] @@ -216,21 +216,21 @@ The API returns: "version": "1.1" }, "source": { - "ip": "212.87.37.154", + "ip": "89.160.20.128", "geo": { - "continent_name": "Europe", - "region_iso_code": "DE-BE", - "city_name": "Berlin", - "country_iso_code": "DE", - "country_name": "Germany", - "region_name": "Land Berlin", - "location": { - "lon": 13.4978, - "lat": 52.411 + "continent_name" : "Europe", + "country_name" : "Sweden", + "country_iso_code" : "SE", + "city_name" : "Linköping", + "region_iso_code" : "SE-E", + "region_name" : "Östergötland County", + "location" : { + "lon" : 15.6167, + "lat" : 58.4167 } } }, - "message": "212.87.37.154 - - [05/May/2099:16:21:15 +0000] \"GET /favicon.ico HTTP/1.1\" 200 3638 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\"", + "message": "89.160.20.128 - - [05/May/2099:16:21:15 +0000] \"GET /favicon.ico HTTP/1.1\" 200 3638 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\"", "url": { "original": "/favicon.ico" }, diff --git a/docs/reference/ingest/processors/geoip.asciidoc b/docs/reference/ingest/processors/geoip.asciidoc index 936286ee5b2fc..7ee81e4148a75 100644 --- a/docs/reference/ingest/processors/geoip.asciidoc +++ b/docs/reference/ingest/processors/geoip.asciidoc @@ -69,7 +69,7 @@ PUT _ingest/pipeline/geoip } PUT my-index-000001/_doc/my_id?pipeline=geoip { - "ip": "8.8.8.8" + "ip": "89.160.20.128" } GET my-index-000001/_doc/my_id -------------------------------------------------- @@ -86,12 +86,15 @@ Which returns: "_seq_no": 55, "_primary_term": 1, "_source": { - "ip": "8.8.8.8", + "ip": "89.160.20.128", "geoip": { - "continent_name": "North America", - "country_name": "United States", - "country_iso_code": "US", - "location": { "lat": 37.751, "lon": -97.822 } + "continent_name": "Europe", + "country_name": "Sweden", + "country_iso_code": "SE", + "city_name" : "Linköping", + "region_iso_code" : "SE-E", + "region_name" : "Östergötland County", + "location": { "lat": 58.4167, "lon": 15.6167 } } } } @@ -119,7 +122,7 @@ PUT _ingest/pipeline/geoip } PUT my-index-000001/_doc/my_id?pipeline=geoip { - "ip": "8.8.8.8" + "ip": "89.160.20.128" } GET my-index-000001/_doc/my_id -------------------------------------------------- @@ -136,11 +139,11 @@ returns this: "_seq_no": 65, "_primary_term": 1, "_source": { - "ip": "8.8.8.8", + "ip": "89.160.20.128", "geo": { - "continent_name": "North America", - "country_name": "United States", - "country_iso_code": "US" + "continent_name": "Europe", + "country_name": "Sweden", + "country_iso_code": "SE" } } } @@ -236,7 +239,7 @@ PUT _ingest/pipeline/geoip PUT my_ip_locations/_doc/1?refresh=true&pipeline=geoip { - "ip": "8.8.8.8" + "ip": "89.160.20.128" } GET /my_ip_locations/_search @@ -250,8 +253,8 @@ GET /my_ip_locations/_search "geo_distance": { "distance": "1m", "geoip.location": { - "lon": -97.822, - "lat": 37.751 + "lon": 15.6167, + "lat": 58.4167 } } } @@ -285,15 +288,18 @@ GET /my_ip_locations/_search "_score" : 1.0, "_source" : { "geoip" : { - "continent_name" : "North America", - "country_name" : "United States", - "country_iso_code" : "US", + "continent_name" : "Europe", + "country_name" : "Sweden", + "country_iso_code" : "SE", + "city_name" : "Linköping", + "region_iso_code" : "SE-E", + "region_name" : "Östergötland County", "location" : { - "lon" : -97.822, - "lat" : 37.751 + "lon" : 15.6167, + "lat" : 58.4167 } }, - "ip" : "8.8.8.8" + "ip" : "89.160.20.128" } } ] diff --git a/docs/reference/migration/migrate_8_0/ingest.asciidoc b/docs/reference/migration/migrate_8_0/ingest.asciidoc index d827a15de92ed..b9308988b870d 100644 --- a/docs/reference/migration/migrate_8_0/ingest.asciidoc +++ b/docs/reference/migration/migrate_8_0/ingest.asciidoc @@ -18,4 +18,32 @@ Common Schema (ECS)] fields, regardless of the `ecs` value. To avoid deprecation warnings, remove the parameter from your ingest pipelines. If a pipeline specifies an `ecs` value, the value is ignored. ==== + +.The default Maxmind geoip databases have been removed. +[%collapsible] +==== +*Details* + +The default Maxmind geoip databases that shipped by default with Elasticsearch +have been removed. These databases are out dated and stale and using these +databases will likely result in incorrect geoip lookups. + +By default since 7.13, these pre-packaged geoip databases +were used in case no database were specified in the config directory or before +the geoip downloader downloaded the geoip databases. When the geoip database +downloader completed downloading the new databases then these pre-packaged +databases were no longer used. + +*Impact* + +If the geoip downloader is disabled and no geoip databases are provided +in the config directory of each ingest node then the geoip processor will +no longer perform geoip lookups and tag these documents with the fact that +the requested database is no longer available. + +After a cluster has been started and before the geoip downloader has completed +downloading the most up to data databases, the geoip processor will not perform +any geoip lookups and tag documents that the requested database is not available. +After the geoip downloader has completed downloading the most up to data databases +then the geoip processor will function as normal. The window of time that the +geoip processor can't do geoip lookups after cluster startup should be very small. +==== //end::notable-breaking-changes[] diff --git a/docs/src/test/resources/GeoLite2-City.mmdb b/docs/src/test/resources/GeoLite2-City.mmdb new file mode 100644 index 0000000000000..0809201619b59 Binary files /dev/null and b/docs/src/test/resources/GeoLite2-City.mmdb differ diff --git a/docs/src/test/resources/GeoLite2-Country.mmdb b/docs/src/test/resources/GeoLite2-Country.mmdb new file mode 100644 index 0000000000000..aa81cbe8a2f0e Binary files /dev/null and b/docs/src/test/resources/GeoLite2-Country.mmdb differ diff --git a/modules/ingest-geoip/build.gradle b/modules/ingest-geoip/build.gradle index b2d8689e5c2e6..940fec7cff471 100644 --- a/modules/ingest-geoip/build.gradle +++ b/modules/ingest-geoip/build.gradle @@ -13,7 +13,7 @@ apply plugin: 'elasticsearch.yaml-rest-compat-test' apply plugin: 'elasticsearch.internal-cluster-test' esplugin { - description 'Ingest processor that uses lookup geo data based on IP adresses using the MaxMind geo database' + description 'Ingest processor that uses lookup geo data based on IP addresses using the MaxMind geo database' classname 'org.elasticsearch.ingest.geoip.IngestGeoIpPlugin' } @@ -57,14 +57,7 @@ tasks.named("internalClusterTest").configure { } } -tasks.register("copyDefaultGeoIp2DatabaseFiles", Copy) { - from { zipTree(configurations.testCompileClasspath.files.find { it.name.contains('geolite2-databases') }) } - into "${project.buildDir}/ingest-geoip" - include "*.mmdb" -} - tasks.named("bundlePlugin").configure { - dependsOn("copyDefaultGeoIp2DatabaseFiles") from("${project.buildDir}/ingest-geoip") { into '/' } @@ -107,3 +100,18 @@ tasks.named("dependencyLicenses").configure { mapping from: /maxmind-db.*/, to: 'maxmind-db-reader' ignoreFile 'elastic-geoip-database-service-agreement-LICENSE.txt' } + +testClusters.configureEach { + // Needed for database downloader, uses delete-by-query to cleanup old databases from org.elasticsearch.ingest.geoip database system index + module ':modules:reindex' + // Downloader is enabled by default, but in test clusters in build disabled by default, + // but in this module, the downloader should be enabled by default + systemProperty 'ingest.geoip.downloader.enabled.default', 'true' + if (useFixture) { + setting 'ingest.geoip.downloader.endpoint', { "${-> fixtureAddress()}" } + } +} + +tasks.named("yamlRestTestV7CompatTransform").configure { task -> + task.skipTestsByFilePattern("**/ingest_geoip/20_geoip_processor.yml", "from 8.0 yaml rest tests use geoip test fixture and default geoip are no longer packaged. In 7.x yaml tests used default databases which makes tests results very different, so skipping these tests") +} diff --git a/modules/ingest-geoip/qa/file-based-update/src/test/java/org/elasticsearch/ingest/geoip/UpdateDatabasesIT.java b/modules/ingest-geoip/qa/file-based-update/src/test/java/org/elasticsearch/ingest/geoip/UpdateDatabasesIT.java index fd907738e5b28..24fda3ed4021b 100644 --- a/modules/ingest-geoip/qa/file-based-update/src/test/java/org/elasticsearch/ingest/geoip/UpdateDatabasesIT.java +++ b/modules/ingest-geoip/qa/file-based-update/src/test/java/org/elasticsearch/ingest/geoip/UpdateDatabasesIT.java @@ -7,25 +7,25 @@ */ package org.elasticsearch.ingest.geoip; -import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Request; -import org.elasticsearch.client.Response; -import org.elasticsearch.core.PathUtils; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.PathUtils; import org.elasticsearch.xcontent.ObjectPath; -import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.test.rest.ESRestTestCase; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.List; import java.util.Map; +import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; public class UpdateDatabasesIT extends ESRestTestCase { @@ -35,8 +35,14 @@ public void test() throws Exception { Request simulatePipelineRequest = new Request("POST", "/_ingest/pipeline/_simulate"); simulatePipelineRequest.setJsonEntity(body); { - Map response = toMap(client().performRequest(simulatePipelineRequest)); - assertThat(ObjectPath.eval("docs.0.doc._source.geoip.city_name", response), equalTo("Tumba")); + Map response = entityAsMap(client().performRequest(simulatePipelineRequest)); + assertThat(ObjectPath.eval("docs.0.doc._source.tags.0", response), equalTo("_geoip_database_unavailable_GeoLite2-City.mmdb")); + } + + // Ensure no config databases have been setup: + { + Map stats = getGeoIpStatsForSingleNode(); + assertThat(stats, nullValue()); } Path configPath = PathUtils.get(System.getProperty("tests.config.dir")); @@ -46,14 +52,25 @@ public void test() throws Exception { Files.copy(UpdateDatabasesIT.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), ingestGeoipDatabaseDir.resolve("GeoLite2-City.mmdb")); - assertBusy(() -> { - Map response = toMap(client().performRequest(simulatePipelineRequest)); - assertThat(ObjectPath.eval("docs.0.doc._source.geoip.city_name", response), equalTo("Linköping")); - }); + // Ensure that a config database has been setup: + { + assertBusy(() -> { + Map stats = getGeoIpStatsForSingleNode(); + assertThat(stats, notNullValue()); + assertThat(stats.get("config_databases"), equalTo(List.of("GeoLite2-City.mmdb"))); + }); + } + + Map response = entityAsMap(client().performRequest(simulatePipelineRequest)); + assertThat(ObjectPath.eval("docs.0.doc._source.geoip.city_name", response), equalTo("Linköping")); } - private static Map toMap(Response response) throws IOException { - return XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false); + private static Map getGeoIpStatsForSingleNode() throws IOException { + Request request = new Request("GET", "/_ingest/geoip/stats"); + Map response = entityAsMap(client().performRequest(request)); + Map nodes = (Map) response.get("nodes"); + assertThat(nodes.size(), either(equalTo(0)).or(equalTo(1))); + return nodes.isEmpty() ? null : (Map) nodes.values().iterator().next(); } @Override diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java index da312e2d73df3..1d02a5b7f22fa 100644 --- a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java @@ -28,6 +28,7 @@ import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.MatchQueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.ingest.geoip.stats.GeoIpDownloaderStatsAction; import org.elasticsearch.reindex.ReindexPlugin; import org.elasticsearch.persistent.PersistentTaskParams; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; @@ -40,6 +41,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; @@ -48,10 +50,12 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.stream.StreamSupport; import java.util.zip.GZIPInputStream; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -60,10 +64,13 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; public class GeoIpDownloaderIT extends AbstractGeoIpIT { @@ -259,6 +266,7 @@ public void testGeoIpDatabasesDownload() throws Exception { @TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/69972") public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception { assumeTrue("only test with fixture to have stable results", ENDPOINT != null); + setupDatabasesInConfigDirectory(); // setup: putPipeline(); @@ -303,6 +311,41 @@ public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception { }); } + public void testStartWithNoDatabases() throws Exception { + assumeTrue("only test with fixture to have stable results", ENDPOINT != null); + putPipeline(); + + // Behaviour without any databases loaded: + { + SimulateDocumentBaseResult result = simulatePipeline(); + assertThat(result.getFailure(), nullValue()); + assertThat(result.getIngestDocument(), notNullValue()); + Map source = result.getIngestDocument().getSourceAndMetadata(); + assertThat(source, hasEntry("tags", List.of("_geoip_database_unavailable_GeoLite2-City.mmdb", + "_geoip_database_unavailable_GeoLite2-Country.mmdb", "_geoip_database_unavailable_GeoLite2-ASN.mmdb"))); + } + + // Enable downloader: + Settings.Builder settings = Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true); + assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings)); + { + assertBusy(() -> { + SimulateDocumentBaseResult result = simulatePipeline(); + assertThat(result.getFailure(), nullValue()); + assertThat(result.getIngestDocument(), notNullValue()); + Map source = result.getIngestDocument().getSourceAndMetadata(); + assertThat(source, not(hasKey("tags"))); + assertThat(source, hasKey("ip-city")); + assertThat(source, hasKey("ip-asn")); + assertThat(source, hasKey("ip-country")); + + assertThat(((Map) source.get("ip-city")).get("city_name"), equalTo("Linköping")); + assertThat(((Map) source.get("ip-asn")).get("organization_name"), equalTo("Bredband2 AB")); + assertThat(((Map) source.get("ip-country")).get("country_name"), equalTo("Sweden")); + }); + } + } + private void verifyUpdatedDatabase() throws Exception { assertBusy(() -> { SimulateDocumentBaseResult result = simulatePipeline(); @@ -412,6 +455,36 @@ private List getGeoIpTmpDirs() throws IOException { return geoipTmpDirs; } + private void setupDatabasesInConfigDirectory() throws Exception { + StreamSupport.stream(internalCluster().getInstances(Environment.class).spliterator(), false) + .map(Environment::configFile) + .map(path -> path.resolve("ingest-geoip")) + .distinct() + .forEach(path -> { + try { + Files.createDirectories(path); + Files.copy(GeoIpDownloaderIT.class.getResourceAsStream("/GeoLite2-City.mmdb"), + path.resolve("GeoLite2-City.mmdb")); + Files.copy(GeoIpDownloaderIT.class.getResourceAsStream("/GeoLite2-ASN.mmdb"), + path.resolve("GeoLite2-ASN.mmdb")); + Files.copy(GeoIpDownloaderIT.class.getResourceAsStream("/GeoLite2-Country.mmdb"), + path.resolve("GeoLite2-Country.mmdb")); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + + assertBusy(() -> { + GeoIpDownloaderStatsAction.Response response = + client().execute(GeoIpDownloaderStatsAction.INSTANCE, new GeoIpDownloaderStatsAction.Request()).actionGet(); + assertThat(response.getNodes(), not(empty())); + for (GeoIpDownloaderStatsAction.NodeResponse nodeResponse : response.getNodes()) { + assertThat(nodeResponse.getConfigDatabases(), + containsInAnyOrder("GeoLite2-Country.mmdb", "GeoLite2-City.mmdb", "GeoLite2-ASN.mmdb")); + } + }); + } + @SuppressForbidden(reason = "Maxmind API requires java.io.File") private void parseDatabase(Path tempFile) throws IOException { try (DatabaseReader databaseReader = new DatabaseReader.Builder(tempFile.toFile()).build()) { diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java index afa40c241fd64..d8e184b407446 100644 --- a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java @@ -56,10 +56,9 @@ public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase { * geoip processor instance is using the related {@link DatabaseReaderLazyLoader} instance */ public void test() throws Exception { - Path geoIpModulesDir = createTempDir(); Path geoIpConfigDir = createTempDir(); Path geoIpTmpDir = createTempDir(); - DatabaseRegistry databaseRegistry = createRegistry(geoIpModulesDir, geoIpConfigDir, geoIpTmpDir); + DatabaseRegistry databaseRegistry = createRegistry(geoIpConfigDir, geoIpTmpDir); ClusterService clusterService = mock(ClusterService.class); when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); @@ -71,8 +70,8 @@ public void test() throws Exception { databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb")); lazyLoadReaders(databaseRegistry); - final GeoIpProcessor processor1 = factory.create(null, "_tag", null, new HashMap<>(Map.of("field", "_field"))); - final GeoIpProcessor processor2 = factory.create(null, "_tag", null, + final GeoIpProcessor processor1 = (GeoIpProcessor) factory.create(null, "_tag", null, new HashMap<>(Map.of("field", "_field"))); + final GeoIpProcessor processor2 = (GeoIpProcessor) factory.create(null, "_tag", null, new HashMap<>(Map.of("field", "_field", "database_file", "GeoLite2-City-Test.mmdb"))); final AtomicBoolean completed = new AtomicBoolean(false); @@ -164,13 +163,13 @@ public void test() throws Exception { assertThat(lazyLoader.current(), equalTo(0)); } // Avoid accumulating many temp dirs while running with -Dtests.iters=X - IOUtils.rm(geoIpModulesDir, geoIpConfigDir, geoIpTmpDir); + IOUtils.rm(geoIpConfigDir, geoIpTmpDir); } - private static DatabaseRegistry createRegistry(Path geoIpModulesDir, Path geoIpConfigDir, Path geoIpTmpDir) throws IOException { - copyDatabaseFiles(geoIpModulesDir); + private static DatabaseRegistry createRegistry(Path geoIpConfigDir, Path geoIpTmpDir) throws IOException { GeoIpCache cache = new GeoIpCache(0); - LocalDatabases localDatabases = new LocalDatabases(geoIpModulesDir, geoIpConfigDir, cache); + LocalDatabases localDatabases = new LocalDatabases(geoIpConfigDir, cache); + copyDatabaseFiles(geoIpConfigDir, localDatabases); DatabaseRegistry databaseRegistry = new DatabaseRegistry(geoIpTmpDir, mock(Client.class), cache, localDatabases, Runnable::run); databaseRegistry.initialize("nodeId", mock(ResourceWatcherService.class), mock(IngestService.class)); diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java index d6ecd3ffced1e..6439dc008aa8f 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java @@ -54,6 +54,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.function.Consumer; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.zip.GZIPInputStream; @@ -87,6 +88,7 @@ public final class DatabaseRegistry implements Closeable { private Path geoipTmpDirectory; private final LocalDatabases localDatabases; private final Consumer genericExecutor; + private IngestService ingestService; private final ConcurrentMap databases = new ConcurrentHashMap<>(); @@ -150,14 +152,15 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) { } LOGGER.info("initialized database registry, using geoip-databases directory [{}]", geoipTmpDirectory); ingestService.addIngestClusterStateListener(this::checkDatabases); + this.ingestService = ingestService; } - public DatabaseReaderLazyLoader getDatabase(String name, boolean fallbackUsingDefaultDatabases) { + public DatabaseReaderLazyLoader getDatabase(String name) { // There is a need for reference counting in order to avoid using an instance // that gets closed while using it. (this can happen during a database update) while (true) { DatabaseReaderLazyLoader instance = - databases.getOrDefault(name, localDatabases.getDatabase(name, fallbackUsingDefaultDatabases)); + databases.getOrDefault(name, localDatabases.getDatabase(name)); if (instance == null || instance.preLookup()) { return instance; } @@ -167,7 +170,7 @@ public DatabaseReaderLazyLoader getDatabase(String name, boolean fallbackUsingDe } List getAllDatabases() { - List all = new ArrayList<>(localDatabases.getAllDatabases()); + List all = new ArrayList<>(localDatabases.getConfigDatabases().values()); this.databases.forEach((key, value) -> all.add(value)); return all; } @@ -313,6 +316,22 @@ void updateDatabase(String databaseFileName, String recordedMd5, Path file) { DatabaseReaderLazyLoader existing = databases.put(databaseFileName, loader); if (existing != null) { existing.close(); + } else { + // Loaded a database for the first time, so reload pipelines for which a database was not available: + Predicate predicate = p -> databaseFileName.equals(p.getDatabaseName()); + var ids = ingestService.getPipelineWithProcessorType(GeoIpProcessor.DatabaseUnavailableProcessor.class, predicate); + if (ids.isEmpty() == false) { + for (var id : ids) { + try { + ingestService.reloadPipeline(id); + LOGGER.debug("successfully reloaded pipeline [{}] after downloading of database [{}] for the first time", + id, databaseFileName); + } catch (Exception e) { + LOGGER.debug((Supplier) () -> new ParameterizedMessage( + "failed to reload pipeline [{}] after downloading of database [{}]", id, databaseFileName), e); + } + } + } } LOGGER.info("successfully reloaded changed geoip database file [{}]", file); } catch (Exception e) { @@ -384,6 +403,10 @@ public Set getAvailableDatabases() { return Set.copyOf(databases.keySet()); } + public Set getConfigDatabases() { + return localDatabases.getConfigDatabases().keySet(); + } + public Set getFilesInTemp() { try (Stream files = Files.list(geoipTmpDirectory)) { return files.map(Path::getFileName).map(Path::toString).collect(Collectors.toSet()); diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index 5ca1efe361b9f..d1d9cd788d2da 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -23,6 +23,8 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.common.logging.DeprecationCategory; +import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.logging.HeaderWarning; import org.elasticsearch.common.network.InetAddresses; import org.elasticsearch.common.network.NetworkAddress; @@ -52,6 +54,10 @@ public final class GeoIpProcessor extends AbstractProcessor { + private static final DeprecationLogger DEPRECATION_LOGGER = DeprecationLogger.getLogger(GeoIpProcessor.class); + static final String DEFAULT_DATABASES_DEPRECATION_MESSAGE = "the [fallback_to_default_databases] has been deprecated," + + " because Elasticsearch no longer includes the default Maxmind geoip databases. This setting will be removed in Elasticsearch 9.0"; + public static final String TYPE = "geoip"; private static final String CITY_DB_SUFFIX = "-City"; private static final String COUNTRY_DB_SUFFIX = "-Country"; @@ -369,7 +375,7 @@ public Factory(DatabaseRegistry databaseRegistry, ClusterService clusterService) } @Override - public GeoIpProcessor create( + public Processor create( final Map registry, final String processorTag, final String description, final Map config) throws IOException { @@ -379,12 +385,19 @@ public GeoIpProcessor create( List propertyNames = readOptionalList(TYPE, processorTag, config, "properties"); boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false); boolean firstOnly = readBooleanProperty(TYPE, processorTag, config, "first_only", true); - boolean fallbackUsingDefaultDatabases = readBooleanProperty(TYPE, processorTag, config, "fallback_to_default_databases", true); - DatabaseReaderLazyLoader lazyLoader = databaseRegistry.getDatabase(databaseFile, fallbackUsingDefaultDatabases); - if (lazyLoader == null) { + // noop, should be removed in 9.0 + Object value = config.remove("fallback_to_default_databases"); + if (value != null) { + DEPRECATION_LOGGER.critical(DeprecationCategory.OTHER, "default_databases_message", DEFAULT_DATABASES_DEPRECATION_MESSAGE); + } + + DatabaseReaderLazyLoader lazyLoader = databaseRegistry.getDatabase(databaseFile); + if (lazyLoader == null && databaseRegistry.getAvailableDatabases().isEmpty() == false) { throw newConfigurationException(TYPE, processorTag, "database_file", "database file [" + databaseFile + "] doesn't exist"); + } else if (lazyLoader == null && databaseRegistry.getAvailableDatabases().isEmpty()) { + return new DatabaseUnavailableProcessor(processorTag, description, databaseFile); } final String databaseType; try { @@ -417,7 +430,7 @@ public GeoIpProcessor create( } } CheckedSupplier supplier = () -> { - DatabaseReaderLazyLoader loader = databaseRegistry.getDatabase(databaseFile, fallbackUsingDefaultDatabases); + DatabaseReaderLazyLoader loader = databaseRegistry.getDatabase(databaseFile); if (loader == null) { throw new ResourceNotFoundException("database file [" + databaseFile + "] doesn't exist"); } @@ -518,4 +531,29 @@ public static Property parseProperty(String databaseType, String value) { } } } + + static class DatabaseUnavailableProcessor extends AbstractProcessor { + + private final String databaseName; + + DatabaseUnavailableProcessor(String tag, String description, String databaseName) { + super(tag, description); + this.databaseName = databaseName; + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + ingestDocument.appendFieldValue("tags", "_geoip_database_unavailable_" + databaseName, true); + return ingestDocument; + } + + @Override + public String getType() { + return TYPE; + } + + public String getDatabaseName() { + return databaseName; + } + } } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/LocalDatabases.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/LocalDatabases.java index 99b557a6de3be..60958cc8a65af 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/LocalDatabases.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/LocalDatabases.java @@ -11,8 +11,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; -import org.elasticsearch.core.SuppressForbidden; -import org.elasticsearch.core.PathUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.watcher.FileChangesListener; import org.elasticsearch.watcher.FileWatcher; @@ -23,23 +21,17 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.PathMatcher; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.stream.Stream; -import static org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.DEFAULT_DATABASE_FILENAMES; - /** - * Keeps track of the databases locally available to a node: - * 1) Default databases shipped with the default distribution via ingest-geoip module - * 2) User provided databases from the ES_HOME/config/ingest-geoip directory. This directory is monitored - * and files updates are picked up and may cause databases being loaded or removed at runtime. + * Keeps track of user provided databases in the ES_HOME/config/ingest-geoip directory. + * This directory is monitored and files updates are picked up and may cause databases being loaded or removed at runtime. */ final class LocalDatabases implements Closeable { @@ -48,28 +40,16 @@ final class LocalDatabases implements Closeable { private final GeoIpCache cache; private final Path geoipConfigDir; - private final Map defaultDatabases; private final ConcurrentMap configDatabases; LocalDatabases(Environment environment, GeoIpCache cache) { - this( - // In GeoIpProcessorNonIngestNodeTests, ingest-geoip is loaded on the classpath. - // This means that the plugin is never unbundled into a directory where the database files would live. - // Therefore, we have to copy these database files ourselves. To do this, we need the ability to specify where - // those database files would go. We do this by adding a plugin that registers ingest.geoip.database_path as an - // actual setting. Otherwise, in production code, this setting is not registered and the database path is not configurable. - environment.settings().get("ingest.geoip.database_path") != null ? - getGeoipConfigDirectory(environment) : - environment.modulesFile().resolve("ingest-geoip"), - environment.configFile().resolve("ingest-geoip"), - cache); + this(environment.configFile().resolve("ingest-geoip"), cache); } - LocalDatabases(Path geoipModuleDir, Path geoipConfigDir, GeoIpCache cache) { + LocalDatabases(Path geoipConfigDir, GeoIpCache cache) { this.cache = cache; this.geoipConfigDir = geoipConfigDir; this.configDatabases = new ConcurrentHashMap<>(); - this.defaultDatabases = initDefaultDatabases(geoipModuleDir); } void initialize(ResourceWatcherService resourceWatcher) throws IOException { @@ -79,22 +59,11 @@ void initialize(ResourceWatcherService resourceWatcher) throws IOException { watcher.addListener(new GeoipDirectoryListener()); resourceWatcher.add(watcher, ResourceWatcherService.Frequency.HIGH); - LOGGER.info("initialized default databases [{}], config databases [{}] and watching [{}] for changes", - defaultDatabases.keySet(), configDatabases.keySet(), geoipConfigDir); - } - - DatabaseReaderLazyLoader getDatabase(String name, boolean fallbackUsingDefaultDatabases) { - return configDatabases.getOrDefault(name, fallbackUsingDefaultDatabases ? defaultDatabases.get(name) : null); - } - - List getAllDatabases() { - List all = new ArrayList<>(defaultDatabases.values()); - all.addAll(configDatabases.values()); - return all; + LOGGER.info("initialized config databases [{}] and watching [{}] for changes", configDatabases.keySet(), geoipConfigDir); } - Map getDefaultDatabases() { - return defaultDatabases; + DatabaseReaderLazyLoader getDatabase(String name) { + return configDatabases.get(name); } Map getConfigDatabases() { @@ -122,20 +91,6 @@ void updateDatabase(Path file, boolean update) { } } - Map initDefaultDatabases(Path geoipModuleDir) { - Map databases = new HashMap<>(DEFAULT_DATABASE_FILENAMES.length); - - for (String filename : DEFAULT_DATABASE_FILENAMES) { - Path source = geoipModuleDir.resolve(filename); - assert Files.exists(source); - String databaseFileName = source.getFileName().toString(); - DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, source, null); - databases.put(databaseFileName, loader); - } - - return Collections.unmodifiableMap(databases); - } - Map initConfigDatabases(Path geoipConfigDir) throws IOException { Map databases = new HashMap<>(); @@ -161,19 +116,11 @@ Map initConfigDatabases(Path geoipConfigDir) t @Override public void close() throws IOException { - for (DatabaseReaderLazyLoader lazyLoader : defaultDatabases.values()) { - lazyLoader.close(); - } for (DatabaseReaderLazyLoader lazyLoader : configDatabases.values()) { lazyLoader.close(); } } - @SuppressForbidden(reason = "PathUtils#get") - private static Path getGeoipConfigDirectory(Environment environment) { - return PathUtils.get(environment.settings().get("ingest.geoip.database_path")); - } - private class GeoipDirectoryListener implements FileChangesListener { @Override diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsAction.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsAction.java index 6cf73333f2219..72790c9478ba8 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsAction.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsAction.java @@ -8,6 +8,7 @@ package org.elasticsearch.ingest.geoip.stats; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.nodes.BaseNodeResponse; @@ -91,6 +92,10 @@ public Response(ClusterName clusterName, List nodes, List n.stats).filter(Objects::nonNull).findFirst().orElse(GeoIpDownloaderStats.EMPTY); + } + @Override protected List readNodesFrom(StreamInput in) throws IOException { return in.readList(NodeResponse::new); @@ -103,14 +108,13 @@ protected void writeNodesTo(StreamOutput out, List nodes) throws I @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - GeoIpDownloaderStats stats = - getNodes().stream().map(n -> n.stats).filter(Objects::nonNull).findFirst().orElse(GeoIpDownloaderStats.EMPTY); + GeoIpDownloaderStats stats = getStats(); builder.startObject(); builder.field("stats", stats); builder.startObject("nodes"); for (Map.Entry e : getNodesMap().entrySet()) { NodeResponse response = e.getValue(); - if (response.filesInTemp.isEmpty() && response.databases.isEmpty()) { + if (response.filesInTemp.isEmpty() && response.databases.isEmpty() && response.configDatabases.isEmpty()) { continue; } builder.startObject(e.getKey()); @@ -126,6 +130,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (response.filesInTemp.isEmpty() == false) { builder.array("files_in_temp", response.filesInTemp.toArray(String[]::new)); } + if (response.configDatabases.isEmpty() == false) { + builder.array("config_databases", response.configDatabases.toArray(String[]::new)); + } builder.endObject(); } builder.endObject(); @@ -152,19 +159,39 @@ public static class NodeResponse extends BaseNodeResponse { private final GeoIpDownloaderStats stats; private final Set databases; private final Set filesInTemp; + private final Set configDatabases; protected NodeResponse(StreamInput in) throws IOException { super(in); stats = in.readBoolean() ? new GeoIpDownloaderStats(in) : null; databases = in.readSet(StreamInput::readString); filesInTemp = in.readSet(StreamInput::readString); + configDatabases = in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readSet(StreamInput::readString) : null; } - protected NodeResponse(DiscoveryNode node, GeoIpDownloaderStats stats, Set databases, Set filesInTemp) { + protected NodeResponse(DiscoveryNode node, GeoIpDownloaderStats stats, Set databases, Set filesInTemp, + Set configDatabases) { super(node); this.stats = stats; this.databases = databases; this.filesInTemp = filesInTemp; + this.configDatabases = configDatabases; + } + + public GeoIpDownloaderStats getStats() { + return stats; + } + + public Set getDatabases() { + return databases; + } + + public Set getFilesInTemp() { + return filesInTemp; + } + + public Set getConfigDatabases() { + return configDatabases; } @Override @@ -176,6 +203,9 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeCollection(databases, StreamOutput::writeString); out.writeCollection(filesInTemp, StreamOutput::writeString); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeCollection(configDatabases, StreamOutput::writeString); + } } @Override @@ -183,12 +213,15 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; NodeResponse that = (NodeResponse) o; - return stats.equals(that.stats) && databases.equals(that.databases) && filesInTemp.equals(that.filesInTemp); + return stats.equals(that.stats) && + databases.equals(that.databases) && + filesInTemp.equals(that.filesInTemp) && + Objects.equals(configDatabases, that.configDatabases); } @Override public int hashCode() { - return Objects.hash(stats, databases, filesInTemp); + return Objects.hash(stats, databases, filesInTemp, configDatabases); } } } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsTransportAction.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsTransportAction.java index fe73f98b68c93..d39f99aa71512 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsTransportAction.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsTransportAction.java @@ -66,6 +66,7 @@ protected NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throw protected NodeResponse nodeOperation(NodeRequest request, Task task) { GeoIpDownloader geoIpTask = geoIpDownloaderTaskExecutor.getCurrentTask(); GeoIpDownloaderStats stats = geoIpTask == null || geoIpTask.getStatus() == null ? null : geoIpTask.getStatus(); - return new NodeResponse(transportService.getLocalNode(), stats, registry.getAvailableDatabases(), registry.getFilesInTemp()); + return new NodeResponse(transportService.getLocalNode(), stats, registry.getAvailableDatabases(), registry.getFilesInTemp(), + registry.getConfigDatabases()); } } diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java index 6ed329a54c2b2..908ef873331c7 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java @@ -71,6 +71,7 @@ import java.util.UUID; import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -80,13 +81,18 @@ import static org.elasticsearch.persistent.PersistentTasksCustomMetadata.TYPE; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; @LuceneTestCase.SuppressFileSystems(value = "ExtrasFS") // Don't randomly add 'extra' files to directory. @@ -97,24 +103,25 @@ public class DatabaseRegistryTests extends ESTestCase { private ThreadPool threadPool; private DatabaseRegistry databaseRegistry; private ResourceWatcherService resourceWatcherService; + private IngestService ingestService; @Before public void setup() throws IOException { - final Path geoIpDir = createTempDir(); final Path geoIpConfigDir = createTempDir(); Files.createDirectories(geoIpConfigDir); - copyDatabaseFiles(geoIpDir); + GeoIpCache cache = new GeoIpCache(1000); + LocalDatabases localDatabases = new LocalDatabases(geoIpConfigDir, cache); + copyDatabaseFiles(geoIpConfigDir, localDatabases); threadPool = new TestThreadPool(LocalDatabases.class.getSimpleName()); Settings settings = Settings.builder().put("resource.reload.interval.high", TimeValue.timeValueMillis(100)).build(); resourceWatcherService = new ResourceWatcherService(settings, threadPool); client = mock(Client.class); - GeoIpCache cache = new GeoIpCache(1000); - LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, cache); + ingestService = mock(IngestService.class); geoIpTmpDir = createTempDir(); databaseRegistry = new DatabaseRegistry(geoIpTmpDir, client, cache, localDatabases, Runnable::run); - databaseRegistry.initialize("nodeId", resourceWatcherService, mock(IngestService.class)); + databaseRegistry.initialize("nodeId", resourceWatcherService, ingestService); } @After @@ -139,11 +146,17 @@ public void testCheckDatabases() throws Exception { .routingTable(createIndexRoutingTable()) .build(); - assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue()); + int numPipelinesToBeReloaded = randomInt(4); + List pipelineIds = IntStream.range(0, numPipelinesToBeReloaded).mapToObj(String::valueOf).collect(Collectors.toList()); + when(ingestService.getPipelineWithProcessorType(any(), any())).thenReturn(pipelineIds); + + assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb"), nullValue()); + // Nothing should be downloaded, since the database is no longer valid (older than 30 days) databaseRegistry.checkDatabases(state); - DatabaseReaderLazyLoader database = databaseRegistry.getDatabase("GeoIP2-City.mmdb", false); + DatabaseReaderLazyLoader database = databaseRegistry.getDatabase("GeoIP2-City.mmdb"); assertThat(database, nullValue()); verify(client, times(0)).search(any()); + verify(ingestService, times(0)).reloadPipeline(anyString()); try (Stream files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) { assertEquals(0, files.count()); } @@ -159,10 +172,16 @@ public void testCheckDatabases() throws Exception { .localNodeId("_id1")) .routingTable(createIndexRoutingTable()) .build(); + // Database should be downloaded databaseRegistry.checkDatabases(state); - database = databaseRegistry.getDatabase("GeoIP2-City.mmdb", false); + database = databaseRegistry.getDatabase("GeoIP2-City.mmdb"); assertThat(database, notNullValue()); verify(client, times(10)).search(any()); + try (Stream files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) { + assertThat(files.count(), greaterThanOrEqualTo(1L)); + } + // First time GeoIP2-City.mmdb is downloaded, so a pipeline reload can happen: + verify(ingestService, times(numPipelinesToBeReloaded)).reloadPipeline(anyString()); //30 days check passed but we mocked mmdb data so parsing will fail expectThrows(InvalidDatabaseException.class, database::get); } @@ -184,7 +203,7 @@ public void testCheckDatabases_dontCheckDatabaseOnNonIngestNode() throws Excepti .build(); databaseRegistry.checkDatabases(state); - assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue()); + assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb"), nullValue()); verify(client, never()).search(any()); try (Stream files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) { assertThat(files.collect(Collectors.toList()), empty()); @@ -206,7 +225,7 @@ public void testCheckDatabases_dontCheckDatabaseWhenNoDatabasesIndex() throws Ex .build(); databaseRegistry.checkDatabases(state); - assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue()); + assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb"), nullValue()); verify(client, never()).search(any()); try (Stream files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) { assertThat(files.collect(Collectors.toList()), empty()); @@ -227,7 +246,7 @@ public void testCheckDatabases_dontCheckDatabaseWhenGeoIpDownloadTask() throws E mockSearches("GeoIP2-City.mmdb", 0, 9); databaseRegistry.checkDatabases(state); - assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue()); + assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb"), nullValue()); verify(client, never()).search(any()); try (Stream files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) { assertThat(files.collect(Collectors.toList()), empty()); @@ -273,6 +292,25 @@ public void testRetrieveDatabaseCorruption() throws Exception { verify(client, times(10)).search(any()); } + public void testUpdateDatabase() throws Exception { + int numPipelinesToBeReloaded = randomInt(4); + List pipelineIds = IntStream.range(0, numPipelinesToBeReloaded).mapToObj(String::valueOf).collect(Collectors.toList()); + when(ingestService.getPipelineWithProcessorType(any(), any())).thenReturn(pipelineIds); + + databaseRegistry.updateDatabase("_name", "_md5", geoIpTmpDir.resolve("some-file")); + + // Updating the first time may trigger a reload. + verify(ingestService, times(1)).addIngestClusterStateListener(any()); + verify(ingestService, times(1)).getPipelineWithProcessorType(any(), any()); + verify(ingestService, times(numPipelinesToBeReloaded)).reloadPipeline(anyString()); + verifyNoMoreInteractions(ingestService); + reset(ingestService); + + // Subsequent updates shouldn't trigger a reload. + databaseRegistry.updateDatabase("_name", "_md5", geoIpTmpDir.resolve("some-file")); + verifyZeroInteractions(ingestService); + } + private String mockSearches(String databaseName, int firstChunk, int lastChunk) throws IOException { String dummyContent = "test: " + databaseName; List data = gzip(databaseName, dummyContent, lastChunk - firstChunk + 1); diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java index f47de667692f4..f065cc0473864 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.test.ESTestCase; @@ -35,6 +36,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; @@ -45,7 +47,8 @@ import java.util.Set; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -53,22 +56,24 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { private Path geoipTmpDir; + private Path geoIpConfigDir; + private LocalDatabases localDatabases; private DatabaseRegistry databaseRegistry; private ClusterService clusterService; @Before public void loadDatabaseReaders() throws IOException { - final Path geoIpDir = createTempDir(); final Path configDir = createTempDir(); - final Path geoIpConfigDir = configDir.resolve("ingest-geoip"); + geoIpConfigDir = configDir.resolve("ingest-geoip"); Files.createDirectories(geoIpConfigDir); - copyDatabaseFiles(geoIpDir); Client client = mock(Client.class); GeoIpCache cache = new GeoIpCache(1000); - LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, new GeoIpCache(1000)); + localDatabases = new LocalDatabases(geoIpConfigDir, new GeoIpCache(1000)); + copyDatabaseFiles(geoIpConfigDir, localDatabases); geoipTmpDir = createTempDir(); databaseRegistry = new DatabaseRegistry(geoipTmpDir, client, cache, localDatabases, Runnable::run); + databaseRegistry.initialize("nodeId", mock(ResourceWatcherService.class), mock(IngestService.class)); clusterService = mock(ClusterService.class); when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); } @@ -86,7 +91,7 @@ public void testBuildDefaults() throws Exception { config.put("field", "_field"); String processorTag = randomAlphaOfLength(10); - GeoIpProcessor processor = factory.create(null, processorTag, null, config); + GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config); assertThat(processor.getTag(), equalTo(processorTag)); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getTargetField(), equalTo("geoip")); @@ -103,7 +108,7 @@ public void testSetIgnoreMissing() throws Exception { config.put("ignore_missing", true); String processorTag = randomAlphaOfLength(10); - GeoIpProcessor processor = factory.create(null, processorTag, null, config); + GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config); assertThat(processor.getTag(), equalTo(processorTag)); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getTargetField(), equalTo("geoip")); @@ -120,7 +125,7 @@ public void testCountryBuildDefaults() throws Exception { config.put("database_file", "GeoLite2-Country.mmdb"); String processorTag = randomAlphaOfLength(10); - GeoIpProcessor processor = factory.create(null, processorTag, null, config); + GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config); assertThat(processor.getTag(), equalTo(processorTag)); assertThat(processor.getField(), equalTo("_field")); @@ -138,7 +143,7 @@ public void testAsnBuildDefaults() throws Exception { config.put("database_file", "GeoLite2-ASN.mmdb"); String processorTag = randomAlphaOfLength(10); - GeoIpProcessor processor = factory.create(null, processorTag, null, config); + GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config); assertThat(processor.getTag(), equalTo(processorTag)); assertThat(processor.getField(), equalTo("_field")); @@ -153,7 +158,7 @@ public void testBuildTargetField() throws Exception { Map config = new HashMap<>(); config.put("field", "_field"); config.put("target_field", "_field"); - GeoIpProcessor processor = factory.create(null, null, null, config); + GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getTargetField(), equalTo("_field")); assertFalse(processor.isIgnoreMissing()); @@ -164,7 +169,7 @@ public void testBuildDbFile() throws Exception { Map config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-Country.mmdb"); - GeoIpProcessor processor = factory.create(null, null, null, config); + GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getTargetField(), equalTo("geoip")); assertThat(processor.getDatabaseType(), equalTo("GeoLite2-Country")); @@ -201,6 +206,9 @@ public void testBuildWithAsnDbAndCityFields() throws Exception { } public void testBuildNonExistingDbFile() throws Exception { + Files.copy(GeoIpProcessorFactoryTests.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), + geoipTmpDir.resolve("GeoLite2-City.mmdb")); + databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City.mmdb")); GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); Map config = new HashMap<>(); @@ -210,6 +218,16 @@ public void testBuildNonExistingDbFile() throws Exception { assertThat(e.getMessage(), equalTo("[database_file] database file [does-not-exist.mmdb] doesn't exist")); } + public void testBuildNoDatabasesDownloaded() throws Exception { + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); + + Map config = new HashMap<>(); + config.put("field", "_field"); + config.put("database_file", "does-not-exist-yet.mmdb"); + Processor processor = factory.create(null, null, null, config); + assertThat(processor, instanceOf(GeoIpProcessor.DatabaseUnavailableProcessor.class)); + } + public void testBuildFields() throws Exception { GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); @@ -228,7 +246,7 @@ public void testBuildFields() throws Exception { Map config = new HashMap<>(); config.put("field", "_field"); config.put("properties", fieldNames); - GeoIpProcessor processor = factory.create(null, null, null, config); + GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getProperties(), equalTo(properties)); assertFalse(processor.isIgnoreMissing()); @@ -252,21 +270,20 @@ public void testBuildIllegalFieldOption() throws Exception { } public void testLazyLoading() throws Exception { - final Path geoIpDir = createTempDir(); final Path configDir = createTempDir(); final Path geoIpConfigDir = configDir.resolve("ingest-geoip"); Files.createDirectories(geoIpConfigDir); - copyDatabaseFiles(geoIpDir); + GeoIpCache cache = new GeoIpCache(1000); + LocalDatabases localDatabases = new LocalDatabases(geoIpConfigDir, cache); + copyDatabaseFiles(geoIpConfigDir, localDatabases); // Loading another database reader instances, because otherwise we can't test lazy loading as the // database readers used at class level are reused between tests. (we want to keep that otherwise running this // test will take roughly 4 times more time) Client client = mock(Client.class); - GeoIpCache cache = new GeoIpCache(1000); - LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, cache); DatabaseRegistry databaseRegistry = new DatabaseRegistry(createTempDir(), client, cache, localDatabases, Runnable::run); GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); - for (DatabaseReaderLazyLoader lazyLoader : localDatabases.getAllDatabases()) { + for (DatabaseReaderLazyLoader lazyLoader : localDatabases.getConfigDatabases().values()) { assertNull(lazyLoader.databaseReader.get()); } @@ -276,43 +293,43 @@ public void testLazyLoading() throws Exception { Map config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-City.mmdb"); - final GeoIpProcessor city = factory.create(null, "_tag", null, config); + final GeoIpProcessor city = (GeoIpProcessor) factory.create(null, "_tag", null, config); // these are lazy loaded until first use so we expect null here - assertNull(databaseRegistry.getDatabase("GeoLite2-City.mmdb", true).databaseReader.get()); + assertNull(databaseRegistry.getDatabase("GeoLite2-City.mmdb").databaseReader.get()); city.execute(document); // the first ingest should trigger a database load - assertNotNull(databaseRegistry.getDatabase("GeoLite2-City.mmdb", true).databaseReader.get()); + assertNotNull(databaseRegistry.getDatabase("GeoLite2-City.mmdb").databaseReader.get()); config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-Country.mmdb"); - final GeoIpProcessor country = factory.create(null, "_tag", null, config); + final GeoIpProcessor country = (GeoIpProcessor) factory.create(null, "_tag", null, config); // these are lazy loaded until first use so we expect null here - assertNull(databaseRegistry.getDatabase("GeoLite2-Country.mmdb", true).databaseReader.get()); + assertNull(databaseRegistry.getDatabase("GeoLite2-Country.mmdb").databaseReader.get()); country.execute(document); // the first ingest should trigger a database load - assertNotNull(databaseRegistry.getDatabase("GeoLite2-Country.mmdb", true).databaseReader.get()); + assertNotNull(databaseRegistry.getDatabase("GeoLite2-Country.mmdb").databaseReader.get()); config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-ASN.mmdb"); - final GeoIpProcessor asn = factory.create(null, "_tag", null, config); + final GeoIpProcessor asn = (GeoIpProcessor) factory.create(null, "_tag", null, config); // these are lazy loaded until first use so we expect null here - assertNull(databaseRegistry.getDatabase("GeoLite2-ASN.mmdb", true).databaseReader.get()); + assertNull(databaseRegistry.getDatabase("GeoLite2-ASN.mmdb").databaseReader.get()); asn.execute(document); // the first ingest should trigger a database load - assertNotNull(databaseRegistry.getDatabase("GeoLite2-ASN.mmdb", true).databaseReader.get()); + assertNotNull(databaseRegistry.getDatabase("GeoLite2-ASN.mmdb").databaseReader.get()); } public void testLoadingCustomDatabase() throws IOException { - final Path geoIpDir = createTempDir(); final Path configDir = createTempDir(); final Path geoIpConfigDir = configDir.resolve("ingest-geoip"); Files.createDirectories(geoIpConfigDir); - copyDatabaseFiles(geoIpDir); + LocalDatabases localDatabases = new LocalDatabases(geoIpConfigDir, new GeoIpCache(1000)); + copyDatabaseFiles(geoIpConfigDir, localDatabases); // fake the GeoIP2-City database copyDatabaseFile(geoIpConfigDir, "GeoLite2-City.mmdb"); Files.move(geoIpConfigDir.resolve("GeoLite2-City.mmdb"), geoIpConfigDir.resolve("GeoIP2-City.mmdb")); @@ -323,13 +340,12 @@ public void testLoadingCustomDatabase() throws IOException { */ ThreadPool threadPool = new TestThreadPool("test"); ResourceWatcherService resourceWatcherService = new ResourceWatcherService(Settings.EMPTY, threadPool); - LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, new GeoIpCache(1000)); Client client = mock(Client.class); GeoIpCache cache = new GeoIpCache(1000); DatabaseRegistry databaseRegistry = new DatabaseRegistry(createTempDir(), client, cache, localDatabases, Runnable::run); databaseRegistry.initialize("nodeId", resourceWatcherService, mock(IngestService.class)); GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); - for (DatabaseReaderLazyLoader lazyLoader : localDatabases.getAllDatabases()) { + for (DatabaseReaderLazyLoader lazyLoader : localDatabases.getConfigDatabases().values()) { assertNull(lazyLoader.databaseReader.get()); } @@ -339,35 +355,24 @@ public void testLoadingCustomDatabase() throws IOException { Map config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoIP2-City.mmdb"); - final GeoIpProcessor city = factory.create(null, "_tag", null, config); + final GeoIpProcessor city = (GeoIpProcessor) factory.create(null, "_tag", null, config); // these are lazy loaded until first use so we expect null here - assertNull(databaseRegistry.getDatabase("GeoIP2-City.mmdb", true).databaseReader.get()); + assertNull(databaseRegistry.getDatabase("GeoIP2-City.mmdb").databaseReader.get()); city.execute(document); // the first ingest should trigger a database load - assertNotNull(databaseRegistry.getDatabase("GeoIP2-City.mmdb", true).databaseReader.get()); + assertNotNull(databaseRegistry.getDatabase("GeoIP2-City.mmdb").databaseReader.get()); resourceWatcherService.close(); threadPool.shutdown(); } public void testFallbackUsingDefaultDatabases() throws Exception { GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); - { - Map config = new HashMap<>(); - config.put("field", "source_field"); - config.put("fallback_to_default_databases", false); - Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config)); - assertThat(e.getMessage(), equalTo("[database_file] database file [GeoLite2-City.mmdb] doesn't exist")); - } - { - Map config = new HashMap<>(); - config.put("field", "source_field"); - if (randomBoolean()) { - config.put("fallback_to_default_databases", true); - } - GeoIpProcessor processor = factory.create(null, null, null, config); - assertThat(processor, notNullValue()); - } + Map config = new HashMap<>(); + config.put("field", "source_field"); + config.put("fallback_to_default_databases", randomBoolean()); + factory.create(null, null, null, config); + assertWarnings(GeoIpProcessor.DEFAULT_DATABASES_DEPRECATION_MESSAGE); } public void testDefaultDatabaseWithTaskPresent() throws Exception { @@ -385,63 +390,93 @@ public void testDefaultDatabaseWithTaskPresent() throws Exception { config.put("field", "_field"); String processorTag = randomAlphaOfLength(10); - GeoIpProcessor processor = factory.create(null, processorTag, null, config); + GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config); processor.execute(RandomDocumentPicks.randomIngestDocument(random(), Map.of("_field", "89.160.20.128"))); } - public void testFallbackUsingDefaultDatabasesWhileIngesting() throws Exception { - copyDatabaseFile(geoipTmpDir, "GeoLite2-City-Test.mmdb"); + public void testUpdateDatabaseWhileIngesting() throws Exception { GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); - // fallback_to_default_databases=true, first use default city db then a custom city db: + Map config = new HashMap<>(); + config.put("field", "source_field"); + GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config); + Map document = new HashMap<>(); + document.put("source_field", "89.160.20.128"); { - Map config = new HashMap<>(); - config.put("field", "source_field"); - if (randomBoolean()) { - config.put("fallback_to_default_databases", true); - } - GeoIpProcessor processor = factory.create(null, null, null, config); - Map document = new HashMap<>(); - document.put("source_field", "89.160.20.128"); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); processor.execute(ingestDocument); Map geoData = (Map) ingestDocument.getSourceAndMetadata().get("geoip"); assertThat(geoData.get("city_name"), equalTo("Tumba")); - + } + { + copyDatabaseFile(geoipTmpDir, "GeoLite2-City-Test.mmdb"); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb")); - ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); processor.execute(ingestDocument); - geoData = (Map) ingestDocument.getSourceAndMetadata().get("geoip"); + Map geoData = (Map) ingestDocument.getSourceAndMetadata().get("geoip"); assertThat(geoData.get("city_name"), equalTo("Linköping")); } - // fallback_to_default_databases=false, first use a custom city db then remove the custom db and expect failure: + { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + databaseRegistry.removeStaleEntries(List.of("GeoLite2-City.mmdb")); + localDatabases.updateDatabase(geoIpConfigDir.resolve("GeoLite2-City.mmdb"), false); + Exception e = expectThrows(ResourceNotFoundException.class, () -> processor.execute(ingestDocument)); + assertThat(e.getMessage(), equalTo("database file [GeoLite2-City.mmdb] doesn't exist")); + } + } + + public void testDatabaseNotReadyYet() throws Exception { + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService); + { Map config = new HashMap<>(); config.put("field", "source_field"); - config.put("fallback_to_default_databases", false); - GeoIpProcessor processor = factory.create(null, null, null, config); + config.put("database_file", "GeoLite2-City-Test.mmdb"); + Map document = new HashMap<>(); document.put("source_field", "89.160.20.128"); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + + GeoIpProcessor.DatabaseUnavailableProcessor processor = + (GeoIpProcessor.DatabaseUnavailableProcessor) factory.create(null, null, null, config); processor.execute(ingestDocument); + assertThat(ingestDocument.getSourceAndMetadata().get("geoip"), nullValue()); + assertThat(ingestDocument.getSourceAndMetadata().get("tags"), + equalTo(List.of("_geoip_database_unavailable_GeoLite2-City-Test.mmdb"))); + } + + copyDatabaseFile(geoipTmpDir, "GeoLite2-City-Test.mmdb"); + databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb")); + + { + Map config = new HashMap<>(); + config.put("field", "source_field"); + config.put("database_file", "GeoLite2-City-Test.mmdb"); + + Map document = new HashMap<>(); + document.put("source_field", "89.160.20.128"); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + + GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config); + processor.execute(ingestDocument); + assertThat(ingestDocument.getSourceAndMetadata().get("tags"), nullValue()); Map geoData = (Map) ingestDocument.getSourceAndMetadata().get("geoip"); assertThat(geoData.get("city_name"), equalTo("Linköping")); - databaseRegistry.removeStaleEntries(List.of("GeoLite2-City.mmdb")); - Exception e = expectThrows(ResourceNotFoundException.class, () -> processor.execute(ingestDocument)); - assertThat(e.getMessage(), equalTo("database file [GeoLite2-City.mmdb] doesn't exist")); } } private static void copyDatabaseFile(final Path path, final String databaseFilename) throws IOException { Files.copy( new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/" + databaseFilename)), - path.resolve(databaseFilename) + path.resolve(databaseFilename), + StandardCopyOption.REPLACE_EXISTING ); } - static void copyDatabaseFiles(final Path path) throws IOException { + static void copyDatabaseFiles(final Path path, LocalDatabases localDatabases) throws IOException { for (final String databaseFilename : IngestGeoIpPlugin.DEFAULT_DATABASE_FILENAMES) { copyDatabaseFile(path, databaseFilename); + localDatabases.updateDatabase(path.resolve(databaseFilename), true); } } diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/LocalDatabasesTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/LocalDatabasesTests.java index 2c2d4e73faeb2..4d27b22e7db19 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/LocalDatabasesTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/LocalDatabasesTests.java @@ -20,11 +20,14 @@ import org.junit.Before; import java.io.IOException; +import java.nio.file.CopyOption; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; +import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; public class LocalDatabasesTests extends ESTestCase { @@ -46,19 +49,13 @@ public void cleanup() { public void testLocalDatabasesEmptyConfig() throws Exception { Path configDir = createTempDir(); - LocalDatabases localDatabases = new LocalDatabases(prepareModuleDir(), configDir, new GeoIpCache(0)); + LocalDatabases localDatabases = new LocalDatabases(configDir, new GeoIpCache(0)); localDatabases.initialize(resourceWatcherService); - assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3)); - assertThat(localDatabases.getConfigDatabases().size(), equalTo(0)); - DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb", true); - assertThat(loader.getDatabaseType(), equalTo("GeoLite2-ASN")); - - loader = localDatabases.getDatabase("GeoLite2-City.mmdb", true); - assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City")); - - loader = localDatabases.getDatabase("GeoLite2-Country.mmdb", true); - assertThat(loader.getDatabaseType(), equalTo("GeoLite2-Country")); + assertThat(localDatabases.getConfigDatabases(), anEmptyMap()); + assertThat(localDatabases.getDatabase("GeoLite2-ASN.mmdb"), nullValue()); + assertThat(localDatabases.getDatabase("GeoLite2-City.mmdb"), nullValue()); + assertThat(localDatabases.getDatabase("GeoLite2-Country.mmdb"), nullValue()); } public void testDatabasesConfigDir() throws Exception { @@ -66,55 +63,48 @@ public void testDatabasesConfigDir() throws Exception { Files.copy(LocalDatabases.class.getResourceAsStream("/GeoIP2-City-Test.mmdb"), configDir.resolve("GeoIP2-City.mmdb")); Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), configDir.resolve("GeoLite2-City.mmdb")); - LocalDatabases localDatabases = new LocalDatabases(prepareModuleDir(), configDir, new GeoIpCache(0)); + LocalDatabases localDatabases = new LocalDatabases(configDir, new GeoIpCache(0)); localDatabases.initialize(resourceWatcherService); - assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3)); assertThat(localDatabases.getConfigDatabases().size(), equalTo(2)); - DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb", true); - assertThat(loader.getDatabaseType(), equalTo("GeoLite2-ASN")); - - loader = localDatabases.getDatabase("GeoLite2-City.mmdb", true); + DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-City.mmdb"); assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City")); - loader = localDatabases.getDatabase("GeoLite2-Country.mmdb", true); - assertThat(loader.getDatabaseType(), equalTo("GeoLite2-Country")); - - loader = localDatabases.getDatabase("GeoIP2-City.mmdb", true); + loader = localDatabases.getDatabase("GeoIP2-City.mmdb"); assertThat(loader.getDatabaseType(), equalTo("GeoIP2-City")); } public void testDatabasesDynamicUpdateConfigDir() throws Exception { - Path configDir = createTempDir(); - LocalDatabases localDatabases = new LocalDatabases(prepareModuleDir(), configDir, new GeoIpCache(0)); + Path configDir = prepareConfigDir(); + LocalDatabases localDatabases = new LocalDatabases(configDir, new GeoIpCache(0)); localDatabases.initialize(resourceWatcherService); { - assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3)); - DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb", true); + assertThat(localDatabases.getConfigDatabases().size(), equalTo(3)); + DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb"); assertThat(loader.getDatabaseType(), equalTo("GeoLite2-ASN")); - loader = localDatabases.getDatabase("GeoLite2-City.mmdb", true); + loader = localDatabases.getDatabase("GeoLite2-City.mmdb"); assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City")); - loader = localDatabases.getDatabase("GeoLite2-Country.mmdb", true); + loader = localDatabases.getDatabase("GeoLite2-Country.mmdb"); assertThat(loader.getDatabaseType(), equalTo("GeoLite2-Country")); } + CopyOption option = StandardCopyOption.REPLACE_EXISTING; Files.copy(LocalDatabases.class.getResourceAsStream("/GeoIP2-City-Test.mmdb"), configDir.resolve("GeoIP2-City.mmdb")); - Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), configDir.resolve("GeoLite2-City.mmdb")); + Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), configDir.resolve("GeoLite2-City.mmdb"), option); assertBusy(() -> { - assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3)); - assertThat(localDatabases.getConfigDatabases().size(), equalTo(2)); - DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb", true); + assertThat(localDatabases.getConfigDatabases().size(), equalTo(4)); + DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb"); assertThat(loader.getDatabaseType(), equalTo("GeoLite2-ASN")); - loader = localDatabases.getDatabase("GeoLite2-City.mmdb", true); + loader = localDatabases.getDatabase("GeoLite2-City.mmdb"); assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City")); - loader = localDatabases.getDatabase("GeoLite2-Country.mmdb", true); + loader = localDatabases.getDatabase("GeoLite2-Country.mmdb"); assertThat(loader.getDatabaseType(), equalTo("GeoLite2-Country")); - loader = localDatabases.getDatabase("GeoIP2-City.mmdb", true); + loader = localDatabases.getDatabase("GeoIP2-City.mmdb"); assertThat(loader.getDatabaseType(), equalTo("GeoIP2-City")); }); } @@ -123,14 +113,13 @@ public void testDatabasesUpdateExistingConfDatabase() throws Exception { Path configDir = createTempDir(); Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City.mmdb"), configDir.resolve("GeoLite2-City.mmdb")); GeoIpCache cache = new GeoIpCache(1000); // real cache to test purging of entries upon a reload - LocalDatabases localDatabases = new LocalDatabases(prepareModuleDir(), configDir, cache); + LocalDatabases localDatabases = new LocalDatabases(configDir, cache); localDatabases.initialize(resourceWatcherService); { assertThat(cache.count(), equalTo(0)); - assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3)); assertThat(localDatabases.getConfigDatabases().size(), equalTo(1)); - DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-City.mmdb", true); + DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-City.mmdb"); assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City")); CityResponse cityResponse = loader.getCity(InetAddresses.forString("89.160.20.128")); assertThat(cityResponse.getCity().getName(), equalTo("Tumba")); @@ -140,11 +129,10 @@ public void testDatabasesUpdateExistingConfDatabase() throws Exception { Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), configDir.resolve("GeoLite2-City.mmdb"), StandardCopyOption.REPLACE_EXISTING); assertBusy(() -> { - assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3)); assertThat(localDatabases.getConfigDatabases().size(), equalTo(1)); assertThat(cache.count(), equalTo(0)); - DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-City.mmdb", true); + DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-City.mmdb"); assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City")); CityResponse cityResponse = loader.getCity(InetAddresses.forString("89.160.20.128")); assertThat(cityResponse.getCity().getName(), equalTo("Linköping")); @@ -153,13 +141,12 @@ public void testDatabasesUpdateExistingConfDatabase() throws Exception { Files.delete(configDir.resolve("GeoLite2-City.mmdb")); assertBusy(() -> { - assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3)); assertThat(localDatabases.getConfigDatabases().size(), equalTo(0)); assertThat(cache.count(), equalTo(0)); }); } - private static Path prepareModuleDir() throws IOException { + private static Path prepareConfigDir() throws IOException { Path dir = createTempDir(); Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-ASN.mmdb"), dir.resolve("GeoLite2-ASN.mmdb")); Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City.mmdb"), dir.resolve("GeoLite2-City.mmdb")); diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsActionNodeResponseSerializingTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsActionNodeResponseSerializingTests.java index 9a5046e503d58..cb914eee8f893 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsActionNodeResponseSerializingTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsActionNodeResponseSerializingTests.java @@ -32,7 +32,8 @@ static GeoIpDownloaderStatsAction.NodeResponse createRandomInstance() { DiscoveryNode node = new DiscoveryNode("id", buildNewFakeTransportAddress(), Version.CURRENT); Set databases = Set.copyOf(randomList(10, () -> randomAlphaOfLengthBetween(5, 10))); Set files = Set.copyOf(randomList(10, () -> randomAlphaOfLengthBetween(5, 10))); + Set configDatabases = Set.copyOf(randomList(10, () -> randomAlphaOfLengthBetween(5, 10))); return new GeoIpDownloaderStatsAction.NodeResponse(node, GeoIpDownloaderStatsSerializingTests.createRandomInstance(), databases, - files); + files, configDatabases); } } diff --git a/modules/ingest-geoip/src/yamlRestTest/java/org/elasticsearch/ingest/geoip/IngestGeoIpClientYamlTestSuiteIT.java b/modules/ingest-geoip/src/yamlRestTest/java/org/elasticsearch/ingest/geoip/IngestGeoIpClientYamlTestSuiteIT.java index 4f97b67c7666c..5b40f4a6ada43 100644 --- a/modules/ingest-geoip/src/yamlRestTest/java/org/elasticsearch/ingest/geoip/IngestGeoIpClientYamlTestSuiteIT.java +++ b/modules/ingest-geoip/src/yamlRestTest/java/org/elasticsearch/ingest/geoip/IngestGeoIpClientYamlTestSuiteIT.java @@ -11,8 +11,17 @@ import com.carrotsearch.randomizedtesting.annotations.Name; import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.elasticsearch.client.Request; import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate; import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase; +import org.junit.Before; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; public class IngestGeoIpClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase { @@ -24,4 +33,24 @@ public IngestGeoIpClientYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate te public static Iterable parameters() throws Exception { return ESClientYamlSuiteTestCase.createParameters(); } + + @Before + public void waitForDatabases() throws Exception { + assertBusy(() -> { + Request request = new Request("GET", "/_ingest/geoip/stats"); + Map response = entityAsMap(client().performRequest(request)); + + Map downloadStats = (Map) response.get("stats"); + assertThat(downloadStats.get("databases_count"), equalTo(3)); + + Map nodes = (Map) response.get("nodes"); + assertThat(nodes.size(), equalTo(1)); + Map node = (Map) nodes.values().iterator().next(); + List databases = ((List) node.get("databases")).stream() + .map(o -> (String) ((Map) o).get("name")) + .collect(Collectors.toList()); + assertThat(databases, containsInAnyOrder("GeoLite2-City.mmdb", "GeoLite2-Country.mmdb", "GeoLite2-ASN.mmdb")); + }); + } + } diff --git a/modules/ingest-geoip/src/yamlRestTest/resources/rest-api-spec/test/ingest_geoip/20_geoip_processor.yml b/modules/ingest-geoip/src/yamlRestTest/resources/rest-api-spec/test/ingest_geoip/20_geoip_processor.yml index 3b5ee3bd55381..d0da405cdeea9 100644 --- a/modules/ingest-geoip/src/yamlRestTest/resources/rest-api-spec/test/ingest_geoip/20_geoip_processor.yml +++ b/modules/ingest-geoip/src/yamlRestTest/resources/rest-api-spec/test/ingest_geoip/20_geoip_processor.yml @@ -21,22 +21,22 @@ index: test id: 1 pipeline: "my_pipeline" - body: {field1: "128.101.101.101"} + body: {field1: "89.160.20.128"} - do: get: index: test id: 1 - - match: { _source.field1: "128.101.101.101" } + - match: { _source.field1: "89.160.20.128" } - length: { _source.geoip: 7 } - - match: { _source.geoip.city_name: "Minneapolis" } - - match: { _source.geoip.country_iso_code: "US" } - - match: { _source.geoip.location.lon: -93.2548 } - - match: { _source.geoip.location.lat: 44.9399 } - - match: { _source.geoip.region_iso_code: "US-MN" } - - match: { _source.geoip.country_name: "United States" } - - match: { _source.geoip.region_name: "Minnesota" } - - match: { _source.geoip.continent_name: "North America" } + - match: { _source.geoip.city_name: "Linköping" } + - match: { _source.geoip.country_iso_code: "SE" } + - match: { _source.geoip.location.lon: 15.6167 } + - match: { _source.geoip.location.lat: 58.4167 } + - match: { _source.geoip.region_iso_code: "SE-E" } + - match: { _source.geoip.country_name: "Sweden" } + - match: { _source.geoip.region_name: "Östergötland County" } + - match: { _source.geoip.continent_name: "Europe" } --- "Test geoip processor with list": @@ -62,23 +62,23 @@ index: test id: 1 pipeline: "my_pipeline" - body: {field1: ["128.101.101.101", "127.0.0.1"]} + body: {field1: ["89.160.20.128", "127.0.0.1"]} - do: get: index: test id: 1 - - match: { _source.field1: ["128.101.101.101", "127.0.0.1"] } + - match: { _source.field1: ["89.160.20.128", "127.0.0.1"] } - length: { _source.geoip: 2 } - length: { _source.geoip.0: 7 } - - match: { _source.geoip.0.city_name: "Minneapolis" } - - match: { _source.geoip.0.country_iso_code: "US" } - - match: { _source.geoip.0.location.lon: -93.2548 } - - match: { _source.geoip.0.location.lat: 44.9399 } - - match: { _source.geoip.0.region_iso_code: "US-MN" } - - match: { _source.geoip.0.country_name: "United States" } - - match: { _source.geoip.0.region_name: "Minnesota" } - - match: { _source.geoip.0.continent_name: "North America" } + - match: { _source.geoip.0.city_name: "Linköping" } + - match: { _source.geoip.0.country_iso_code: "SE" } + - match: { _source.geoip.0.location.lon: 15.6167 } + - match: { _source.geoip.0.location.lat: 58.4167 } + - match: { _source.geoip.0.region_iso_code: "SE-E" } + - match: { _source.geoip.0.country_name: "Sweden" } + - match: { _source.geoip.0.region_name: "Östergötland County" } + - match: { _source.geoip.0.continent_name: "Europe" } - match: { _source.geoip.1: null } --- @@ -104,22 +104,22 @@ index: test id: 1 pipeline: "my_pipeline" - body: {field1: ["127.0.0.1", "128.101.101.101", "128.101.101.101"]} + body: {field1: ["127.0.0.1", "89.160.20.128", "89.160.20.128"]} - do: get: index: test id: 1 - - match: { _source.field1: ["127.0.0.1", "128.101.101.101", "128.101.101.101"] } + - match: { _source.field1: ["127.0.0.1", "89.160.20.128", "89.160.20.128"] } - length: { _source.geoip: 7 } - - match: { _source.geoip.city_name: "Minneapolis" } - - match: { _source.geoip.country_iso_code: "US" } - - match: { _source.geoip.location.lon: -93.2548 } - - match: { _source.geoip.location.lat: 44.9399 } - - match: { _source.geoip.region_iso_code: "US-MN" } - - match: { _source.geoip.country_name: "United States" } - - match: { _source.geoip.region_name: "Minnesota" } - - match: { _source.geoip.continent_name: "North America" } + - match: { _source.geoip.city_name: "Linköping" } + - match: { _source.geoip.country_iso_code: "SE" } + - match: { _source.geoip.location.lon: 15.6167 } + - match: { _source.geoip.location.lat: 58.4167 } + - match: { _source.geoip.region_iso_code: "SE-E" } + - match: { _source.geoip.country_name: "Sweden" } + - match: { _source.geoip.region_name: "Östergötland County" } + - match: { _source.geoip.continent_name: "Europe" } --- "Test geoip processor with fields": @@ -149,24 +149,24 @@ index: test id: 1 pipeline: "my_pipeline" - body: {field1: "128.101.101.101"} + body: {field1: "89.160.20.128"} - do: get: index: test id: 1 - - match: { _source.field1: "128.101.101.101" } + - match: { _source.field1: "89.160.20.128" } - length: { _source.geoip: 9 } - - match: { _source.geoip.city_name: "Minneapolis" } - - match: { _source.geoip.country_iso_code: "US" } - - match: { _source.geoip.ip: "128.101.101.101" } - - match: { _source.geoip.location.lon: -93.2548 } - - match: { _source.geoip.location.lat: 44.9399 } - - match: { _source.geoip.timezone: "America/Chicago" } - - match: { _source.geoip.country_name: "United States" } - - match: { _source.geoip.region_iso_code: "US-MN" } - - match: { _source.geoip.region_name: "Minnesota" } - - match: { _source.geoip.continent_name: "North America" } + - match: { _source.geoip.city_name: "Linköping" } + - match: { _source.geoip.country_iso_code: "SE" } + - match: { _source.geoip.ip: "89.160.20.128" } + - match: { _source.geoip.location.lon: 15.6167 } + - match: { _source.geoip.location.lat: 58.4167 } + - match: { _source.geoip.timezone: "Europe/Stockholm" } + - match: { _source.geoip.country_name: "Sweden" } + - match: { _source.geoip.region_iso_code: "SE-E" } + - match: { _source.geoip.region_name: "Östergötland County" } + - match: { _source.geoip.continent_name: "Europe" } --- "Test geoip processor with different database file - GeoLite2-Country": @@ -192,17 +192,17 @@ index: test id: 1 pipeline: "my_pipeline" - body: {field1: "128.101.101.101"} + body: {field1: "89.160.20.128"} - do: get: index: test id: 1 - - match: { _source.field1: "128.101.101.101" } + - match: { _source.field1: "89.160.20.128" } - length: { _source.geoip: 3 } - - match: { _source.geoip.country_iso_code: "US" } - - match: { _source.geoip.country_name: "United States" } - - match: { _source.geoip.continent_name: "North America" } + - match: { _source.geoip.country_iso_code: "SE" } + - match: { _source.geoip.country_name: "Sweden" } + - match: { _source.geoip.continent_name: "Europe" } --- "Test geoip processor with geopoint mapping (both missing and including location)": @@ -256,22 +256,22 @@ index: test id: 2 pipeline: "my_pipeline" - body: { field1: "128.101.101.101" } + body: { field1: "89.160.20.128" } - do: get: index: test id: 2 - - match: { _source.field1: "128.101.101.101" } + - match: { _source.field1: "89.160.20.128" } - length: { _source.geoip: 7 } - - match: { _source.geoip.city_name: "Minneapolis" } - - match: { _source.geoip.country_iso_code: "US" } - - match: { _source.geoip.location.lon: -93.2548 } - - match: { _source.geoip.location.lat: 44.9399 } - - match: { _source.geoip.region_iso_code: "US-MN" } - - match: { _source.geoip.country_name: "United States" } - - match: { _source.geoip.region_name: "Minnesota" } - - match: { _source.geoip.continent_name: "North America" } + - match: { _source.geoip.city_name: "Linköping" } + - match: { _source.geoip.country_iso_code: "SE" } + - match: { _source.geoip.location.lon: 15.6167 } + - match: { _source.geoip.location.lat: 58.4167 } + - match: { _source.geoip.region_iso_code: "SE-E" } + - match: { _source.geoip.country_name: "Sweden" } + - match: { _source.geoip.region_name: "Östergötland County" } + - match: { _source.geoip.continent_name: "Europe" } --- "Test geoip processor with different database file - GeoLite2-ASN": @@ -297,15 +297,15 @@ index: test id: 1 pipeline: "my_pipeline" - body: {field1: "82.171.64.0"} + body: {field1: "89.160.20.128"} - do: get: index: test id: 1 - - match: { _source.field1: "82.171.64.0" } + - match: { _source.field1: "89.160.20.128" } - length: { _source.geoip: 4 } - - match: { _source.geoip.ip: "82.171.64.0" } - - match: { _source.geoip.asn: 1136 } - - match: { _source.geoip.organization_name: "KPN B.V." } - - match: { _source.geoip.network: "82.168.0.0/14" } + - match: { _source.geoip.ip: "89.160.20.128" } + - match: { _source.geoip.asn: 29518 } + - match: { _source.geoip.organization_name: "Bredband2 AB" } + - match: { _source.geoip.network: "89.160.0.0/17" } diff --git a/qa/smoke-test-ingest-with-all-dependencies/build.gradle b/qa/smoke-test-ingest-with-all-dependencies/build.gradle index 1654b1377866d..3df6a245ab5dc 100644 --- a/qa/smoke-test-ingest-with-all-dependencies/build.gradle +++ b/qa/smoke-test-ingest-with-all-dependencies/build.gradle @@ -21,6 +21,7 @@ dependencies { testClusters.configureEach { setting 'xpack.security.enabled', 'false' + extraConfigFile 'ingest-geoip/GeoLite2-City.mmdb', file("${project.projectDir}/src/test/resources/GeoLite2-City.mmdb") } tasks.named("testingConventions").configure { @@ -30,3 +31,7 @@ tasks.named("testingConventions").configure { } } } + +tasks.named("forbiddenPatterns").configure { + exclude '**/*.mmdb' +} diff --git a/qa/smoke-test-ingest-with-all-dependencies/src/test/resources/GeoLite2-City.mmdb b/qa/smoke-test-ingest-with-all-dependencies/src/test/resources/GeoLite2-City.mmdb new file mode 100644 index 0000000000000..0809201619b59 Binary files /dev/null and b/qa/smoke-test-ingest-with-all-dependencies/src/test/resources/GeoLite2-City.mmdb differ diff --git a/qa/smoke-test-ingest-with-all-dependencies/src/test/resources/rest-api-spec/test/ingest/20_combine_processors.yml b/qa/smoke-test-ingest-with-all-dependencies/src/test/resources/rest-api-spec/test/ingest/20_combine_processors.yml index d748a2388bdd4..14a1c71bed52d 100644 --- a/qa/smoke-test-ingest-with-all-dependencies/src/test/resources/rest-api-spec/test/ingest/20_combine_processors.yml +++ b/qa/smoke-test-ingest-with-all-dependencies/src/test/resources/rest-api-spec/test/ingest/20_combine_processors.yml @@ -46,7 +46,7 @@ id: 1 pipeline: "_id" body: { - log: "70.193.17.92 - - [08/Sep/2014:02:54:42 +0000] \"GET /presentations/logstash-scale11x/images/ahhh___rage_face_by_samusmmx-d5g5zap.png HTTP/1.1\" 200 175208 \"http://mobile.rivals.com/board_posts.asp?SID=880&mid=198829575&fid=2208&tid=198829575&Team=&TeamId=&SiteId=\" \"Mozilla/5.0 (Linux; Android 4.2.2; VS980 4G Build/JDQ39B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.135 Mobile Safari/537.36\"" + log: "89.160.20.128 - - [08/Sep/2014:02:54:42 +0000] \"GET /presentations/logstash-scale11x/images/ahhh___rage_face_by_samusmmx-d5g5zap.png HTTP/1.1\" 200 175208 \"http://mobile.rivals.com/board_posts.asp?SID=880&mid=198829575&fid=2208&tid=198829575&Team=&TeamId=&SiteId=\" \"Mozilla/5.0 (Linux; Android 4.2.2; VS980 4G Build/JDQ39B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.135 Mobile Safari/537.36\"" } - do: @@ -61,11 +61,11 @@ - match: { _source.referrer: "\"http://mobile.rivals.com/board_posts.asp?SID=880&mid=198829575&fid=2208&tid=198829575&Team=&TeamId=&SiteId=\"" } - match: { _source.response: 200 } - match: { _source.bytes: 175208 } - - match: { _source.clientip: "70.193.17.92" } + - match: { _source.clientip: "89.160.20.128" } - match: { _source.httpversion: "1.1" } - match: { _source.timestamp: "2014-09-08T02:54:42.000Z" } - - match: { _source.geoip.continent_name: "North America" } - - match: { _source.geoip.country_iso_code: "US" } + - match: { _source.geoip.continent_name: "Europe" } + - match: { _source.geoip.country_iso_code: "SE" } --- "Test with date processor and ECS-v1": @@ -104,7 +104,7 @@ id: 1 pipeline: "_id" body: { - log: "70.193.17.92 - - [08/Sep/2014:02:54:42 +0000] \"GET /presentations/logstash-scale11x/images/ahhh___rage_face_by_samusmmx-d5g5zap.png HTTP/1.1\" 200 175208 \"http://mobile.rivals.com/board_posts.asp?SID=880&mid=198829575&fid=2208&tid=198829575&Team=&TeamId=&SiteId=\" \"Mozilla/5.0 (Linux; Android 4.2.2; VS980 4G Build/JDQ39B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.135 Mobile Safari/537.36\"" + log: "89.160.20.128 - - [08/Sep/2014:02:54:42 +0000] \"GET /presentations/logstash-scale11x/images/ahhh___rage_face_by_samusmmx-d5g5zap.png HTTP/1.1\" 200 175208 \"http://mobile.rivals.com/board_posts.asp?SID=880&mid=198829575&fid=2208&tid=198829575&Team=&TeamId=&SiteId=\" \"Mozilla/5.0 (Linux; Android 4.2.2; VS980 4G Build/JDQ39B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.135 Mobile Safari/537.36\"" } - do: @@ -118,11 +118,11 @@ - match: { _source.http.request.referrer: "http://mobile.rivals.com/board_posts.asp?SID=880&mid=198829575&fid=2208&tid=198829575&Team=&TeamId=&SiteId=" } - match: { _source.http.response.status_code: 200 } - match: { _source.http.response.body.bytes: 175208 } - - match: { _source.source.address: "70.193.17.92" } + - match: { _source.source.address: "89.160.20.128" } - match: { _source.http.version: "1.1" } - match: { _source.timestamp: "2014-09-08T02:54:42.000Z" } - - match: { _source.geoip.continent_name: "North America" } - - match: { _source.geoip.country_iso_code: "US" } + - match: { _source.geoip.continent_name: "Europe" } + - match: { _source.geoip.country_iso_code: "SE" } --- "Test mutate": diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 3d97f2670233a..0396c1de3f11b 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -55,10 +55,12 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -68,6 +70,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.IntConsumer; +import java.util.function.Predicate; import java.util.stream.Collectors; /** @@ -777,7 +780,7 @@ public void applyClusterState(final ClusterChangedEvent event) { } } - void innerUpdatePipelines(IngestMetadata newIngestMetadata) { + synchronized void innerUpdatePipelines(IngestMetadata newIngestMetadata) { Map existingPipelines = this.pipelines; // Lazy initialize these variables in order to favour the most like scenario that there are no pipeline changes: @@ -876,7 +879,7 @@ void innerUpdatePipelines(IngestMetadata newIngestMetadata) { * @param clazz the Processor class to look for * @return True if the pipeline contains an instance of the Processor class passed in */ - public

List

getProcessorsInPipeline(String pipelineId, Class

clazz) { + public

List

getProcessorsInPipeline(String pipelineId, Class

clazz) { Pipeline pipeline = getPipeline(pipelineId); if (pipeline == null) { throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); @@ -905,6 +908,27 @@ public

List

getProcessorsInPipeline(String pipelineId, C return processors; } + public

Collection getPipelineWithProcessorType(Class

clazz, Predicate

predicate) { + List matchedPipelines = new LinkedList<>(); + for (PipelineHolder holder : pipelines.values()) { + String pipelineId = holder.pipeline.getId(); + List

processors = getProcessorsInPipeline(pipelineId, clazz); + if (processors.isEmpty() == false && processors.stream().anyMatch(predicate)) { + matchedPipelines.add(pipelineId); + } + } + return matchedPipelines; + } + + public synchronized void reloadPipeline(String id) throws Exception { + PipelineHolder holder = pipelines.get(id); + Pipeline updatedPipeline = + Pipeline.create(id, holder.configuration.getConfigAsMap(), processorFactories, scriptService); + Map updatedPipelines = new HashMap<>(this.pipelines); + updatedPipelines.put(id, new PipelineHolder(holder.configuration, updatedPipeline)); + this.pipelines = Map.copyOf(updatedPipelines); + } + private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) { String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null; String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown"; diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index b774393548f92..a9222c43824a9 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -43,10 +43,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.xcontent.XContentType; -import org.elasticsearch.xcontent.cbor.CborXContent; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; @@ -60,6 +57,9 @@ import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xcontent.cbor.CborXContent; import org.hamcrest.CustomTypeSafeMatcher; import org.junit.Before; import org.mockito.ArgumentMatcher; @@ -88,6 +88,9 @@ import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.containsString; import static org.elasticsearch.core.Tuple.tuple; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; @@ -376,6 +379,82 @@ public void testGetProcessorsInPipeline() throws Exception { assertThat("pipeline with id [fakeID] does not exist", equalTo(e.getMessage())); } + public void testGetPipelineWithProcessorType() throws Exception { + IngestService ingestService = createWithProcessors(); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousClusterState = clusterState; + + PutPipelineRequest putRequest1 = new PutPipelineRequest("_id1", new BytesArray( + "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\", \"tag\": \"tag1\"}}," + + "{\"remove\" : {\"field\": \"_field\", \"tag\": \"tag2\"}}]}"), + XContentType.JSON); + clusterState = IngestService.innerPut(putRequest1, clusterState); + PutPipelineRequest putRequest2 = new PutPipelineRequest("_id2", new BytesArray( + "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\", \"tag\": \"tag2\"}}]}"), + XContentType.JSON); + clusterState = IngestService.innerPut(putRequest2, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + + assertThat(ingestService.getPipelineWithProcessorType(FakeProcessor.class, processor -> true), containsInAnyOrder("_id1", "_id2")); + assertThat(ingestService.getPipelineWithProcessorType(FakeProcessor.class, processor -> false), emptyIterable()); + assertThat(ingestService.getPipelineWithProcessorType(WrappingProcessorImpl.class, processor -> true), containsInAnyOrder("_id1")); + } + + public void testReloadPipeline() throws Exception { + boolean[] externalProperty = new boolean[] {false}; + + Map processorFactories = new HashMap<>(); + processorFactories.put("set", (factories, tag, description, config) -> { + String field = (String) config.remove("field"); + String value = (String) config.remove("value"); + if (externalProperty[0]) { + return new FakeProcessor("set", tag, description, (ingestDocument) ->ingestDocument.setFieldValue(field, value)); + } else { + return new AbstractProcessor(tag, description) { + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new RuntimeException("reload me"); + } + + @Override + public String getType() { + return "set"; + } + }; + } + }); + + IngestService ingestService = createWithProcessors(processorFactories); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousClusterState = clusterState; + + PutPipelineRequest putRequest1 = new PutPipelineRequest("_id1", new BytesArray( + "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\", \"tag\": \"tag1\"}}]}"), + XContentType.JSON); + clusterState = IngestService.innerPut(putRequest1, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + + { + Exception[] exceptionHolder = new Exception[1]; + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); + ingestService.getPipeline("_id1").execute(ingestDocument, (ingestDocument1, e) -> exceptionHolder[0] = e); + assertThat(exceptionHolder[0], notNullValue()); + assertThat(exceptionHolder[0].getMessage(), containsString("reload me")); + assertThat(ingestDocument.getSourceAndMetadata().get("_field"), nullValue()); + } + + externalProperty[0] = true; + ingestService.reloadPipeline("_id1"); + + { + Exception[] holder = new Exception[1]; + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); + ingestService.getPipeline("_id1").execute(ingestDocument, (ingestDocument1, e) -> holder[0] = e); + assertThat(holder[0], nullValue()); + assertThat(ingestDocument.getSourceAndMetadata().get("_field"), equalTo("_value")); + } + } + public void testGetProcessorsInPipelineComplexConditional() throws Exception { LongSupplier relativeTimeProvider = mock(LongSupplier.class); String scriptName = "conditionalScript";