Skip to content

Commit

Permalink
Return DatabaseUnavailableProcessor if builtin database is missing. (#…
Browse files Browse the repository at this point in the history
…79240)

If builtin database can't be loaded then assume it will be available soon
via database geoip downloading mechanism. So instead of returning a
config error, returns a geoip processor impl that tags documents with
the fact that a builtin db isn't yet avaialble.

Relates to #79074
  • Loading branch information
martijnvg committed Oct 27, 2021
1 parent 51e65b4 commit 4fdbadb
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ public Processor create(
}

DatabaseReaderLazyLoader lazyLoader = databaseNodeService.getDatabase(databaseFile);
if (useDatabaseUnavailableProcessor(lazyLoader, databaseNodeService.getAvailableDatabases())) {
if (useDatabaseUnavailableProcessor(lazyLoader, databaseFile)) {
return new DatabaseUnavailableProcessor(processorTag, description, databaseFile);
} else if (lazyLoader == null) {
throw newConfigurationException(TYPE, processorTag,
Expand Down Expand Up @@ -439,7 +439,7 @@ public Processor create(
}
CheckedSupplier<DatabaseReaderLazyLoader, IOException> supplier = () -> {
DatabaseReaderLazyLoader loader = databaseNodeService.getDatabase(databaseFile);
if (useDatabaseUnavailableProcessor(loader, databaseNodeService.getAvailableDatabases())) {
if (useDatabaseUnavailableProcessor(loader, databaseFile)) {
return null;
} else if (loader == null) {
throw new ResourceNotFoundException("database file [" + databaseFile + "] doesn't exist");
Expand Down Expand Up @@ -480,8 +480,12 @@ public Processor create(
firstOnly, databaseFile);
}

private static boolean useDatabaseUnavailableProcessor(DatabaseReaderLazyLoader loader, Set<String> availableDatabases) {
return loader == null && availableDatabases.isEmpty();
private static boolean useDatabaseUnavailableProcessor(DatabaseReaderLazyLoader loader, String databaseName) {
// If there is no loader for a database we should fail with a config error, but
// if there is no loader for a builtin database that we manage via GeoipDownloader then don't fail.
// In the latter case the database should become available at a later moment, so a processor impl
// is returned that tags documents instead.
return loader == null && IngestGeoIpPlugin.DEFAULT_DATABASE_FILENAMES.contains(databaseName);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
Expand All @@ -73,7 +74,7 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, SystemInd
public static final Setting<Long> CACHE_SIZE =
Setting.longSetting("ingest.geoip.cache_size", 1000, 0, Setting.Property.NodeScope);

static String[] DEFAULT_DATABASE_FILENAMES = new String[]{"GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb"};
static Set<String> DEFAULT_DATABASE_FILENAMES = Set.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb");

private final SetOnce<IngestService> ingestService = new SetOnce<>();
private final SetOnce<DatabaseNodeService> databaseRegistry = new SetOnce<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.carrotsearch.randomizedtesting.generators.RandomPicks;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
Expand Down Expand Up @@ -47,6 +46,7 @@
import java.util.Set;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
Expand Down Expand Up @@ -218,12 +218,13 @@ public void testBuildNonExistingDbFile() throws Exception {
assertThat(e.getMessage(), equalTo("[database_file] database file [does-not-exist.mmdb] doesn't exist"));
}

public void testBuildNoDatabasesDownloaded() throws Exception {
public void testBuildBuiltinDatabaseMissing() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseNodeService, clusterService);
cleanDatabaseFiles(geoIpConfigDir, configDatabases);

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "does-not-exist-yet.mmdb");
config.put("database_file", randomFrom(IngestGeoIpPlugin.DEFAULT_DATABASE_FILENAMES));
Processor processor = factory.create(null, null, null, config);
assertThat(processor, instanceOf(GeoIpProcessor.DatabaseUnavailableProcessor.class));
}
Expand Down Expand Up @@ -426,21 +427,22 @@ public void testUpdateDatabaseWhileIngesting() throws Exception {
assertThat(geoData, nullValue());
}
{
// There are database available, but not the right one, so fail:
// There are database available, but not the right one, so tag:
databaseNodeService.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb"));
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
Exception e = expectThrows(ResourceNotFoundException.class, () -> processor.execute(ingestDocument));
assertThat(e.getMessage(), equalTo("database file [GeoLite2-City.mmdb] doesn't exist"));
processor.execute(ingestDocument);
assertThat(ingestDocument.getSourceAndMetadata(), hasEntry("tags", List.of("_geoip_database_unavailable_GeoLite2-City.mmdb")));
}
}

public void testDatabaseNotReadyYet() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseNodeService, clusterService);
cleanDatabaseFiles(geoIpConfigDir, configDatabases);

{
Map<String, Object> config = new HashMap<>();
config.put("field", "source_field");
config.put("database_file", "GeoLite2-City-Test.mmdb");
config.put("database_file", "GeoLite2-City.mmdb");

Map<String, Object> document = new HashMap<>();
document.put("source_field", "89.160.20.128");
Expand All @@ -451,16 +453,16 @@ public void testDatabaseNotReadyYet() throws Exception {
processor.execute(ingestDocument);
assertThat(ingestDocument.getSourceAndMetadata().get("geoip"), nullValue());
assertThat(ingestDocument.getSourceAndMetadata().get("tags"),
equalTo(List.of("_geoip_database_unavailable_GeoLite2-City-Test.mmdb")));
equalTo(List.of("_geoip_database_unavailable_GeoLite2-City.mmdb")));
}

copyDatabaseFile(geoipTmpDir, "GeoLite2-City-Test.mmdb");
databaseNodeService.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb"));
databaseNodeService.updateDatabase("GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb"));

{
Map<String, Object> config = new HashMap<>();
config.put("field", "source_field");
config.put("database_file", "GeoLite2-City-Test.mmdb");
config.put("database_file", "GeoLite2-City.mmdb");

Map<String, Object> document = new HashMap<>();
document.put("source_field", "89.160.20.128");
Expand Down Expand Up @@ -489,4 +491,10 @@ static void copyDatabaseFiles(final Path path, ConfigDatabases configDatabases)
}
}

static void cleanDatabaseFiles(final Path path, ConfigDatabases configDatabases) throws IOException {
for (final String databaseFilename : IngestGeoIpPlugin.DEFAULT_DATABASE_FILENAMES) {
configDatabases.updateDatabase(path.resolve(databaseFilename), false);
}
}

}

0 comments on commit 4fdbadb

Please sign in to comment.