Skip to content

Commit

Permalink
Make GeoIpProcessor backing database instance pluggable. (#93285)
Browse files Browse the repository at this point in the history
Introduces two new interfaces: GeoIpDatabase and GeoIpDatabaseProvider. GeoIpDatabaseProvider
acts as a generic factory interface for GeoIpDatabase instances, which makes it possible to specify
how the processor obtains its database instances. GeoIpDatabase encompasses the API footprint for
performing GeoIP lookups against a MaxMind database.

---------

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
  • Loading branch information
jbaiera and elasticmachine committed Feb 2, 2023
1 parent 7f7cf30 commit c143caf
Show file tree
Hide file tree
Showing 11 changed files with 306 additions and 147 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/93285.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 93285
summary: Make `GeoIpProcessor` backing database instance pluggable
area: Ingest Node
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase {
public void test() throws Exception {
Path geoIpConfigDir = createTempDir();
Path geoIpTmpDir = createTempDir();
DatabaseNodeService databaseNodeService = createRegistry(geoIpConfigDir, geoIpTmpDir);
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseNodeService, clusterService);
DatabaseNodeService databaseNodeService = createRegistry(geoIpConfigDir, geoIpTmpDir, clusterService);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseNodeService);
Files.copy(ConfigDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
Files.copy(ConfigDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
databaseNodeService.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
Expand Down Expand Up @@ -190,7 +190,8 @@ public void test() throws Exception {
IOUtils.rm(geoIpConfigDir, geoIpTmpDir);
}

private static DatabaseNodeService createRegistry(Path geoIpConfigDir, Path geoIpTmpDir) throws IOException {
private static DatabaseNodeService createRegistry(Path geoIpConfigDir, Path geoIpTmpDir, ClusterService clusterService)
throws IOException {
GeoIpCache cache = new GeoIpCache(0);
ConfigDatabases configDatabases = new ConfigDatabases(geoIpConfigDir, cache);
copyDatabaseFiles(geoIpConfigDir, configDatabases);
Expand All @@ -199,9 +200,10 @@ private static DatabaseNodeService createRegistry(Path geoIpConfigDir, Path geoI
mock(Client.class),
cache,
configDatabases,
Runnable::run
Runnable::run,
clusterService
);
databaseNodeService.initialize("nodeId", mock(ResourceWatcherService.class), mock(IngestService.class), mock(ClusterService.class));
databaseNodeService.initialize("nodeId", mock(ResourceWatcherService.class), mock(IngestService.class));
return databaseNodeService;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.common.logging.HeaderWarning;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.core.IOUtils;
Expand Down Expand Up @@ -63,6 +64,7 @@
import java.util.zip.GZIPInputStream;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.persistent.PersistentTasksCustomMetadata.getTaskWithId;

/**
* A component that is responsible for making the databases maintained by {@link GeoIpDownloader}
Expand All @@ -77,13 +79,13 @@
* 2) For each database check whether the databases have changed
* by comparing the local and remote md5 hash or are locally missing.
* 3) For each database identified in step 2 start downloading the database
* chunks. Each chunks is appended to a tmp file (inside geoip tmp dir) and
* chunks. Each chunk is appended to a tmp file (inside geoip tmp dir) and
* after all chunks have been downloaded, the database is uncompressed and
* renamed to the final filename. After this the database is loaded and
* if there is an old instance of this database then that is closed.
* 4) Cleanup locally loaded databases that are no longer mentioned in {@link GeoIpTaskState}.
*/
public final class DatabaseNodeService implements Closeable {
public final class DatabaseNodeService implements GeoIpDatabaseProvider, Closeable {

private static final Logger LOGGER = LogManager.getLogger(DatabaseNodeService.class);

Expand All @@ -93,34 +95,45 @@ public final class DatabaseNodeService implements Closeable {
private Path geoipTmpDirectory;
private final ConfigDatabases configDatabases;
private final Consumer<Runnable> genericExecutor;
private final ClusterService clusterService;
private IngestService ingestService;

private final ConcurrentMap<String, DatabaseReaderLazyLoader> databases = new ConcurrentHashMap<>();

DatabaseNodeService(Environment environment, Client client, GeoIpCache cache, Consumer<Runnable> genericExecutor) {
DatabaseNodeService(
Environment environment,
Client client,
GeoIpCache cache,
Consumer<Runnable> genericExecutor,
ClusterService clusterService
) {
this(
environment.tmpFile(),
new OriginSettingClient(client, IngestService.INGEST_ORIGIN),
cache,
new ConfigDatabases(environment, cache),
genericExecutor
genericExecutor,
clusterService
);
}

DatabaseNodeService(Path tmpDir, Client client, GeoIpCache cache, ConfigDatabases configDatabases, Consumer<Runnable> genericExecutor) {
DatabaseNodeService(
Path tmpDir,
Client client,
GeoIpCache cache,
ConfigDatabases configDatabases,
Consumer<Runnable> genericExecutor,
ClusterService clusterService
) {
this.client = client;
this.cache = cache;
this.geoipTmpBaseDirectory = tmpDir.resolve("geoip-databases");
this.configDatabases = configDatabases;
this.genericExecutor = genericExecutor;
this.clusterService = clusterService;
}

public void initialize(
String nodeId,
ResourceWatcherService resourceWatcher,
IngestService ingestServiceArg,
ClusterService clusterService
) throws IOException {
public void initialize(String nodeId, ResourceWatcherService resourceWatcher, IngestService ingestServiceArg) throws IOException {
configDatabases.initialize(resourceWatcher);
geoipTmpDirectory = geoipTmpBaseDirectory.resolve(nodeId);
Files.walkFileTree(geoipTmpDirectory, new FileVisitor<>() {
Expand Down Expand Up @@ -161,7 +174,35 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
clusterService.addListener(event -> checkDatabases(event.state()));
}

public DatabaseReaderLazyLoader getDatabase(String name) {
/**
 * Reports whether the named database may still be used, based on the geoip downloader task state
 * found in the current cluster state.
 *
 * @param databaseFile the file name of the database to check
 * @return {@code true} when no downloader task (or task state) exists, when the task state holds no
 *         metadata for {@code databaseFile}, or when that metadata reports itself valid for the
 *         current cluster settings; {@code false} otherwise
 */
@Override
public Boolean isValid(String databaseFile) {
    final ClusterState clusterState = clusterService.state();
    assert clusterState != null;

    final PersistentTasksCustomMetadata.PersistentTask<?> downloaderTask = getTaskWithId(clusterState, GeoIpDownloader.GEOIP_DOWNLOADER);
    // without a downloader task, or without any state on it, there is nothing to validate against
    if (downloaderTask == null) {
        return true;
    }
    if (downloaderTask.getState() == null) {
        return true;
    }

    final GeoIpTaskState taskState = (GeoIpTaskState) downloaderTask.getState();
    final GeoIpTaskState.Metadata databaseMetadata = taskState.getDatabases().get(databaseFile);
    // we never remove metadata from cluster state, if metadata is null we deal with built-in database, which is always valid
    if (databaseMetadata == null) {
        return true;
    }

    if (databaseMetadata.isValid(clusterState.metadata().settings()) == false) {
        return false;
    }

    // still valid, but let the caller know when the database is getting close to its expiration
    if (databaseMetadata.isCloseToExpiration()) {
        HeaderWarning.addWarning(
            "database [{}] was not updated for over 25 days, geoip processor" + " will stop working if there is no update for 30 days",
            databaseFile
        );
    }
    return true;
}

// for testing only:
DatabaseReaderLazyLoader getDatabaseReaderLazyLoader(String name) {
// There is a need for reference counting in order to avoid using an instance
// that gets closed while using it. (this can happen during a database update)
while (true) {
Expand All @@ -174,6 +215,11 @@ public DatabaseReaderLazyLoader getDatabase(String name) {
}
}

/**
 * Provides the database for the given name by delegating to
 * {@link #getDatabaseReaderLazyLoader(String)} and exposing the result through the
 * {@link GeoIpDatabase} interface.
 *
 * @param name the name of the database to provide
 * @return whatever {@link #getDatabaseReaderLazyLoader(String)} returns for {@code name}
 */
@Override
public GeoIpDatabase getDatabase(String name) {
    final DatabaseReaderLazyLoader loader = getDatabaseReaderLazyLoader(name);
    return loader;
}

List<DatabaseReaderLazyLoader> getAllDatabases() {
List<DatabaseReaderLazyLoader> all = new ArrayList<>(configDatabases.getConfigDatabases().values());
this.databases.forEach((key, value) -> all.add(value));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
* Facilitates lazy loading of the database reader, so that when the geoip plugin is installed, but not used,
* no memory is being wasted on the database reader.
*/
class DatabaseReaderLazyLoader implements Closeable {
class DatabaseReaderLazyLoader implements GeoIpDatabase, Closeable {

private static final boolean LOAD_DATABASE_ON_HEAP = Booleans.parseBoolean(System.getProperty("es.geoip.load_db_on_heap", "false"));

Expand Down Expand Up @@ -81,7 +81,8 @@ class DatabaseReaderLazyLoader implements Closeable {
* @return the database type
* @throws IOException if an I/O exception occurs reading the database type
*/
final String getDatabaseType() throws IOException {
@Override
public final String getDatabaseType() throws IOException {
if (databaseType.get() == null) {
synchronized (databaseType) {
if (databaseType.get() == null) {
Expand Down Expand Up @@ -151,25 +152,29 @@ InputStream databaseInputStream() throws IOException {
}

@Nullable
CityResponse getCity(InetAddress ipAddress) {
@Override
public CityResponse getCity(InetAddress ipAddress) {
return getResponse(ipAddress, DatabaseReader::tryCity);
}

@Nullable
CountryResponse getCountry(InetAddress ipAddress) {
@Override
public CountryResponse getCountry(InetAddress ipAddress) {
return getResponse(ipAddress, DatabaseReader::tryCountry);
}

@Nullable
AsnResponse getAsn(InetAddress ipAddress) {
@Override
public AsnResponse getAsn(InetAddress ipAddress) {
return getResponse(ipAddress, DatabaseReader::tryAsn);
}

/**
 * Bumps the usage counter by one unless it is currently negative, in which case it is left untouched.
 * NOTE(review): a negative counter presumably marks a loader that is being closed - confirm against close().
 *
 * @return {@code true} if the counter is greater than zero after the update, {@code false} otherwise
 */
boolean preLookup() {
    return currentUsages.updateAndGet(usages -> {
        if (usages < 0) {
            return usages;
        }
        return usages + 1;
    }) > 0;
}

void postLookup() throws IOException {
@Override
public void release() throws IOException {
if (currentUsages.updateAndGet(current -> current > 0 ? current - 1 : current + 1) == -1) {
doClose();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest.geoip;

import com.maxmind.geoip2.model.AsnResponse;
import com.maxmind.geoip2.model.CityResponse;
import com.maxmind.geoip2.model.CountryResponse;

import org.elasticsearch.core.Nullable;

import java.io.IOException;
import java.net.InetAddress;

/**
* Provides a uniform interface for interacting with various GeoIP databases: reading the database type, looking up
* city, country and ASN data for an IP address, and releasing the database once it is no longer needed.
*/
public interface GeoIpDatabase {

/**
* Returns the type of this database.
*
* @return the database type as it is detailed in the database file metadata
* @throws IOException if the database file could not be read or the data could not be accessed
*/
String getDatabaseType() throws IOException;

/**
* Looks up city data for the given IP address.
*
* @param ipAddress the IP address to look up
* @return a response containing the city data for the given address if it exists, or {@code null} if it could not be found
* @throws UnsupportedOperationException may be thrown if the implementation does not support retrieving city data
*/
@Nullable
CityResponse getCity(InetAddress ipAddress);

/**
* Looks up country data for the given IP address.
*
* @param ipAddress the IP address to look up
* @return a response containing the country data for the given address if it exists, or {@code null} if it could not be found
* @throws UnsupportedOperationException may be thrown if the implementation does not support retrieving country data
*/
@Nullable
CountryResponse getCountry(InetAddress ipAddress);

/**
* Looks up Autonomous System Number (ASN) data for the given IP address.
*
* @param ipAddress the IP address to look up
* @return a response containing the Autonomous System Number for the given address if it exists, or {@code null} if it could
* not be found
* @throws UnsupportedOperationException may be thrown if the implementation does not support retrieving ASN data
*/
@Nullable
AsnResponse getAsn(InetAddress ipAddress);

/**
* Releases the current database object. Called after processing a single document. Implementations should close the
* database or return it to a resource pool; callers must not interact with this instance after releasing it.
*
* @throws IOException if the implementation encounters any problem while cleaning up
*/
void release() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest.geoip;

/**
* Provides construction and initialization logic for {@link GeoIpDatabase} instances.
*/
public interface GeoIpDatabaseProvider {

/**
* Determines whether the database with the given name is still valid, that is, whether it has not expired.
* Expired databases will not be loaded.
* <p>
* Verifying database expiration is left to each provider implementation to determine. A return value of {@code false} does not
* preclude the possibility of a provider returning {@code true} in the future.
*
* @param name the name of the database to check.
* @return {@code false} IFF the requested database file is expired,
* {@code true} for all other cases (including unknown file name, file missing, wrong database type, etc).
*/
Boolean isValid(String name);

/**
* Provides the database instance for the given name.
*
* @param name the name of the database to provide. Default database names that should always be supported are listed in
* {@link IngestGeoIpPlugin#DEFAULT_DATABASE_FILENAMES}.
* @return a ready-to-use database instance, or {@code null} if no database could be loaded.
*/
GeoIpDatabase getDatabase(String name);
}

0 comments on commit c143caf

Please sign in to comment.