Skip to content
Permalink
Browse files
use a non-concurrent map for lookups-cached-global unless incremental…
… updates are actually required (#12293)

* use a non-concurrent map for lookups-cached-global unless incremental updates are actually required
* adjustments
* fix test
  • Loading branch information
clintropolis committed Mar 9, 2022
1 parent abe76cc commit 0600772cceec633a7754a68d6b4ff5ed21af8795
Show file tree
Hide file tree
Showing 19 changed files with 412 additions and 451 deletions.
@@ -53,7 +53,6 @@
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -154,8 +153,9 @@ public boolean start()

final String topic = getKafkaTopic();
LOG.debug("About to listen to topic [%s] with group.id [%s]", topic, factoryId);
// this creates a ConcurrentMap
cacheHandler = cacheManager.createCache();
final ConcurrentMap<String, String> map = cacheHandler.getCache();
final Map<String, String> map = cacheHandler.getCache();
mapRef.set(map);


@@ -19,6 +19,7 @@

package org.apache.druid.query.lookup.namespace;

import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;

import javax.annotation.Nullable;
@@ -31,23 +32,21 @@
/**
* If the lookup source, encapsulated by this {@code CacheGenerator}, has data newer than identified
* by the given {@code lastVersion} (which is null at the first run of this method, or the version from the previous
* run), this method creates a new {@code CacheScheduler.VersionedCache} with {@link
* CacheScheduler#createVersionedCache}, called on the given {@code scheduler}, with the version string identifying
* the current version of lookup source, populates the created {@code VersionedCache} and returns it. If the lookup
* source is up-to-date, this method returns null.
* run), this method populates a {@link CacheHandler} and returns the version string identifying the current version
* of the lookup source. If the lookup source is up-to-date, this method returns null.
*
* @param namespace The ExtractionNamespace for which to generate cache.
* @param id An object uniquely corresponding to the {@link CacheScheduler.Entry}, for which this generateCache()
* method is called. Also it has the same toString() representation, that is useful for logging
* @param lastVersion The version which was last cached
* @param scheduler Should be used only to call {@link CacheScheduler#createVersionedCache}.
* @return the new cache along with the new version, or null if the last version is up-to-date.
* @param cache a cache to populate
* @return the new version, or null if the last version is up-to-date.
*/
@Nullable
CacheScheduler.VersionedCache generateCache(
String generateCache(
T namespace,
CacheScheduler.EntryImpl<T> id,
String lastVersion,
CacheScheduler scheduler
CacheHandler cache
) throws Exception;
}
@@ -28,6 +28,7 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.lookup.namespace.CacheGenerator;
import org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace;
import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
import org.apache.druid.utils.JvmUtils;
import org.skife.jdbi.v2.DBI;
@@ -38,7 +39,6 @@

import javax.annotation.Nullable;
import java.sql.Timestamp;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@@ -57,11 +57,11 @@ public final class JdbcCacheGenerator implements CacheGenerator<JdbcExtractionNa

@Override
@Nullable
public CacheScheduler.VersionedCache generateCache(
public String generateCache(
final JdbcExtractionNamespace namespace,
final CacheScheduler.EntryImpl<JdbcExtractionNamespace> entryId,
final String lastVersion,
final CacheScheduler scheduler
final CacheHandler cache
)
{
final long lastCheck = lastVersion == null ? JodaUtils.MIN_INSTANT : Long.parseLong(lastVersion);
@@ -76,10 +76,7 @@ public CacheScheduler.VersionedCache generateCache(
}
catch (UnableToObtainConnectionException e) {
if (e.getMessage().contains(NO_SUITABLE_DRIVER_FOUND_ERROR)) {
throw new ISE(
e,
JDBC_DRIVER_JAR_FILES_MISSING_ERROR
);
throw new ISE(e, JDBC_DRIVER_JAR_FILES_MISSING_ERROR);
} else {
throw e;
}
@@ -94,16 +91,15 @@ public CacheScheduler.VersionedCache generateCache(
} else {
newVersion = StringUtils.format("%d", dbQueryStart);
}
final CacheScheduler.VersionedCache versionedCache = scheduler.createVersionedCache(entryId, newVersion);

final long startNs = System.nanoTime();
try (
Handle handle = getHandle(entryId, namespace);
ResultIterator<Pair<String, String>> pairs = getLookupPairs(handle, namespace)) {
final Map<String, String> cache = versionedCache.getCache();
ResultIterator<Pair<String, String>> pairs = getLookupPairs(handle, namespace)
) {
final MapPopulator.PopulateResult populateResult = MapPopulator.populateAndWarnAtByteLimit(
pairs,
cache,
cache.getCache(),
(long) (MAX_MEMORY * namespace.getMaxHeapPercentage() / 100.0),
null == entryId ? null : entryId.toString()
);
@@ -115,21 +111,18 @@ public CacheScheduler.VersionedCache generateCache(
entryId,
duration
);
return versionedCache;
return newVersion;
}
catch (UnableToObtainConnectionException e) {
if (e.getMessage().contains(NO_SUITABLE_DRIVER_FOUND_ERROR)) {
throw new ISE(
e,
JDBC_DRIVER_JAR_FILES_MISSING_ERROR
);
throw new ISE(e, JDBC_DRIVER_JAR_FILES_MISSING_ERROR);
} else {
throw e;
}
}
catch (Throwable t) {
try {
versionedCache.close();
cache.close();
}
catch (Exception e) {
t.addSuppressed(e);
@@ -21,6 +21,7 @@

import org.apache.druid.query.lookup.namespace.CacheGenerator;
import org.apache.druid.query.lookup.namespace.StaticMapExtractionNamespace;
import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;

import javax.annotation.Nullable;
@@ -32,11 +33,11 @@ public final class StaticMapCacheGenerator implements CacheGenerator<StaticMapEx

@Override
@Nullable
public CacheScheduler.VersionedCache generateCache(
public String generateCache(
final StaticMapExtractionNamespace namespace,
final CacheScheduler.EntryImpl<StaticMapExtractionNamespace> id,
final String lastVersion,
final CacheScheduler scheduler
final CacheHandler cache
)
{
if (lastVersion != null) {
@@ -46,14 +47,13 @@ public CacheScheduler.VersionedCache generateCache(
"StaticMapCacheGenerator could only be configured for a namespace which is scheduled "
+ "to be updated once, not periodically. Last version: `" + lastVersion + "`");
}
CacheScheduler.VersionedCache versionedCache = scheduler.createVersionedCache(id, version);
try {
versionedCache.getCache().putAll(namespace.getMap());
return versionedCache;
cache.getCache().putAll(namespace.getMap());
return version;
}
catch (Throwable t) {
try {
versionedCache.close();
cache.close();
}
catch (Exception e) {
t.addSuppressed(e);
@@ -30,6 +30,7 @@
import org.apache.druid.query.lookup.namespace.CacheGenerator;
import org.apache.druid.query.lookup.namespace.UriExtractionNamespace;
import org.apache.druid.segment.loading.URIDataPuller;
import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
import org.apache.druid.utils.CompressionUtils;
import org.apache.druid.utils.JvmUtils;
@@ -62,11 +63,11 @@ public UriCacheGenerator(

@Override
@Nullable
public CacheScheduler.VersionedCache generateCache(
public String generateCache(
final UriExtractionNamespace extractionNamespace,
final CacheScheduler.EntryImpl<UriExtractionNamespace> entryId,
@Nullable final String lastVersion,
final CacheScheduler scheduler
final CacheHandler cache
) throws Exception
{
final boolean doSearch = extractionNamespace.getUriPrefix() != null;
@@ -143,14 +144,13 @@ public InputStream openStream() throws IOException
}
};

final CacheScheduler.VersionedCache versionedCache = scheduler.createVersionedCache(entryId, version);
try {
final long startNs = System.nanoTime();
final MapPopulator.PopulateResult populateResult = new MapPopulator<>(
extractionNamespace.getNamespaceParseSpec().getParser()
).populateAndWarnAtByteLimit(
source,
versionedCache.getCache(),
cache.getCache(),
(long) (MAX_MEMORY * extractionNamespace.getMaxHeapPercentage() / 100.0),
null == entryId ? null : entryId.toString()
);
@@ -163,11 +163,11 @@ public InputStream openStream() throws IOException
entryId,
duration
);
return versionedCache;
return version;
}
catch (Throwable t) {
try {
versionedCache.close();
cache.close();
}
catch (Exception e) {
t.addSuppressed(e);
@@ -21,25 +21,26 @@

import org.apache.druid.java.util.common.logger.Logger;

import java.util.concurrent.ConcurrentMap;
import java.io.Closeable;
import java.util.Map;

public final class CacheHandler implements AutoCloseable
public final class CacheHandler implements AutoCloseable, Closeable
{
private static final Logger log = new Logger(CacheHandler.class);

private final NamespaceExtractionCacheManager cacheManager;
private final ConcurrentMap<String, String> cache;
private final Map<String, String> cache;
final Object id;

CacheHandler(NamespaceExtractionCacheManager cacheManager, ConcurrentMap<String, String> cache, Object id)
CacheHandler(NamespaceExtractionCacheManager cacheManager, Map<String, String> cache, Object id)
{
log.debug("Creating %s", super.toString());
this.cacheManager = cacheManager;
this.cache = cache;
this.id = id;
}

public ConcurrentMap<String, String> getCache()
public Map<String, String> getCache()
{
return cache;
}
@@ -48,7 +49,7 @@ public ConcurrentMap<String, String> getCache()
public void close()
{
cacheManager.disposeCache(this);
// Log statement after disposeCache(), because logging may fail (e. g. in shutdown hooks)
// Log statement after disposeCache(), because logging may fail (e.g. in shutdown hooks)
log.debug("Closed %s", super.toString());
}
}

This file was deleted.

0 comments on commit 0600772

Please sign in to comment.