Skip to content

Commit

Permalink
Rename internal geoip components. (#79238)
Browse files Browse the repository at this point in the history
Rename LocalDatabases to ConfigDatabases and
DatabaseRegistry to DatabaseNodeService. I
think these names better reflect what these
components are doing.
  • Loading branch information
martijnvg committed Oct 27, 2021
1 parent 036f881 commit 0e404aa
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,17 @@ public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase {
public void test() throws Exception {
Path geoIpConfigDir = createTempDir();
Path geoIpTmpDir = createTempDir();
DatabaseRegistry databaseRegistry = createRegistry(geoIpConfigDir, geoIpTmpDir);
DatabaseNodeService databaseNodeService = createRegistry(geoIpConfigDir, geoIpTmpDir);
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseNodeService, clusterService);
Files.copy(ConfigDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
Files.copy(ConfigDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
lazyLoadReaders(databaseRegistry);
databaseNodeService.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
databaseNodeService.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
lazyLoadReaders(databaseNodeService);

final GeoIpProcessor processor1 = (GeoIpProcessor) factory.create(null, "_tag", null, new HashMap<>(Map.of("field", "_field")));
final GeoIpProcessor processor2 = (GeoIpProcessor) factory.create(null, "_tag", null,
Expand Down Expand Up @@ -107,9 +107,9 @@ public void test() throws Exception {
Thread updateDatabaseThread = new Thread(() -> {
for (int i = 0; i < numberOfDatabaseUpdates; i++) {
try {
DatabaseReaderLazyLoader previous1 = databaseRegistry.get("GeoLite2-City.mmdb");
DatabaseReaderLazyLoader previous1 = databaseNodeService.get("GeoLite2-City.mmdb");
if (Files.exists(geoIpTmpDir.resolve("GeoLite2-City.mmdb"))) {
databaseRegistry.removeStaleEntries(List.of("GeoLite2-City.mmdb"));
databaseNodeService.removeStaleEntries(List.of("GeoLite2-City.mmdb"));
assertBusy(() -> {
// lazy loader may still be in use by an ingest thread,
// wait for any potential ingest thread to release the lazy loader (DatabaseReaderLazyLoader#postLookup(...)),
Expand All @@ -118,23 +118,23 @@ public void test() throws Exception {
assertThat(previous1.current(), equalTo(-1));
});
} else {
Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
Files.copy(ConfigDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
geoIpTmpDir.resolve("GeoLite2-City.mmdb"), StandardCopyOption.REPLACE_EXISTING);
databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
databaseNodeService.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
}
DatabaseReaderLazyLoader previous2 = databaseRegistry.get("GeoLite2-City-Test.mmdb");
InputStream source = LocalDatabases.class.getResourceAsStream(i % 2 == 0 ? "/GeoIP2-City-Test.mmdb" :
DatabaseReaderLazyLoader previous2 = databaseNodeService.get("GeoLite2-City-Test.mmdb");
InputStream source = ConfigDatabases.class.getResourceAsStream(i % 2 == 0 ? "/GeoIP2-City-Test.mmdb" :
"/GeoLite2-City-Test.mmdb");
Files.copy(source, geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), StandardCopyOption.REPLACE_EXISTING);
databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
databaseNodeService.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));

DatabaseReaderLazyLoader current1 = databaseRegistry.get("GeoLite2-City.mmdb");
DatabaseReaderLazyLoader current2 = databaseRegistry.get("GeoLite2-City-Test.mmdb");
DatabaseReaderLazyLoader current1 = databaseNodeService.get("GeoLite2-City.mmdb");
DatabaseReaderLazyLoader current2 = databaseNodeService.get("GeoLite2-City-Test.mmdb");
assertThat(current1, not(sameInstance(previous1)));
assertThat(current2, not(sameInstance(previous2)));

// lazy load type and reader:
lazyLoadReaders(databaseRegistry);
lazyLoadReaders(databaseNodeService);
} catch (Exception | AssertionError e) {
logger.error("error in update databases thread after run [" + i + "]", e);
failureHolder2.set(e);
Expand All @@ -159,30 +159,30 @@ public void test() throws Exception {
assertThat(failureHolder2.get(), nullValue());
assertThat(numberOfIngestRuns.get(), greaterThan(0));

for (DatabaseReaderLazyLoader lazyLoader : databaseRegistry.getAllDatabases()) {
for (DatabaseReaderLazyLoader lazyLoader : databaseNodeService.getAllDatabases()) {
assertThat(lazyLoader.current(), equalTo(0));
}
// Avoid accumulating many temp dirs while running with -Dtests.iters=X
IOUtils.rm(geoIpConfigDir, geoIpTmpDir);
}

private static DatabaseRegistry createRegistry(Path geoIpConfigDir, Path geoIpTmpDir) throws IOException {
private static DatabaseNodeService createRegistry(Path geoIpConfigDir, Path geoIpTmpDir) throws IOException {
GeoIpCache cache = new GeoIpCache(0);
LocalDatabases localDatabases = new LocalDatabases(geoIpConfigDir, cache);
copyDatabaseFiles(geoIpConfigDir, localDatabases);
DatabaseRegistry databaseRegistry =
new DatabaseRegistry(geoIpTmpDir, mock(Client.class), cache, localDatabases, Runnable::run);
databaseRegistry.initialize("nodeId", mock(ResourceWatcherService.class), mock(IngestService.class));
return databaseRegistry;
ConfigDatabases configDatabases = new ConfigDatabases(geoIpConfigDir, cache);
copyDatabaseFiles(geoIpConfigDir, configDatabases);
DatabaseNodeService databaseNodeService =
new DatabaseNodeService(geoIpTmpDir, mock(Client.class), cache, configDatabases, Runnable::run);
databaseNodeService.initialize("nodeId", mock(ResourceWatcherService.class), mock(IngestService.class));
return databaseNodeService;
}

private static void lazyLoadReaders(DatabaseRegistry databaseRegistry) throws IOException {
if (databaseRegistry.get("GeoLite2-City.mmdb") != null) {
databaseRegistry.get("GeoLite2-City.mmdb").getDatabaseType();
databaseRegistry.get("GeoLite2-City.mmdb").getCity(InetAddresses.forString("2.125.160.216"));
private static void lazyLoadReaders(DatabaseNodeService databaseNodeService) throws IOException {
if (databaseNodeService.get("GeoLite2-City.mmdb") != null) {
databaseNodeService.get("GeoLite2-City.mmdb").getDatabaseType();
databaseNodeService.get("GeoLite2-City.mmdb").getCity(InetAddresses.forString("2.125.160.216"));
}
databaseRegistry.get("GeoLite2-City-Test.mmdb").getDatabaseType();
databaseRegistry.get("GeoLite2-City-Test.mmdb").getCity(InetAddresses.forString("2.125.160.216"));
databaseNodeService.get("GeoLite2-City-Test.mmdb").getDatabaseType();
databaseNodeService.get("GeoLite2-City-Test.mmdb").getCity(InetAddresses.forString("2.125.160.216"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,20 @@
* Keeps track of user provided databases in the ES_HOME/config/ingest-geoip directory.
* This directory is monitored and files updates are picked up and may cause databases being loaded or removed at runtime.
*/
final class LocalDatabases implements Closeable {
final class ConfigDatabases implements Closeable {

private static final Logger LOGGER = LogManager.getLogger(LocalDatabases.class);
private static final Logger LOGGER = LogManager.getLogger(ConfigDatabases.class);

private final GeoIpCache cache;
private final Path geoipConfigDir;

private final ConcurrentMap<String, DatabaseReaderLazyLoader> configDatabases;

LocalDatabases(Environment environment, GeoIpCache cache) {
ConfigDatabases(Environment environment, GeoIpCache cache) {
this(environment.configFile().resolve("ingest-geoip"), cache);
}

LocalDatabases(Path geoipConfigDir, GeoIpCache cache) {
ConfigDatabases(Path geoipConfigDir, GeoIpCache cache) {
this.cache = cache;
this.geoipConfigDir = geoipConfigDir;
this.configDatabases = new ConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@

/**
* A component that is responsible for making the databases maintained by {@link GeoIpDownloader}
* available for ingest processors.
* available to ingest processors on each ingest node.
* <p>
* Also provided a lookup mechanism for geoip processors with fallback to {@link LocalDatabases}.
* Also provides a lookup mechanism for geoip processors with fallback to {@link ConfigDatabases}.
* All databases are downloaded into a geoip tmp directory, which is created at node startup.
* <p>
* The following high level steps are executed after each cluster state update:
Expand All @@ -78,44 +78,44 @@
* if there is an old instance of this database then that is closed.
* 4) Cleanup locally loaded databases that are no longer mentioned in {@link GeoIpTaskState}.
*/
public final class DatabaseRegistry implements Closeable {
public final class DatabaseNodeService implements Closeable {

private static final Logger LOGGER = LogManager.getLogger(DatabaseRegistry.class);
private static final Logger LOGGER = LogManager.getLogger(DatabaseNodeService.class);

private final Client client;
private final GeoIpCache cache;
private final Path geoipTmpBaseDirectory;
private Path geoipTmpDirectory;
private final LocalDatabases localDatabases;
private final ConfigDatabases configDatabases;
private final Consumer<Runnable> genericExecutor;
private IngestService ingestService;

private final ConcurrentMap<String, DatabaseReaderLazyLoader> databases = new ConcurrentHashMap<>();

DatabaseRegistry(Environment environment, Client client, GeoIpCache cache, Consumer<Runnable> genericExecutor) {
DatabaseNodeService(Environment environment, Client client, GeoIpCache cache, Consumer<Runnable> genericExecutor) {
this(
environment.tmpFile(),
new OriginSettingClient(client, IngestService.INGEST_ORIGIN),
cache,
new LocalDatabases(environment, cache),
new ConfigDatabases(environment, cache),
genericExecutor
);
}

DatabaseRegistry(Path tmpDir,
Client client,
GeoIpCache cache,
LocalDatabases localDatabases,
Consumer<Runnable> genericExecutor) {
DatabaseNodeService(Path tmpDir,
Client client,
GeoIpCache cache,
ConfigDatabases configDatabases,
Consumer<Runnable> genericExecutor) {
this.client = client;
this.cache = cache;
this.geoipTmpBaseDirectory = tmpDir.resolve("geoip-databases");
this.localDatabases = localDatabases;
this.configDatabases = configDatabases;
this.genericExecutor = genericExecutor;
}

public void initialize(String nodeId, ResourceWatcherService resourceWatcher, IngestService ingestService) throws IOException {
localDatabases.initialize(resourceWatcher);
configDatabases.initialize(resourceWatcher);
geoipTmpDirectory = geoipTmpBaseDirectory.resolve(nodeId);
Files.walkFileTree(geoipTmpDirectory, new FileVisitor<>() {
@Override
Expand Down Expand Up @@ -160,7 +160,7 @@ public DatabaseReaderLazyLoader getDatabase(String name) {
// that gets closed while using it. (this can happen during a database update)
while (true) {
DatabaseReaderLazyLoader instance =
databases.getOrDefault(name, localDatabases.getDatabase(name));
databases.getOrDefault(name, configDatabases.getDatabase(name));
if (instance == null || instance.preLookup()) {
return instance;
}
Expand All @@ -170,7 +170,7 @@ public DatabaseReaderLazyLoader getDatabase(String name) {
}

List<DatabaseReaderLazyLoader> getAllDatabases() {
List<DatabaseReaderLazyLoader> all = new ArrayList<>(localDatabases.getConfigDatabases().values());
List<DatabaseReaderLazyLoader> all = new ArrayList<>(configDatabases.getConfigDatabases().values());
this.databases.forEach((key, value) -> all.add(value));
return all;
}
Expand Down Expand Up @@ -404,7 +404,7 @@ public Set<String> getAvailableDatabases() {
}

public Set<String> getConfigDatabases() {
return localDatabases.getConfigDatabases().keySet();
return configDatabases.getConfigDatabases().keySet();
}

public Set<String> getFilesInTemp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,15 +370,15 @@ public static final class Factory implements Processor.Factory {
Property.IP, Property.ASN, Property.ORGANIZATION_NAME, Property.NETWORK
));

private final DatabaseRegistry databaseRegistry;
private final DatabaseNodeService databaseNodeService;
private final ClusterService clusterService;

List<DatabaseReaderLazyLoader> getAllDatabases() {
return databaseRegistry.getAllDatabases();
return databaseNodeService.getAllDatabases();
}

public Factory(DatabaseRegistry databaseRegistry, ClusterService clusterService) {
this.databaseRegistry = databaseRegistry;
public Factory(DatabaseNodeService databaseNodeService, ClusterService clusterService) {
this.databaseNodeService = databaseNodeService;
this.clusterService = clusterService;
}

Expand All @@ -400,8 +400,8 @@ public Processor create(
DEPRECATION_LOGGER.critical(DeprecationCategory.OTHER, "default_databases_message", DEFAULT_DATABASES_DEPRECATION_MESSAGE);
}

DatabaseReaderLazyLoader lazyLoader = databaseRegistry.getDatabase(databaseFile);
if (useDatabaseUnavailableProcessor(lazyLoader, databaseRegistry.getAvailableDatabases())) {
DatabaseReaderLazyLoader lazyLoader = databaseNodeService.getDatabase(databaseFile);
if (useDatabaseUnavailableProcessor(lazyLoader, databaseNodeService.getAvailableDatabases())) {
return new DatabaseUnavailableProcessor(processorTag, description, databaseFile);
} else if (lazyLoader == null) {
throw newConfigurationException(TYPE, processorTag,
Expand Down Expand Up @@ -438,8 +438,8 @@ public Processor create(
}
}
CheckedSupplier<DatabaseReaderLazyLoader, IOException> supplier = () -> {
DatabaseReaderLazyLoader loader = databaseRegistry.getDatabase(databaseFile);
if (useDatabaseUnavailableProcessor(loader, databaseRegistry.getAvailableDatabases())) {
DatabaseReaderLazyLoader loader = databaseNodeService.getDatabase(databaseFile);
if (useDatabaseUnavailableProcessor(loader, databaseNodeService.getAvailableDatabases())) {
return null;
} else if (loader == null) {
throw new ResourceNotFoundException("database file [" + databaseFile + "] doesn't exist");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, SystemInd
static String[] DEFAULT_DATABASE_FILENAMES = new String[]{"GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb"};

private final SetOnce<IngestService> ingestService = new SetOnce<>();
private final SetOnce<DatabaseRegistry> databaseRegistry = new SetOnce<>();
private final SetOnce<DatabaseNodeService> databaseRegistry = new SetOnce<>();
private GeoIpDownloaderTaskExecutor geoIpDownloaderTaskExecutor;

@Override
Expand All @@ -91,7 +91,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet

long cacheSize = CACHE_SIZE.get(parameters.env.settings());
GeoIpCache geoIpCache = new GeoIpCache(cacheSize);
DatabaseRegistry registry = new DatabaseRegistry(parameters.env, parameters.client, geoIpCache, parameters.genericExecutor);
DatabaseNodeService registry = new DatabaseNodeService(parameters.env, parameters.client, geoIpCache, parameters.genericExecutor);
databaseRegistry.set(registry);
return Map.of(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(registry, parameters.ingestService.getClusterService()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.geoip.DatabaseRegistry;
import org.elasticsearch.ingest.geoip.DatabaseNodeService;
import org.elasticsearch.ingest.geoip.GeoIpDownloader;
import org.elasticsearch.ingest.geoip.GeoIpDownloaderTaskExecutor;
import org.elasticsearch.ingest.geoip.stats.GeoIpDownloaderStatsAction.NodeRequest;
Expand All @@ -33,12 +33,12 @@ public class GeoIpDownloaderStatsTransportAction extends TransportNodesAction<Re
Response, NodeRequest, NodeResponse> {

private final TransportService transportService;
private final DatabaseRegistry registry;
private final DatabaseNodeService registry;
private final GeoIpDownloaderTaskExecutor geoIpDownloaderTaskExecutor;

@Inject
public GeoIpDownloaderStatsTransportAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, DatabaseRegistry registry,
ThreadPool threadPool, ActionFilters actionFilters, DatabaseNodeService registry,
GeoIpDownloaderTaskExecutor geoIpDownloaderTaskExecutor) {
super(GeoIpDownloaderStatsAction.NAME, threadPool, clusterService, transportService, actionFilters, Request::new,
NodeRequest::new, ThreadPool.Names.MANAGEMENT, NodeResponse.class);
Expand Down

0 comments on commit 0e404aa

Please sign in to comment.