Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the ingest-geoip databases even lazier to load #36679

Merged
merged 27 commits into from Dec 19, 2018
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.ingest.geoip;

import com.maxmind.geoip2.DatabaseReader;
Expand All @@ -27,6 +28,11 @@

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Objects;

/**
* Facilitates lazy loading of the database reader, so that when the geoip plugin is installed, but not used,
Expand All @@ -36,21 +42,83 @@ final class DatabaseReaderLazyLoader implements Closeable {

private static final Logger LOGGER = LogManager.getLogger(DatabaseReaderLazyLoader.class);

private final String databaseFileName;
private final Path databasePath;
private final CheckedSupplier<DatabaseReader, IOException> loader;
// package-private for testing only:
final SetOnce<DatabaseReader> databaseReader;

DatabaseReaderLazyLoader(String databaseFileName, CheckedSupplier<DatabaseReader, IOException> loader) {
this.databaseFileName = databaseFileName;
this.loader = loader;
DatabaseReaderLazyLoader(final Path databasePath, final CheckedSupplier<DatabaseReader, IOException> loader) {
this.databasePath = Objects.requireNonNull(databasePath);
this.loader = Objects.requireNonNull(loader);
this.databaseReader = new SetOnce<>();
}

synchronized DatabaseReader get() throws IOException {
/**
 * Read the database type from the database. We do this manually instead of relying on the built-in mechanism to avoid reading the
 * entire database into memory merely to read the type. This is especially important to maintain on master nodes where pipelines are
 * validated. If we read the entire database into memory, we could potentially run into low-memory constraints on such nodes where
 * loading this data would otherwise be wasteful if they are not also ingest nodes.
 *
 * <p>Only the last 512 bytes of the file are inspected. The method looks for the {@code database_type} key in that tail and decodes
 * the UTF-8 string value that immediately follows it. Every malformed-input condition (marker absent, value truncated, value not a
 * string) is reported as an {@link IOException} rather than surfacing as an index-out-of-bounds error.
 *
 * @return the database type
 * @throws IOException if an I/O exception occurs reading the database type, or if the tail of the file does not contain a
 *                     well-formed {@code database_type} entry
 */
final String getDatabaseType() throws IOException {
    final long fileSize = Files.size(databasePath);
    if (fileSize <= 512) {
        throw new IOException("unexpected file length [" + fileSize + "] for [" + databasePath + "]");
    }
    final int[] databaseTypeMarker = {'d', 'a', 't', 'a', 'b', 'a', 's', 'e', '_', 't', 'y', 'p', 'e'};
    try (InputStream in = Files.newInputStream(databasePath)) {
        // read the last 512 bytes
        final long skipped = in.skip(fileSize - 512);
        if (skipped != fileSize - 512) {
            throw new IOException("failed to skip [" + (fileSize - 512) + "] bytes while reading [" + databasePath + "]");
        }
        final byte[] tail = new byte[512];
        int read = 0;
        do {
            final int actualBytesRead = in.read(tail, read, 512 - read);
            if (actualBytesRead == -1) {
                throw new IOException("unexpected end of stream [" + databasePath + "] after reading [" + read + "] bytes");
            }
            read += actualBytesRead;
        } while (read != 512);

        // find the database_type header
        int metadataOffset = -1;
        int markerOffset = 0;
        for (int i = 0; i < tail.length; i++) {
            final byte b = tail[i];

            if (b == databaseTypeMarker[markerOffset]) {
                markerOffset++;
            } else {
                // 'd' occurs only at the start of the marker, so after a mismatch the only partial match that can
                // survive is the current byte itself starting a new match
                markerOffset = b == databaseTypeMarker[0] ? 1 : 0;
            }
            if (markerOffset == databaseTypeMarker.length) {
                metadataOffset = i + 1;
                break;
            }
        }
        if (metadataOffset == -1) {
            throw new IOException("database type marker not found in the last [512] bytes of [" + databasePath + "]");
        }
        if (metadataOffset >= tail.length) {
            throw new IOException("database type value is truncated in [" + databasePath + "]");
        }

        // read the database type: the control byte carries the field type in its top three bits and the size in the low five
        final int offsetByte = tail[metadataOffset] & 0xFF;
        final int type = offsetByte >>> 5;
        if (type != 2) {
            throw new IOException("type must be UTF-8 string");
        }
        int size = offsetByte & 0x1f;
        int valueOffset = metadataOffset + 1;
        if (size >= 29) {
            // sizes 29, 30 and 31 mean the real size is a base value plus an unsigned integer stored in the next 1, 2 or 3 bytes
            final int sizeBytes = size - 28;
            if (valueOffset + sizeBytes > tail.length) {
                throw new IOException("database type value is truncated in [" + databasePath + "]");
            }
            int extra = 0;
            for (int k = 0; k < sizeBytes; k++) {
                extra = (extra << 8) | (tail[valueOffset + k] & 0xFF);
            }
            final int base = size == 29 ? 29 : (size == 30 ? 285 : 65821);
            size = base + extra;
            valueOffset += sizeBytes;
        }
        if (valueOffset + size > tail.length) {
            throw new IOException("database type value is truncated in [" + databasePath + "]");
        }
        return new String(tail, valueOffset, size, StandardCharsets.UTF_8);
    }
}

/**
 * Returns the database reader, invoking the loader the first time a reader is requested.
 *
 * <p>The unsynchronized null check keeps the common already-loaded path free of locking. When no reader has been set yet, the
 * check is repeated while holding the monitor of {@code databaseReader} so that the loader runs, and the reader is set, only once.
 *
 * @return the database reader held by this lazy loader
 * @throws IOException if the loader fails to produce a reader
 */
DatabaseReader get() throws IOException {
    if (databaseReader.get() == null) {
        synchronized (databaseReader) {
            // check again under the lock: another thread may have finished loading while this one was waiting
            if (databaseReader.get() == null) {
                databaseReader.set(loader.get());
                LOGGER.debug("loaded [{}] geo-IP database", databasePath);
            }
        }
    }
    return databaseReader.get();
}
Expand All @@ -59,4 +127,5 @@ synchronized DatabaseReader get() throws IOException {
/**
 * Closes the database reader currently held by this loader by handing it to {@code IOUtils.close}.
 *
 * @throws IOException if closing the reader fails
 */
public synchronized void close() throws IOException {
    // NOTE(review): the holder is empty when the database was never loaded; presumably IOUtils.close tolerates null - confirm
    final DatabaseReader reader = databaseReader.get();
    IOUtils.close(reader);
}

}
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache;

import java.io.IOException;
import java.net.InetAddress;
import java.security.AccessController;
import java.security.PrivilegedAction;
Expand All @@ -64,19 +65,35 @@ public final class GeoIpProcessor extends AbstractProcessor {

private final String field;
private final String targetField;
private final DatabaseReader dbReader;
private final DatabaseReaderLazyLoader lazyLoader;
private final Set<Property> properties;
private final boolean ignoreMissing;
private final GeoIpCache cache;


GeoIpProcessor(String tag, String field, DatabaseReader dbReader, String targetField, Set<Property> properties, boolean ignoreMissing,
GeoIpCache cache) {
/**
* Construct a geo-IP processor.
*
* @param tag the processor tag
* @param field the source field to geo-IP map
* @param lazyLoader a supplier of a geo-IP database reader; ideally this is lazily-loaded once on first use
* @param targetField the target field
* @param properties the properties; stored as an unmodifiable view of the given set
* @param ignoreMissing true if documents with a missing value for the field should be ignored
* @param cache a geo-IP cache
*/
GeoIpProcessor(
final String tag,
final String field,
final DatabaseReaderLazyLoader lazyLoader,
final String targetField,
final Set<Property> properties,
final boolean ignoreMissing,
final GeoIpCache cache) {
super(tag);
this.field = field;
this.targetField = targetField;
this.dbReader = dbReader;
this.properties = properties;
this.lazyLoader = lazyLoader;
this.properties = Collections.unmodifiableSet(properties);
this.ignoreMissing = ignoreMissing;
this.cache = cache;
}
Expand All @@ -86,7 +103,7 @@ boolean isIgnoreMissing() {
}

@Override
public IngestDocument execute(IngestDocument ingestDocument) {
public IngestDocument execute(IngestDocument ingestDocument) throws IOException {
String ip = ingestDocument.getFieldValue(field, String.class, ignoreMissing);

if (ip == null && ignoreMissing) {
Expand All @@ -98,7 +115,7 @@ public IngestDocument execute(IngestDocument ingestDocument) {
final InetAddress ipAddress = InetAddresses.forString(ip);

Map<String, Object> geoData;
String databaseType = dbReader.getMetadata().getDatabaseType();
String databaseType = lazyLoader.get().getMetadata().getDatabaseType();

if (databaseType.endsWith(CITY_DB_SUFFIX)) {
try {
Expand All @@ -119,7 +136,7 @@ public IngestDocument execute(IngestDocument ingestDocument) {
geoData = Collections.emptyMap();
}
} else {
throw new ElasticsearchParseException("Unsupported database type [" + dbReader.getMetadata().getDatabaseType()
throw new ElasticsearchParseException("Unsupported database type [" + lazyLoader.get().getMetadata().getDatabaseType()
+ "]", new IllegalStateException());
}
if (geoData.isEmpty() == false) {
Expand All @@ -141,8 +158,8 @@ String getTargetField() {
return targetField;
}

DatabaseReader getDbReader() {
return dbReader;
DatabaseReader getDatabaseReader() throws IOException {
return lazyLoader.get();
}

Set<Property> getProperties() {
Expand All @@ -154,7 +171,7 @@ private Map<String, Object> retrieveCityGeoData(InetAddress ipAddress) {
CityResponse response = AccessController.doPrivileged((PrivilegedAction<CityResponse>) () ->
cache.putIfAbsent(ipAddress, CityResponse.class, ip -> {
try {
return dbReader.city(ip);
return lazyLoader.get().city(ip);
} catch (AddressNotFoundException e) {
throw new AddressNotFoundRuntimeException(e);
} catch (Exception e) {
Expand Down Expand Up @@ -240,7 +257,7 @@ private Map<String, Object> retrieveCountryGeoData(InetAddress ipAddress) {
CountryResponse response = AccessController.doPrivileged((PrivilegedAction<CountryResponse>) () ->
cache.putIfAbsent(ipAddress, CountryResponse.class, ip -> {
try {
return dbReader.country(ip);
return lazyLoader.get().country(ip);
} catch (AddressNotFoundException e) {
throw new AddressNotFoundRuntimeException(e);
} catch (Exception e) {
Expand Down Expand Up @@ -285,7 +302,7 @@ private Map<String, Object> retrieveAsnGeoData(InetAddress ipAddress) {
AsnResponse response = AccessController.doPrivileged((PrivilegedAction<AsnResponse>) () ->
cache.putIfAbsent(ipAddress, AsnResponse.class, ip -> {
try {
return dbReader.asn(ip);
return lazyLoader.get().asn(ip);
} catch (AddressNotFoundException e) {
throw new AddressNotFoundRuntimeException(e);
} catch (Exception e) {
Expand Down Expand Up @@ -330,6 +347,11 @@ public static final class Factory implements Processor.Factory {
);

private final Map<String, DatabaseReaderLazyLoader> databaseReaders;

/**
 * Exposes the lazy database loaders held by this factory without allowing callers to modify the underlying map.
 *
 * @return an unmodifiable view of the database loaders map
 */
Map<String, DatabaseReaderLazyLoader> databaseReaders() {
    final Map<String, DatabaseReaderLazyLoader> readOnlyView = Collections.unmodifiableMap(databaseReaders);
    return readOnlyView;
}

private final GeoIpCache cache;

public Factory(Map<String, DatabaseReaderLazyLoader> databaseReaders, GeoIpCache cache) {
Expand All @@ -338,8 +360,10 @@ public Factory(Map<String, DatabaseReaderLazyLoader> databaseReaders, GeoIpCache
}

@Override
public GeoIpProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
public GeoIpProcessor create(
final Map<String, Processor.Factory> registry,
final String processorTag,
final Map<String, Object> config) throws IOException {
String ipField = readStringProperty(TYPE, processorTag, config, "field");
String targetField = readStringProperty(TYPE, processorTag, config, "target_field", "geoip");
String databaseFile = readStringProperty(TYPE, processorTag, config, "database_file", "GeoLite2-City.mmdb");
Expand All @@ -352,8 +376,7 @@ public GeoIpProcessor create(Map<String, Processor.Factory> registry, String pro
"database_file", "database file [" + databaseFile + "] doesn't exist");
}

DatabaseReader databaseReader = lazyLoader.get();
String databaseType = databaseReader.getMetadata().getDatabaseType();
final String databaseType = lazyLoader.getDatabaseType();

final Set<Property> properties;
if (propertyNames != null) {
Expand All @@ -378,7 +401,7 @@ public GeoIpProcessor create(Map<String, Processor.Factory> registry, String pro
}
}

return new GeoIpProcessor(processorTag, ipField, databaseReader, targetField, properties, ignoreMissing, cache);
return new GeoIpProcessor(processorTag, ipField, lazyLoader, targetField, properties, ignoreMissing, cache);
}
}

Expand Down
Expand Up @@ -90,16 +90,17 @@ static Map<String, DatabaseReaderLazyLoader> loadDatabaseReaders(Path geoIpConfi
Path databasePath = iterator.next();
if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) {
String databaseFileName = databasePath.getFileName().toString();
DatabaseReaderLazyLoader holder = new DatabaseReaderLazyLoader(databaseFileName,
() -> {
DatabaseReader.Builder builder = createDatabaseBuilder(databasePath).withCache(NoCache.getInstance());
if (loadDatabaseOnHeap) {
builder.fileMode(Reader.FileMode.MEMORY);
} else {
builder.fileMode(Reader.FileMode.MEMORY_MAPPED);
}
return builder.build();
});
DatabaseReaderLazyLoader holder = new DatabaseReaderLazyLoader(
databasePath,
() -> {
DatabaseReader.Builder builder = createDatabaseBuilder(databasePath).withCache(NoCache.getInstance());
if (loadDatabaseOnHeap) {
builder.fileMode(Reader.FileMode.MEMORY);
} else {
builder.fileMode(Reader.FileMode.MEMORY_MAPPED);
}
return builder.build();
});
databaseReaders.put(databaseFileName, holder);
}
}
Expand Down