Make the ingest-geoip databases even lazier to load (#36679)
Today we try to load the ingest-geoip databases lazily. Currently they
are loaded as soon as any pipeline that uses an ingest-geoip processor
is created. This is not lazy enough: instead, we could load a database
only the first time that it is actually used. This would ensure that we
load the minimal set of data to support an in-use pipeline (instead of
*all* of the data). This matters in a couple of ways. One is when only a
subset of the databases is used (e.g., the city database versus the
country database versus the ASN database). Another is when the plugin is
installed on non-ingest nodes (e.g., master-only nodes); we would never
use the databases in this case, yet they are currently loaded, occupying
~60 MB of the heap. This commit makes the ingest-geoip databases as lazy
as possible.

Co-authored-by: Martijn van Groningen <martijn.v.groningen@gmail.com>
jasontedor and martijnvg committed Dec 19, 2018
1 parent aaf466f commit 273b37a
Showing 6 changed files with 416 additions and 94 deletions.
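
At its core, the change wraps the MaxMind reader in a lazy holder: constructing the holder only records the database path and a supplier, and the expensive DatabaseReader is built under double-checked locking the first time it is requested. A simplified sketch of that pattern, reusing the Lucene SetOnce and Elasticsearch CheckedSupplier types that also appear in the diff below (an illustration of the approach, not the exact code from this commit):

import java.io.IOException;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.common.CheckedSupplier;

// simplified sketch of the lazy-holder pattern this commit applies
final class LazyHolder<T> {

    private final CheckedSupplier<T, IOException> loader; // how to build the expensive resource
    private final SetOnce<T> value = new SetOnce<>();      // write-once container

    LazyHolder(final CheckedSupplier<T, IOException> loader) {
        this.loader = loader;
    }

    T get() throws IOException {
        if (value.get() == null) {     // fast path: already built
            synchronized (value) {     // slow path: the first caller builds it
                if (value.get() == null) {
                    value.set(loader.get());
                }
            }
        }
        return value.get();
    }

}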
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.ingest.geoip;

import com.maxmind.geoip2.DatabaseReader;
@@ -27,30 +28,120 @@

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Objects;

/**
* Facilitates lazy loading of the database reader, so that when the geoip plugin is installed but not used,
* no memory is wasted on the database reader.
*/
final class DatabaseReaderLazyLoader implements Closeable {
class DatabaseReaderLazyLoader implements Closeable {

private static final Logger LOGGER = LogManager.getLogger(DatabaseReaderLazyLoader.class);

private final String databaseFileName;
private final Path databasePath;
private final CheckedSupplier<DatabaseReader, IOException> loader;
// package protected for testing only:
final SetOnce<DatabaseReader> databaseReader;

DatabaseReaderLazyLoader(String databaseFileName, CheckedSupplier<DatabaseReader, IOException> loader) {
this.databaseFileName = databaseFileName;
this.loader = loader;
// cache the database type so that we do not re-read it on every pipeline execution
final SetOnce<String> databaseType;

DatabaseReaderLazyLoader(final Path databasePath, final CheckedSupplier<DatabaseReader, IOException> loader) {
this.databasePath = Objects.requireNonNull(databasePath);
this.loader = Objects.requireNonNull(loader);
this.databaseReader = new SetOnce<>();
this.databaseType = new SetOnce<>();
}

/**
* Read the database type from the database. We do this manually instead of relying on the built-in mechanism to avoid reading the
* entire database into memory merely to read the type. This is especially important on master nodes, where pipelines are validated:
* reading the entire database into memory there could run into low-memory constraints on nodes that, not being ingest nodes, would
* never otherwise need this data.
*
* @return the database type
* @throws IOException if an I/O exception occurs reading the database type
*/
final String getDatabaseType() throws IOException {
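// double-checked locking on the write-once holder: only the first caller pays the cost of reading the file tail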
if (databaseType.get() == null) {
synchronized (databaseType) {
if (databaseType.get() == null) {
final long fileSize = databaseFileSize();
if (fileSize <= 512) {
throw new IOException("unexpected file length [" + fileSize + "] for [" + databasePath + "]");
}
final int[] databaseTypeMarker = {'d', 'a', 't', 'a', 'b', 'a', 's', 'e', '_', 't', 'y', 'p', 'e'};
try (InputStream in = databaseInputStream()) {
// read the last 512 bytes
final long skipped = in.skip(fileSize - 512);
if (skipped != fileSize - 512) {
throw new IOException("failed to skip [" + (fileSize - 512) + "] bytes while reading [" + databasePath + "]");
}
final byte[] tail = new byte[512];
int read = 0;
do {
final int actualBytesRead = in.read(tail, read, 512 - read);
if (actualBytesRead == -1) {
throw new IOException("unexpected end of stream [" + databasePath + "] after reading [" + read + "] bytes");
}
read += actualBytesRead;
} while (read != 512);

// find the database_type header
int metadataOffset = -1;
int markerOffset = 0;
for (int i = 0; i < tail.length; i++) {
byte b = tail[i];

if (b == databaseTypeMarker[markerOffset]) {
markerOffset++;
} else {
markerOffset = 0;
}
if (markerOffset == databaseTypeMarker.length) {
metadataOffset = i + 1;
break;
}
}

if (metadataOffset == -1) {
throw new IOException("database type marker not found");
}

// read the database type
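// per the MaxMind DB format, a value starts with a control byte: the top three bits encode the field type
// (2 = UTF-8 string) and, for short strings, the low five bits encode the payload length in bytes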
final int offsetByte = tail[metadataOffset] & 0xFF;
final int type = offsetByte >>> 5;
if (type != 2) {
throw new IOException("type must be UTF-8 string");
}
int size = offsetByte & 0x1f;
databaseType.set(new String(tail, metadataOffset + 1, size, StandardCharsets.UTF_8));
}
}
}
}
return databaseType.get();
}

long databaseFileSize() throws IOException {
return Files.size(databasePath);
}

synchronized DatabaseReader get() throws IOException {
InputStream databaseInputStream() throws IOException {
return Files.newInputStream(databasePath);
}

DatabaseReader get() throws IOException {
if (databaseReader.get() == null) {
databaseReader.set(loader.get());
LOGGER.debug("Loaded [{}] geoip database", databaseFileName);
synchronized (databaseReader) {
if (databaseReader.get() == null) {
databaseReader.set(loader.get());
LOGGER.debug("loaded [{}] geo-IP database", databasePath);
}
}
}
return databaseReader.get();
}
Expand All @@ -59,4 +150,5 @@ synchronized DatabaseReader get() throws IOException {
public synchronized void close() throws IOException {
IOUtils.close(databaseReader.get());
}

}
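
A minimal usage sketch of the reworked loader (illustrative only; imports and exception handling elided, and it assumes a GeoLite2-City.mmdb entry is present in the readers map): validation can now ask for the database type cheaply, while the full DatabaseReader is built only on the first actual lookup.

// illustrative usage: the type check is cheap, the reader is built lazily on first lookup
static void exampleUsage(final Map<String, DatabaseReaderLazyLoader> databaseReaders) throws Exception {
    final DatabaseReaderLazyLoader loader = databaseReaders.get("GeoLite2-City.mmdb");
    final String type = loader.getDatabaseType();             // reads only the tail of the .mmdb file
    if (type.endsWith("City")) {
        final InetAddress ip = InetAddress.getByName("8.8.8.8");
        final CityResponse response = loader.get().city(ip);  // the DatabaseReader is built here, on first use
    }
}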
@@ -19,7 +19,6 @@

package org.elasticsearch.ingest.geoip;

import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.exception.AddressNotFoundException;
import com.maxmind.geoip2.model.AsnResponse;
import com.maxmind.geoip2.model.CityResponse;
@@ -38,6 +37,7 @@
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache;

import java.io.IOException;
import java.net.InetAddress;
import java.security.AccessController;
import java.security.PrivilegedAction;
@@ -64,18 +64,34 @@ public final class GeoIpProcessor extends AbstractProcessor {

private final String field;
private final String targetField;
private final DatabaseReader dbReader;
private final DatabaseReaderLazyLoader lazyLoader;
private final Set<Property> properties;
private final boolean ignoreMissing;
private final GeoIpCache cache;


GeoIpProcessor(String tag, String field, DatabaseReader dbReader, String targetField, Set<Property> properties, boolean ignoreMissing,
GeoIpCache cache) {
/**
* Construct a geo-IP processor.
*
* @param tag the processor tag
* @param field the source field to geo-IP map
* @param lazyLoader a supplier of a geo-IP database reader; ideally this is lazily-loaded once on first use
* @param targetField the target field
* @param properties the properties to extract and add to the target field
* @param ignoreMissing true if documents with a missing value for the field should be ignored
* @param cache a geo-IP cache
*/
GeoIpProcessor(
final String tag,
final String field,
final DatabaseReaderLazyLoader lazyLoader,
final String targetField,
final Set<Property> properties,
final boolean ignoreMissing,
final GeoIpCache cache) {
super(tag);
this.field = field;
this.targetField = targetField;
this.dbReader = dbReader;
this.lazyLoader = lazyLoader;
this.properties = properties;
this.ignoreMissing = ignoreMissing;
this.cache = cache;
@@ -86,7 +102,7 @@ boolean isIgnoreMissing() {
}

@Override
public IngestDocument execute(IngestDocument ingestDocument) {
public IngestDocument execute(IngestDocument ingestDocument) throws IOException {
String ip = ingestDocument.getFieldValue(field, String.class, ignoreMissing);

if (ip == null && ignoreMissing) {
@@ -98,7 +114,7 @@ public IngestDocument execute(IngestDocument ingestDocument) {
final InetAddress ipAddress = InetAddresses.forString(ip);

Map<String, Object> geoData;
String databaseType = dbReader.getMetadata().getDatabaseType();
String databaseType = lazyLoader.getDatabaseType();

if (databaseType.endsWith(CITY_DB_SUFFIX)) {
try {
@@ -119,7 +135,7 @@ public IngestDocument execute(IngestDocument ingestDocument) {
geoData = Collections.emptyMap();
}
} else {
throw new ElasticsearchParseException("Unsupported database type [" + dbReader.getMetadata().getDatabaseType()
throw new ElasticsearchParseException("Unsupported database type [" + lazyLoader.getDatabaseType()
+ "]", new IllegalStateException());
}
if (geoData.isEmpty() == false) {
@@ -141,8 +157,8 @@ String getTargetField() {
return targetField;
}

DatabaseReader getDbReader() {
return dbReader;
String getDatabaseType() throws IOException {
return lazyLoader.getDatabaseType();
}

Set<Property> getProperties() {
@@ -154,7 +170,7 @@ private Map<String, Object> retrieveCityGeoData(InetAddress ipAddress) {
CityResponse response = AccessController.doPrivileged((PrivilegedAction<CityResponse>) () ->
cache.putIfAbsent(ipAddress, CityResponse.class, ip -> {
try {
return dbReader.city(ip);
return lazyLoader.get().city(ip);
} catch (AddressNotFoundException e) {
throw new AddressNotFoundRuntimeException(e);
} catch (Exception e) {
@@ -240,7 +256,7 @@ private Map<String, Object> retrieveCountryGeoData(InetAddress ipAddress) {
CountryResponse response = AccessController.doPrivileged((PrivilegedAction<CountryResponse>) () ->
cache.putIfAbsent(ipAddress, CountryResponse.class, ip -> {
try {
return dbReader.country(ip);
return lazyLoader.get().country(ip);
} catch (AddressNotFoundException e) {
throw new AddressNotFoundRuntimeException(e);
} catch (Exception e) {
@@ -285,7 +301,7 @@ private Map<String, Object> retrieveAsnGeoData(InetAddress ipAddress) {
AsnResponse response = AccessController.doPrivileged((PrivilegedAction<AsnResponse>) () ->
cache.putIfAbsent(ipAddress, AsnResponse.class, ip -> {
try {
return dbReader.asn(ip);
return lazyLoader.get().asn(ip);
} catch (AddressNotFoundException e) {
throw new AddressNotFoundRuntimeException(e);
} catch (Exception e) {
@@ -318,18 +334,23 @@ private Map<String, Object> retrieveAsnGeoData(InetAddress ipAddress) {
}

public static final class Factory implements Processor.Factory {
static final Set<Property> DEFAULT_CITY_PROPERTIES = EnumSet.of(
static final Set<Property> DEFAULT_CITY_PROPERTIES = Collections.unmodifiableSet(EnumSet.of(
Property.CONTINENT_NAME, Property.COUNTRY_ISO_CODE, Property.REGION_ISO_CODE,
Property.REGION_NAME, Property.CITY_NAME, Property.LOCATION
);
static final Set<Property> DEFAULT_COUNTRY_PROPERTIES = EnumSet.of(
));
static final Set<Property> DEFAULT_COUNTRY_PROPERTIES = Collections.unmodifiableSet(EnumSet.of(
Property.CONTINENT_NAME, Property.COUNTRY_ISO_CODE
);
static final Set<Property> DEFAULT_ASN_PROPERTIES = EnumSet.of(
));
static final Set<Property> DEFAULT_ASN_PROPERTIES = Collections.unmodifiableSet(EnumSet.of(
Property.IP, Property.ASN, Property.ORGANIZATION_NAME
);
));

private final Map<String, DatabaseReaderLazyLoader> databaseReaders;

Map<String, DatabaseReaderLazyLoader> databaseReaders() {
return Collections.unmodifiableMap(databaseReaders);
}

private final GeoIpCache cache;

public Factory(Map<String, DatabaseReaderLazyLoader> databaseReaders, GeoIpCache cache) {
@@ -338,8 +359,10 @@ public Factory(Map<String, DatabaseReaderLazyLoader> databaseReaders, GeoIpCache
}

@Override
public GeoIpProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
public GeoIpProcessor create(
final Map<String, Processor.Factory> registry,
final String processorTag,
final Map<String, Object> config) throws IOException {
String ipField = readStringProperty(TYPE, processorTag, config, "field");
String targetField = readStringProperty(TYPE, processorTag, config, "target_field", "geoip");
String databaseFile = readStringProperty(TYPE, processorTag, config, "database_file", "GeoLite2-City.mmdb");
@@ -352,19 +375,19 @@ public GeoIpProcessor create(Map<String, Processor.Factory> registry, String pro
"database_file", "database file [" + databaseFile + "] doesn't exist");
}

DatabaseReader databaseReader = lazyLoader.get();
String databaseType = databaseReader.getMetadata().getDatabaseType();
final String databaseType = lazyLoader.getDatabaseType();

final Set<Property> properties;
if (propertyNames != null) {
properties = EnumSet.noneOf(Property.class);
Set<Property> modifiableProperties = EnumSet.noneOf(Property.class);
for (String fieldName : propertyNames) {
try {
properties.add(Property.parseProperty(databaseType, fieldName));
modifiableProperties.add(Property.parseProperty(databaseType, fieldName));
} catch (IllegalArgumentException e) {
throw newConfigurationException(TYPE, processorTag, "properties", e.getMessage());
}
}
properties = Collections.unmodifiableSet(modifiableProperties);
} else {
if (databaseType.endsWith(CITY_DB_SUFFIX)) {
properties = DEFAULT_CITY_PROPERTIES;
Expand All @@ -378,7 +401,7 @@ public GeoIpProcessor create(Map<String, Processor.Factory> registry, String pro
}
}

return new GeoIpProcessor(processorTag, ipField, databaseReader, targetField, properties, ignoreMissing, cache);
return new GeoIpProcessor(processorTag, ipField, lazyLoader, targetField, properties, ignoreMissing, cache);
}
}
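
The intended behavior can be pinned down with a test-style check along these lines (a hypothetical sketch rather than the commit's actual test; it assumes a Factory constructed over a GeoLite2-City.mmdb lazy loader, an IngestDocument whose source_ip field holds an IP address, and same-package access to the package-private databaseReader field): creating the processor must not build the DatabaseReader, only executing it may.

// hypothetical sketch: the reader stays unloaded until the processor first executes
static void assertLazyLoading(final GeoIpProcessor.Factory factory, final IngestDocument document) throws Exception {
    final Map<String, Object> config = new HashMap<>();
    config.put("field", "source_ip");
    config.put("database_file", "GeoLite2-City.mmdb");
    final GeoIpProcessor processor = factory.create(null, "_tag", config);

    // creation only read the database type from the file tail; the reader itself is still unloaded
    assert factory.databaseReaders().get("GeoLite2-City.mmdb").databaseReader.get() == null;

    processor.execute(document); // first use

    // now the reader has been built, and only for the database this pipeline actually uses
    assert factory.databaseReaders().get("GeoLite2-City.mmdb").databaseReader.get() != null;
}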

@@ -90,16 +90,17 @@ static Map<String, DatabaseReaderLazyLoader> loadDatabaseReaders(Path geoIpConfi
Path databasePath = iterator.next();
if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) {
String databaseFileName = databasePath.getFileName().toString();
DatabaseReaderLazyLoader holder = new DatabaseReaderLazyLoader(databaseFileName,
() -> {
DatabaseReader.Builder builder = createDatabaseBuilder(databasePath).withCache(NoCache.getInstance());
if (loadDatabaseOnHeap) {
builder.fileMode(Reader.FileMode.MEMORY);
} else {
builder.fileMode(Reader.FileMode.MEMORY_MAPPED);
}
return builder.build();
});
DatabaseReaderLazyLoader holder = new DatabaseReaderLazyLoader(
databasePath,
() -> {
DatabaseReader.Builder builder = createDatabaseBuilder(databasePath).withCache(NoCache.getInstance());
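// assumption based on the MaxMind Reader.FileMode docs: MEMORY copies the whole database onto the heap,
// while MEMORY_MAPPED maps the file and relies on the OS page cache instead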
if (loadDatabaseOnHeap) {
builder.fileMode(Reader.FileMode.MEMORY);
} else {
builder.fileMode(Reader.FileMode.MEMORY_MAPPED);
}
return builder.build();
});
databaseReaders.put(databaseFileName, holder);
}
}
