Skip to content

Commit

Permalink
Rename internal geoip components. (#79901)
Browse files Browse the repository at this point in the history
Backporting #79238 to 7.16 branch.

Rename LocalDatabases to ConfigDatabases and
DatabaseRegistry to DatabaseNodeService. I
think these names better reflect what these
components are doing.
  • Loading branch information
martijnvg committed Oct 27, 2021
1 parent cb35978 commit d259080
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,17 @@ public void test() throws Exception {
Path geoIpModulesDir = createTempDir();
Path geoIpConfigDir = createTempDir();
Path geoIpTmpDir = createTempDir();
DatabaseRegistry databaseRegistry = createRegistry(geoIpModulesDir, geoIpConfigDir, geoIpTmpDir);
DatabaseNodeService databaseNodeService = createRegistry(geoIpModulesDir, geoIpConfigDir, geoIpTmpDir);
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseNodeService, clusterService);
Files.copy(ConfigDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
Files.copy(ConfigDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
lazyLoadReaders(databaseRegistry);
databaseNodeService.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
databaseNodeService.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
lazyLoadReaders(databaseNodeService);

final GeoIpProcessor processor1 = factory.create(null, "_tag", null, new HashMap<>(Map.of("field", "_field")));
final GeoIpProcessor processor2 = factory.create(null, "_tag", null,
Expand Down Expand Up @@ -108,9 +108,9 @@ public void test() throws Exception {
Thread updateDatabaseThread = new Thread(() -> {
for (int i = 0; i < numberOfDatabaseUpdates; i++) {
try {
DatabaseReaderLazyLoader previous1 = databaseRegistry.get("GeoLite2-City.mmdb");
DatabaseReaderLazyLoader previous1 = databaseNodeService.get("GeoLite2-City.mmdb");
if (Files.exists(geoIpTmpDir.resolve("GeoLite2-City.mmdb"))) {
databaseRegistry.removeStaleEntries(List.of("GeoLite2-City.mmdb"));
databaseNodeService.removeStaleEntries(List.of("GeoLite2-City.mmdb"));
assertBusy(() -> {
// lazy loader may still be in use by an ingest thread,
// wait for any potential ingest thread to release the lazy loader (DatabaseReaderLazyLoader#postLookup(...)),
Expand All @@ -119,23 +119,23 @@ public void test() throws Exception {
assertThat(previous1.current(), equalTo(-1));
});
} else {
Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
Files.copy(ConfigDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
geoIpTmpDir.resolve("GeoLite2-City.mmdb"), StandardCopyOption.REPLACE_EXISTING);
databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
databaseNodeService.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
}
DatabaseReaderLazyLoader previous2 = databaseRegistry.get("GeoLite2-City-Test.mmdb");
InputStream source = LocalDatabases.class.getResourceAsStream(i % 2 == 0 ? "/GeoIP2-City-Test.mmdb" :
DatabaseReaderLazyLoader previous2 = databaseNodeService.get("GeoLite2-City-Test.mmdb");
InputStream source = ConfigDatabases.class.getResourceAsStream(i % 2 == 0 ? "/GeoIP2-City-Test.mmdb" :
"/GeoLite2-City-Test.mmdb");
Files.copy(source, geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), StandardCopyOption.REPLACE_EXISTING);
databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
databaseNodeService.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));

DatabaseReaderLazyLoader current1 = databaseRegistry.get("GeoLite2-City.mmdb");
DatabaseReaderLazyLoader current2 = databaseRegistry.get("GeoLite2-City-Test.mmdb");
DatabaseReaderLazyLoader current1 = databaseNodeService.get("GeoLite2-City.mmdb");
DatabaseReaderLazyLoader current2 = databaseNodeService.get("GeoLite2-City-Test.mmdb");
assertThat(current1, not(sameInstance(previous1)));
assertThat(current2, not(sameInstance(previous2)));

// lazy load type and reader:
lazyLoadReaders(databaseRegistry);
lazyLoadReaders(databaseNodeService);
} catch (Exception | AssertionError e) {
logger.error("error in update databases thread after run [" + i + "]", e);
failureHolder2.set(e);
Expand All @@ -160,30 +160,30 @@ public void test() throws Exception {
assertThat(failureHolder2.get(), nullValue());
assertThat(numberOfIngestRuns.get(), greaterThan(0));

for (DatabaseReaderLazyLoader lazyLoader : databaseRegistry.getAllDatabases()) {
for (DatabaseReaderLazyLoader lazyLoader : databaseNodeService.getAllDatabases()) {
assertThat(lazyLoader.current(), equalTo(0));
}
// Avoid accumulating many temp dirs while running with -Dtests.iters=X
IOUtils.rm(geoIpModulesDir, geoIpConfigDir, geoIpTmpDir);
}

private static DatabaseRegistry createRegistry(Path geoIpModulesDir, Path geoIpConfigDir, Path geoIpTmpDir) throws IOException {
private static DatabaseNodeService createRegistry(Path geoIpModulesDir, Path geoIpConfigDir, Path geoIpTmpDir) throws IOException {
copyDatabaseFiles(geoIpModulesDir);
GeoIpCache cache = new GeoIpCache(0);
LocalDatabases localDatabases = new LocalDatabases(geoIpModulesDir, geoIpConfigDir, cache);
DatabaseRegistry databaseRegistry =
new DatabaseRegistry(geoIpTmpDir, mock(Client.class), cache, localDatabases, Runnable::run);
databaseRegistry.initialize("nodeId", mock(ResourceWatcherService.class), mock(IngestService.class));
return databaseRegistry;
ConfigDatabases localDatabases = new ConfigDatabases(geoIpModulesDir, geoIpConfigDir, cache);
DatabaseNodeService databaseNodeService =
new DatabaseNodeService(geoIpTmpDir, mock(Client.class), cache, localDatabases, Runnable::run);
databaseNodeService.initialize("nodeId", mock(ResourceWatcherService.class), mock(IngestService.class));
return databaseNodeService;
}

private static void lazyLoadReaders(DatabaseRegistry databaseRegistry) throws IOException {
if (databaseRegistry.get("GeoLite2-City.mmdb") != null) {
databaseRegistry.get("GeoLite2-City.mmdb").getDatabaseType();
databaseRegistry.get("GeoLite2-City.mmdb").getCity(InetAddresses.forString("2.125.160.216"));
private static void lazyLoadReaders(DatabaseNodeService databaseNodeService) throws IOException {
if (databaseNodeService.get("GeoLite2-City.mmdb") != null) {
databaseNodeService.get("GeoLite2-City.mmdb").getDatabaseType();
databaseNodeService.get("GeoLite2-City.mmdb").getCity(InetAddresses.forString("2.125.160.216"));
}
databaseRegistry.get("GeoLite2-City-Test.mmdb").getDatabaseType();
databaseRegistry.get("GeoLite2-City-Test.mmdb").getCity(InetAddresses.forString("2.125.160.216"));
databaseNodeService.get("GeoLite2-City-Test.mmdb").getDatabaseType();
databaseNodeService.get("GeoLite2-City-Test.mmdb").getCity(InetAddresses.forString("2.125.160.216"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,17 @@
* 2) User provided databases from the ES_HOME/config/ingest-geoip directory. This directory is monitored
* and files updates are picked up and may cause databases being loaded or removed at runtime.
*/
final class LocalDatabases implements Closeable {
final class ConfigDatabases implements Closeable {

private static final Logger LOGGER = LogManager.getLogger(LocalDatabases.class);
private static final Logger LOGGER = LogManager.getLogger(ConfigDatabases.class);

private final GeoIpCache cache;
private final Path geoipConfigDir;

private final Map<String, DatabaseReaderLazyLoader> defaultDatabases;
private final ConcurrentMap<String, DatabaseReaderLazyLoader> configDatabases;

LocalDatabases(Environment environment, GeoIpCache cache) {
ConfigDatabases(Environment environment, GeoIpCache cache) {
this(
// In GeoIpProcessorNonIngestNodeTests, ingest-geoip is loaded on the classpath.
// This means that the plugin is never unbundled into a directory where the database files would live.
Expand All @@ -65,7 +65,7 @@ final class LocalDatabases implements Closeable {
cache);
}

LocalDatabases(Path geoipModuleDir, Path geoipConfigDir, GeoIpCache cache) {
ConfigDatabases(Path geoipModuleDir, Path geoipConfigDir, GeoIpCache cache) {
this.cache = cache;
this.geoipConfigDir = geoipConfigDir;
this.configDatabases = new ConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@

/**
* A component that is responsible for making the databases maintained by {@link GeoIpDownloader}
* available for ingest processors.
* available to ingest processors on each ingest node.
* <p>
* Also provided a lookup mechanism for geoip processors with fallback to {@link LocalDatabases}.
* Also provides a lookup mechanism for geoip processors with fallback to {@link ConfigDatabases}.
* All databases are downloaded into a geoip tmp directory, which is created at node startup.
* <p>
* The following high level steps are executed after each cluster state update:
Expand All @@ -77,43 +77,43 @@
* if there is an old instance of this database then that is closed.
* 4) Cleanup locally loaded databases that are no longer mentioned in {@link GeoIpTaskState}.
*/
public final class DatabaseRegistry implements Closeable {
public final class DatabaseNodeService implements Closeable {

private static final Logger LOGGER = LogManager.getLogger(DatabaseRegistry.class);
private static final Logger LOGGER = LogManager.getLogger(DatabaseNodeService.class);

private final Client client;
private final GeoIpCache cache;
private final Path geoipTmpBaseDirectory;
private Path geoipTmpDirectory;
private final LocalDatabases localDatabases;
private final ConfigDatabases configDatabases;
private final Consumer<Runnable> genericExecutor;

private final ConcurrentMap<String, DatabaseReaderLazyLoader> databases = new ConcurrentHashMap<>();

DatabaseRegistry(Environment environment, Client client, GeoIpCache cache, Consumer<Runnable> genericExecutor) {
DatabaseNodeService(Environment environment, Client client, GeoIpCache cache, Consumer<Runnable> genericExecutor) {
this(
environment.tmpFile(),
new OriginSettingClient(client, IngestService.INGEST_ORIGIN),
cache,
new LocalDatabases(environment, cache),
new ConfigDatabases(environment, cache),
genericExecutor
);
}

DatabaseRegistry(Path tmpDir,
Client client,
GeoIpCache cache,
LocalDatabases localDatabases,
Consumer<Runnable> genericExecutor) {
DatabaseNodeService(Path tmpDir,
Client client,
GeoIpCache cache,
ConfigDatabases configDatabases,
Consumer<Runnable> genericExecutor) {
this.client = client;
this.cache = cache;
this.geoipTmpBaseDirectory = tmpDir.resolve("geoip-databases");
this.localDatabases = localDatabases;
this.configDatabases = configDatabases;
this.genericExecutor = genericExecutor;
}

public void initialize(String nodeId, ResourceWatcherService resourceWatcher, IngestService ingestService) throws IOException {
localDatabases.initialize(resourceWatcher);
configDatabases.initialize(resourceWatcher);
geoipTmpDirectory = geoipTmpBaseDirectory.resolve(nodeId);
Files.walkFileTree(geoipTmpDirectory, new FileVisitor<Path>() {
@Override
Expand Down Expand Up @@ -157,7 +157,7 @@ public DatabaseReaderLazyLoader getDatabase(String name, boolean fallbackUsingDe
// that gets closed while using it. (this can happen during a database update)
while (true) {
DatabaseReaderLazyLoader instance =
databases.getOrDefault(name, localDatabases.getDatabase(name, fallbackUsingDefaultDatabases));
databases.getOrDefault(name, configDatabases.getDatabase(name, fallbackUsingDefaultDatabases));
if (instance == null || instance.preLookup()) {
return instance;
}
Expand All @@ -167,7 +167,7 @@ public DatabaseReaderLazyLoader getDatabase(String name, boolean fallbackUsingDe
}

List<DatabaseReaderLazyLoader> getAllDatabases() {
List<DatabaseReaderLazyLoader> all = new ArrayList<>(localDatabases.getAllDatabases());
List<DatabaseReaderLazyLoader> all = new ArrayList<>(configDatabases.getAllDatabases());
this.databases.forEach((key, value) -> all.add(value));
return all;
}
Expand Down Expand Up @@ -384,6 +384,10 @@ public Set<String> getAvailableDatabases() {
return org.elasticsearch.core.Set.copyOf(databases.keySet());
}

public Set<String> getConfigDatabases() {
return configDatabases.getConfigDatabases().keySet();
}

public Set<String> getFilesInTemp() {
try (Stream<Path> files = Files.list(geoipTmpDirectory)) {
return files.map(Path::getFileName).map(Path::toString).collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,15 +357,15 @@ public static final class Factory implements Processor.Factory {
Property.IP, Property.ASN, Property.ORGANIZATION_NAME, Property.NETWORK
));

private final DatabaseRegistry databaseRegistry;
private final DatabaseNodeService databaseNodeService;
private final ClusterService clusterService;

List<DatabaseReaderLazyLoader> getAllDatabases() {
return databaseRegistry.getAllDatabases();
return databaseNodeService.getAllDatabases();
}

public Factory(DatabaseRegistry databaseRegistry, ClusterService clusterService) {
this.databaseRegistry = databaseRegistry;
public Factory(DatabaseNodeService databaseNodeService, ClusterService clusterService) {
this.databaseNodeService = databaseNodeService;
this.clusterService = clusterService;
}

Expand All @@ -382,7 +382,7 @@ public GeoIpProcessor create(
boolean firstOnly = readBooleanProperty(TYPE, processorTag, config, "first_only", true);
boolean fallbackUsingDefaultDatabases = readBooleanProperty(TYPE, processorTag, config, "fallback_to_default_databases", true);

DatabaseReaderLazyLoader lazyLoader = databaseRegistry.getDatabase(databaseFile, fallbackUsingDefaultDatabases);
DatabaseReaderLazyLoader lazyLoader = databaseNodeService.getDatabase(databaseFile, fallbackUsingDefaultDatabases);
if (lazyLoader == null) {
throw newConfigurationException(TYPE, processorTag,
"database_file", "database file [" + databaseFile + "] doesn't exist");
Expand Down Expand Up @@ -418,7 +418,7 @@ public GeoIpProcessor create(
}
}
CheckedSupplier<DatabaseReaderLazyLoader, IOException> supplier = () -> {
DatabaseReaderLazyLoader loader = databaseRegistry.getDatabase(databaseFile, fallbackUsingDefaultDatabases);
DatabaseReaderLazyLoader loader = databaseNodeService.getDatabase(databaseFile, fallbackUsingDefaultDatabases);
if (loader == null) {
throw new ResourceNotFoundException("database file [" + databaseFile + "] doesn't exist");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, SystemInd
static String[] DEFAULT_DATABASE_FILENAMES = new String[]{"GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb"};

private final SetOnce<IngestService> ingestService = new SetOnce<>();
private final SetOnce<DatabaseRegistry> databaseRegistry = new SetOnce<>();
private final SetOnce<DatabaseNodeService> databaseRegistry = new SetOnce<>();
private GeoIpDownloaderTaskExecutor geoIpDownloaderTaskExecutor;

@Override
Expand All @@ -91,7 +91,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet

long cacheSize = CACHE_SIZE.get(parameters.env.settings());
GeoIpCache geoIpCache = new GeoIpCache(cacheSize);
DatabaseRegistry registry = new DatabaseRegistry(parameters.env, parameters.client, geoIpCache, parameters.genericExecutor);
DatabaseNodeService registry = new DatabaseNodeService(parameters.env, parameters.client, geoIpCache, parameters.genericExecutor);
databaseRegistry.set(registry);
return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(registry,
parameters.ingestService.getClusterService()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.geoip.DatabaseRegistry;
import org.elasticsearch.ingest.geoip.DatabaseNodeService;
import org.elasticsearch.ingest.geoip.GeoIpDownloader;
import org.elasticsearch.ingest.geoip.GeoIpDownloaderTaskExecutor;
import org.elasticsearch.ingest.geoip.stats.GeoIpDownloaderStatsAction.NodeRequest;
Expand All @@ -31,12 +31,12 @@
public class GeoIpDownloaderStatsTransportAction extends TransportNodesAction<Request, Response, NodeRequest, NodeResponse> {

private final TransportService transportService;
private final DatabaseRegistry registry;
private final DatabaseNodeService registry;
private final GeoIpDownloaderTaskExecutor geoIpDownloaderTaskExecutor;

@Inject
public GeoIpDownloaderStatsTransportAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, DatabaseRegistry registry,
ThreadPool threadPool, ActionFilters actionFilters, DatabaseNodeService registry,
GeoIpDownloaderTaskExecutor geoIpDownloaderTaskExecutor) {
super(GeoIpDownloaderStatsAction.NAME, threadPool, clusterService, transportService, actionFilters, Request::new,
NodeRequest::new, ThreadPool.Names.MANAGEMENT, NodeResponse.class);
Expand Down

0 comments on commit d259080

Please sign in to comment.