Skip to content

Commit

Permalink
@RocksDB: Change how table size is computed (#3680) (#3689)
Browse files Browse the repository at this point in the history
Retrieve the table size reading "rocksdb.estimate-num-keys" property
from RocksDB. Although still approximate, this approach is more
accurate as it relies on compaction to eliminate duplicate counting.
  • Loading branch information
vjeko committed Jun 29, 2023
1 parent 5845f0c commit da7b468
Showing 1 changed file with 7 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand All @@ -41,6 +40,7 @@
@Slf4j
public class PersistedStreamingMap<K, V> implements ContextAwareMap<K, V> {

public static final String ESTIMATE_SIZE = "rocksdb.estimate-num-keys";
public static final String DISK_BACKED = "diskBacked";
public static final String TRUE = "true";
public static final int BOUND = 100;
Expand Down Expand Up @@ -86,7 +86,6 @@ public static Options getPersistedStreamingMapOptions() {
}

private final ContextAwareMap<K, V> optimisticMap = new StreamingMapDecorator<>();
private final AtomicInteger dataSetSize = new AtomicInteger();
private final CorfuRuntime corfuRuntime;
private final ISerializer serializer;
private final RocksDB rocksDb;
Expand Down Expand Up @@ -116,15 +115,19 @@ public PersistedStreamingMap(@NonNull Path dataPath,
*/
@Override
public int size() {
return dataSetSize.get();
try {
return Math.toIntExact(rocksDb.getLongProperty(ESTIMATE_SIZE));
} catch (RocksDBException e) {
throw new UnrecoverableCorfuError(e);
}
}

/**
* {@inheritDoc}
*/
@Override
public boolean isEmpty() {
return dataSetSize.get() == 0;
return size() == 0;
}

/**
Expand Down Expand Up @@ -192,14 +195,6 @@ public V put(@NonNull K key, @NonNull V value) {
serializer.serialize(key, keyPayload);
serializer.serialize(value, valuePayload);

// Only increment the count if the value is not present. In other words,
// increment the count if this is an update operation.
final boolean keyExists = rocksDb.keyMayExist(keyPayload.array(),
keyPayload.arrayOffset(), keyPayload.readableBytes(), null);
if (!keyExists) {
dataSetSize.incrementAndGet();
}

try {
rocksDb.put(writeOptions,
keyPayload.array(), keyPayload.arrayOffset(), keyPayload.readableBytes(),
Expand Down Expand Up @@ -227,7 +222,6 @@ public V remove(@NonNull Object key) {
if (value != null) {
rocksDb.delete(writeOptions,
keyPayload.array(), keyPayload.arrayOffset(), keyPayload.readableBytes());
dataSetSize.decrementAndGet();
return value;
} else {
return null;
Expand All @@ -253,7 +247,6 @@ public void putAll(@NonNull Map<? extends K, ? extends V> map) {
@Override
public void clear() {
entryStream().map(Entry::getKey).forEach(this::remove);
dataSetSize.set(0);
}

/**
Expand Down

0 comments on commit da7b468

Please sign in to comment.