diff --git a/bindings/c/fdb_c.cpp b/bindings/c/fdb_c.cpp index dfa6f1d6bb6..b89c43af470 100644 --- a/bindings/c/fdb_c.cpp +++ b/bindings/c/fdb_c.cpp @@ -627,6 +627,13 @@ fdb_error_t fdb_transaction_add_conflict_range( FDBTransaction*tr, uint8_t const } +extern "C" DLLEXPORT +FDBFuture* fdb_transaction_get_estimated_range_size_bytes( FDBTransaction* tr, uint8_t const* begin_key_name, + int begin_key_name_length, uint8_t const* end_key_name, int end_key_name_length ) { + KeyRangeRef range(KeyRef(begin_key_name, begin_key_name_length), KeyRef(end_key_name, end_key_name_length)); + return (FDBFuture*)(TXN(tr)->getEstimatedRangeSizeBytes(range).extractPtr()); +} + #include "fdb_c_function_pointers.g.h" #define FDB_API_CHANGED(func, ver) if (header_version < ver) fdb_api_ptr_##func = (void*)&(func##_v##ver##_PREV); else if (fdb_api_ptr_##func == (void*)&fdb_api_ptr_unimpl) fdb_api_ptr_##func = (void*)&(func##_impl); diff --git a/bindings/c/foundationdb/fdb_c.h b/bindings/c/foundationdb/fdb_c.h index 22fee464e59..b5dfa63d13e 100644 --- a/bindings/c/foundationdb/fdb_c.h +++ b/bindings/c/foundationdb/fdb_c.h @@ -256,6 +256,10 @@ extern "C" { int end_key_name_length, FDBConflictRangeType type); + DLLEXPORT WARN_UNUSED_RESULT FDBFuture* + fdb_transaction_get_estimated_range_size_bytes( FDBTransaction* tr, uint8_t const* begin_key_name, + int begin_key_name_length, uint8_t const* end_key_name, int end_key_name_length); + #define FDB_KEYSEL_LAST_LESS_THAN(k, l) k, l, 0, 0 #define FDB_KEYSEL_LAST_LESS_OR_EQUAL(k, l) k, l, 1, 0 #define FDB_KEYSEL_FIRST_GREATER_THAN(k, l) k, l, 1, 1 diff --git a/bindings/flow/fdb_flow.actor.cpp b/bindings/flow/fdb_flow.actor.cpp index 742a60cd7e7..132b1bd4d91 100644 --- a/bindings/flow/fdb_flow.actor.cpp +++ b/bindings/flow/fdb_flow.actor.cpp @@ -131,6 +131,8 @@ namespace FDB { GetRangeLimits limits = GetRangeLimits(), bool snapshot = false, bool reverse = false, FDBStreamingMode streamingMode = FDB_STREAMING_MODE_SERIAL) override; + + Future getEstimatedRangeSizeBytes(const KeyRange& keys) override; void addReadConflictRange(KeyRangeRef const& keys) override; void addReadConflictKey(KeyRef const& key) override; @@ -345,6 +347,14 @@ namespace FDB { } ); } + Future TransactionImpl::getEstimatedRangeSizeBytes(const KeyRange& keys) { + return backToFuture(fdb_transaction_get_estimated_range_size_bytes(tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size()), [](Reference f) { + int64_t bytes; + throw_on_error(fdb_future_get_int64(f->f, &bytes)); + return bytes; + }); + } + void TransactionImpl::addReadConflictRange(KeyRangeRef const& keys) { throw_on_error( fdb_transaction_add_conflict_range( tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size(), FDB_CONFLICT_RANGE_TYPE_READ ) ); } diff --git a/bindings/flow/fdb_flow.h b/bindings/flow/fdb_flow.h index 2e34f20d594..66049cae0ce 100644 --- a/bindings/flow/fdb_flow.h +++ b/bindings/flow/fdb_flow.h @@ -89,6 +89,8 @@ namespace FDB { streamingMode); } + virtual Future getEstimatedRangeSizeBytes(const KeyRange& keys) = 0; + virtual void addReadConflictRange(KeyRangeRef const& keys) = 0; virtual void addReadConflictKey(KeyRef const& key) = 0; diff --git a/bindings/go/src/fdb/snapshot.go b/bindings/go/src/fdb/snapshot.go index 18c77d79bb3..ca21818729f 100644 --- a/bindings/go/src/fdb/snapshot.go +++ b/bindings/go/src/fdb/snapshot.go @@ -86,3 +86,11 @@ func (s Snapshot) GetReadVersion() FutureInt64 { func (s Snapshot) GetDatabase() Database { return s.transaction.db } + +func (s Snapshot) GetEstimatedRangeSizeBytes(r ExactRange) FutureInt64 { + beginKey, endKey := r.FDBRangeKeys() + return s.getEstimatedRangeSizeBytes( + beginKey.FDBKey(), + endKey.FDBKey(), + ) +} diff --git a/bindings/go/src/fdb/transaction.go b/bindings/go/src/fdb/transaction.go index ff6679c30b9..069564bc7f1 100644 --- a/bindings/go/src/fdb/transaction.go +++ b/bindings/go/src/fdb/transaction.go @@ -39,6 +39,7 @@ type ReadTransaction interface { GetReadVersion() FutureInt64 GetDatabase() Database Snapshot() Snapshot + GetEstimatedRangeSizeBytes(r ExactRange) FutureInt64 ReadTransactor } @@ -305,6 +306,28 @@ func (t Transaction) GetRange(r Range, options RangeOptions) RangeResult { return t.getRange(r, options, false) } +func (t *transaction) getEstimatedRangeSizeBytes(beginKey Key, endKey Key) FutureInt64 { + return &futureInt64{ + future: newFuture(C.fdb_transaction_get_estimated_range_size_bytes( + t.ptr, + byteSliceToPtr(beginKey), + C.int(len(beginKey)), + byteSliceToPtr(endKey), + C.int(len(endKey)), + )), + } +} + +// GetEstimatedRangeSizeBytes will get the byte size of the key range based on the +// byte sample collected by FDB +func (t Transaction) GetEstimatedRangeSizeBytes(r ExactRange) FutureInt64 { + beginKey, endKey := r.FDBRangeKeys() + return t.getEstimatedRangeSizeBytes( + beginKey.FDBKey(), + endKey.FDBKey(), + ) +} + func (t *transaction) getReadVersion() FutureInt64 { return &futureInt64{ future: newFuture(C.fdb_transaction_get_read_version(t.ptr)), diff --git a/bindings/java/fdbJNI.cpp b/bindings/java/fdbJNI.cpp index 121a32ae258..232e8392fee 100644 --- a/bindings/java/fdbJNI.cpp +++ b/bindings/java/fdbJNI.cpp @@ -646,6 +646,35 @@ JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1 return (jlong)f; } +JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1getEstimatedRangeSizeBytes(JNIEnv *jenv, jobject, jlong tPtr, + jbyteArray beginKeyBytes, jbyteArray endKeyBytes) { + if( !tPtr || !beginKeyBytes || !endKeyBytes) { + throwParamNotNull(jenv); + return 0; + } + FDBTransaction *tr = (FDBTransaction *)tPtr; + + uint8_t *startKey = (uint8_t *)jenv->GetByteArrayElements( beginKeyBytes, JNI_NULL ); + if(!startKey) { + if( !jenv->ExceptionOccurred() ) + throwRuntimeEx( jenv, "Error getting handle to native resources" ); + return 0; + } + + uint8_t *endKey = (uint8_t *)jenv->GetByteArrayElements(endKeyBytes, JNI_NULL); + if (!endKey) { + jenv->ReleaseByteArrayElements( beginKeyBytes, (jbyte *)startKey, JNI_ABORT ); + if( !jenv->ExceptionOccurred() ) + throwRuntimeEx( jenv, "Error getting handle to native resources" ); + return 0; + } + + FDBFuture *f = fdb_transaction_get_estimated_range_size_bytes( tr, startKey, jenv->GetArrayLength( beginKeyBytes ), endKey, jenv->GetArrayLength( endKeyBytes ) ); + jenv->ReleaseByteArrayElements( beginKeyBytes, (jbyte *)startKey, JNI_ABORT ); + jenv->ReleaseByteArrayElements( endKeyBytes, (jbyte *)endKey, JNI_ABORT ); + return (jlong)f; +} + JNIEXPORT void JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1set(JNIEnv *jenv, jobject, jlong tPtr, jbyteArray keyBytes, jbyteArray valueBytes) { if( !tPtr || !keyBytes || !valueBytes ) { throwParamNotNull(jenv); diff --git a/bindings/java/src/main/com/apple/foundationdb/FDBTransaction.java b/bindings/java/src/main/com/apple/foundationdb/FDBTransaction.java index d6f1e4f9354..09be8a353ac 100644 --- a/bindings/java/src/main/com/apple/foundationdb/FDBTransaction.java +++ b/bindings/java/src/main/com/apple/foundationdb/FDBTransaction.java @@ -70,6 +70,16 @@ public CompletableFuture getKey(KeySelector selector) { return getKey_internal(selector, true); } + @Override + public CompletableFuture getEstimatedRangeSizeBytes(byte[] begin, byte[] end) { + return FDBTransaction.this.getEstimatedRangeSizeBytes(begin, end); + } + + @Override + public CompletableFuture getEstimatedRangeSizeBytes(Range range) { + return FDBTransaction.this.getEstimatedRangeSizeBytes(range); + } + /////////////////// // getRange -> KeySelectors /////////////////// @@ -257,6 +267,21 @@ private CompletableFuture getKey_internal(KeySelector selector, boolean } } + @Override + public CompletableFuture getEstimatedRangeSizeBytes(byte[] begin, byte[] end) { + pointerReadLock.lock(); + try { + return new FutureInt64(Transaction_getEstimatedRangeSizeBytes(getPtr(), begin, end), executor); + } finally { + pointerReadLock.unlock(); + } + } + + @Override + public CompletableFuture getEstimatedRangeSizeBytes(Range range) { + return this.getEstimatedRangeSizeBytes(range.begin, range.end); + } + /////////////////// // getRange -> KeySelectors /////////////////// @@ -659,4 +684,5 @@ private native void Transaction_addConflictRange(long cPtr, private native long Transaction_watch(long ptr, byte[] key) throws FDBException; private native void Transaction_cancel(long cPtr); private native long Transaction_getKeyLocations(long cPtr, byte[] key); + private native long Transaction_getEstimatedRangeSizeBytes(long cPtr, byte[] keyBegin, byte[] keyEnd); } diff --git a/bindings/java/src/main/com/apple/foundationdb/ReadTransaction.java b/bindings/java/src/main/com/apple/foundationdb/ReadTransaction.java index c3480ad7df6..6bdbf5be6ca 100644 --- a/bindings/java/src/main/com/apple/foundationdb/ReadTransaction.java +++ b/bindings/java/src/main/com/apple/foundationdb/ReadTransaction.java @@ -412,6 +412,26 @@ AsyncIterable getRange(Range range, AsyncIterable getRange(Range range, int limit, boolean reverse, StreamingMode mode); + + /** + * Gets an estimate for the number of bytes stored in the given range. + * + * @param begin the beginning of the range (inclusive) + * @param end the end of the range (exclusive) + * + * @return a handle to access the results of the asynchronous call + */ + CompletableFuture getEstimatedRangeSizeBytes(byte[] begin, byte[] end); + + /** + * Gets an estimate for the number of bytes stored in the given range. + * + * @param range the range of the keys + * + * @return a handle to access the results of the asynchronous call + */ + CompletableFuture getEstimatedRangeSizeBytes(Range range); + /** * Returns a set of options that can be set on a {@code Transaction} * diff --git a/bindings/python/fdb/impl.py b/bindings/python/fdb/impl.py index c7d33f0fe95..974fcdce984 100644 --- a/bindings/python/fdb/impl.py +++ b/bindings/python/fdb/impl.py @@ -449,6 +449,17 @@ def __getitem__(self, key): if isinstance(key, slice): return self.get_range(key.start, key.stop, reverse=(key.step == -1)) return self.get(key) + + def get_estimated_range_size_bytes(self, beginKey, endKey): + if beginKey is None: + beginKey = b'' + if endKey is None: + endKey = b'\xff' + return FutureInt64(self.capi.fdb_transaction_get_estimated_range_size_bytes( + self.tpointer, + beginKey, len(beginKey), + endKey, len(endKey) + )) class Transaction(TransactionRead): @@ -1424,6 +1435,9 @@ def init_c_api(): ctypes.c_int, ctypes.c_int] _capi.fdb_transaction_get_range.restype = ctypes.c_void_p + _capi.fdb_transaction_get_estimated_range_size_bytes.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_int] + _capi.fdb_transaction_get_estimated_range_size_bytes.restype = ctypes.c_void_p + _capi.fdb_transaction_add_conflict_range.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_int] _capi.fdb_transaction_add_conflict_range.restype = ctypes.c_int _capi.fdb_transaction_add_conflict_range.errcheck = check_error_code diff --git a/bindings/ruby/lib/fdbimpl.rb b/bindings/ruby/lib/fdbimpl.rb index b1deb1123cf..e5e52661970 100644 --- a/bindings/ruby/lib/fdbimpl.rb +++ b/bindings/ruby/lib/fdbimpl.rb @@ -108,6 +108,7 @@ def self.init_c_api attach_function :fdb_transaction_get, [ :pointer, :pointer, :int, :int ], :pointer attach_function :fdb_transaction_get_key, [ :pointer, :pointer, :int, :int, :int, :int ], :pointer attach_function :fdb_transaction_get_range, [ :pointer, :pointer, :int, :int, :int, :pointer, :int, :int, :int, :int, :int, :int, :int, :int, :int ], :pointer + attach_function :fdb_transaction_get_estimated_range_size_bytes, [ :pointer, :pointer, :int, :pointer, :int ], :pointer attach_function :fdb_transaction_set, [ :pointer, :pointer, :int, :pointer, :int ], :void attach_function :fdb_transaction_clear, [ :pointer, :pointer, :int ], :void attach_function :fdb_transaction_clear_range, [ :pointer, :pointer, :int, :pointer, :int ], :void @@ -817,6 +818,13 @@ def get_range_start_with(prefix, options={}, &block) prefix = prefix.dup.force_encoding "BINARY" get_range(prefix, FDB.strinc(prefix), options, &block) end + + def get_estimated_range_size_bytes(beginKey, endKey) + bkey = FDB.key_to_bytes(beginKey) + ekey = FDB.key_to_bytes(endKey) + Int64Future.new(FDBC.fdb_transaction_get_estimated_range_size_bytes(@tpointer, bkey, bkey.bytesize, ekey, ekey.bytesize)) + end + end TransactionRead.class_variable_set("@@StreamingMode", @@StreamingMode) diff --git a/fdbclient/IClientApi.h b/fdbclient/IClientApi.h index b3e6217054b..154ac9723f6 100644 --- a/fdbclient/IClientApi.h +++ b/fdbclient/IClientApi.h @@ -48,6 +48,7 @@ class ITransaction { virtual ThreadFuture> getVersionstamp() = 0; virtual void addReadConflictRange(const KeyRangeRef& keys) = 0; + virtual ThreadFuture getEstimatedRangeSizeBytes(const KeyRangeRef& keys) = 0; virtual void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) = 0; virtual void set(const KeyRef& key, const ValueRef& value) = 0; diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index 767805d7856..40c8616d12c 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -145,6 +145,20 @@ ThreadFuture> DLTransaction::getVersionstamp() { }); } +ThreadFuture DLTransaction::getEstimatedRangeSizeBytes(const KeyRangeRef& keys) { + if (!api->transactionGetEstimatedRangeSizeBytes) { + return unsupported_operation(); + } + FdbCApi::FDBFuture *f = api->transactionGetEstimatedRangeSizeBytes(tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size()); + + return toThreadFuture(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) { + int64_t sampledSize; + FdbCApi::fdb_error_t error = api->futureGetInt64(f, &sampledSize); + ASSERT(!error); + return sampledSize; + }); +} + void DLTransaction::addReadConflictRange(const KeyRangeRef& keys) { throwIfError(api->transactionAddConflictRange(tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size(), FDBConflictRangeTypes::READ)); } @@ -307,6 +321,7 @@ void DLApi::init() { loadClientFunction(&api->transactionReset, lib, fdbCPath, "fdb_transaction_reset"); loadClientFunction(&api->transactionCancel, lib, fdbCPath, "fdb_transaction_cancel"); loadClientFunction(&api->transactionAddConflictRange, lib, fdbCPath, "fdb_transaction_add_conflict_range"); + loadClientFunction(&api->transactionGetEstimatedRangeSizeBytes, lib, fdbCPath, "fdb_transaction_get_estimated_range_size_bytes", headerVersion >= 700); loadClientFunction(&api->futureGetInt64, lib, fdbCPath, headerVersion >= 620 ? "fdb_future_get_int64" : "fdb_future_get_version"); loadClientFunction(&api->futureGetError, lib, fdbCPath, "fdb_future_get_error"); @@ -547,6 +562,12 @@ void MultiVersionTransaction::addReadConflictRange(const KeyRangeRef& keys) { } } +ThreadFuture MultiVersionTransaction::getEstimatedRangeSizeBytes(const KeyRangeRef& keys) { + auto tr = getTransaction(); + auto f = tr.transaction ? tr.transaction->getEstimatedRangeSizeBytes(keys) : ThreadFuture(Never()); + return abortableFuture(f, tr.onChange); +} + void MultiVersionTransaction::atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) { auto tr = getTransaction(); if(tr.transaction) { diff --git a/fdbclient/MultiVersionTransaction.h b/fdbclient/MultiVersionTransaction.h index e71f6e895f3..a657f49cbb3 100644 --- a/fdbclient/MultiVersionTransaction.h +++ b/fdbclient/MultiVersionTransaction.h @@ -81,6 +81,9 @@ struct FdbCApi : public ThreadSafeReferenceCounted { void (*transactionClear)(FDBTransaction *tr, uint8_t const *keyName, int keyNameLength); void (*transactionClearRange)(FDBTransaction *tr, uint8_t const *beginKeyName, int beginKeyNameLength, uint8_t const *endKeyName, int endKeyNameLength); void (*transactionAtomicOp)(FDBTransaction *tr, uint8_t const *keyName, int keyNameLength, uint8_t const *param, int paramLength, FDBMutationTypes::Option operationType); + + FDBFuture* (*transactionGetEstimatedRangeSizeBytes)(FDBTransaction* tr, uint8_t const* begin_key_name, + int begin_key_name_length, uint8_t const* end_key_name, int end_key_name_length); FDBFuture* (*transactionCommit)(FDBTransaction *tr); fdb_error_t (*transactionGetCommittedVersion)(FDBTransaction *tr, int64_t *outVersion); @@ -129,6 +132,7 @@ class DLTransaction : public ITransaction, ThreadSafeReferenceCounted> getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot=false, bool reverse=false) override; ThreadFuture>> getAddressesForKey(const KeyRef& key) override; ThreadFuture> getVersionstamp() override; + ThreadFuture getEstimatedRangeSizeBytes(const KeyRangeRef& keys) override; void addReadConflictRange(const KeyRangeRef& keys) override; @@ -228,6 +232,7 @@ class MultiVersionTransaction : public ITransaction, ThreadSafeReferenceCounted< ThreadFuture> getVersionstamp() override; void addReadConflictRange(const KeyRangeRef& keys) override; + ThreadFuture getEstimatedRangeSizeBytes(const KeyRangeRef& keys) override; void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) override; void set(const KeyRef& key, const ValueRef& value) override; diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index fa0ff960a57..e22620d5720 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -3168,6 +3168,46 @@ Future Transaction::onError( Error const& e ) { return e; } +ACTOR Future getStorageMetricsLargeKeyRange(Database cx, KeyRangeRef keys); + +ACTOR Future doGetStorageMetrics(Database cx, KeyRangeRef keys, Reference locationInfo) { + loop { + try { + WaitMetricsRequest req(keys, StorageMetrics(), StorageMetrics()); + req.min.bytes = 0; + req.max.bytes = -1; + StorageMetrics m = wait( + loadBalance(locationInfo, &StorageServerInterface::waitMetrics, req, TaskPriority::DataDistribution)); + return m; + } catch (Error& e) { + if (e.code() != error_code_wrong_shard_server && e.code() != error_code_all_alternatives_failed) { + TraceEvent(SevError, "WaitStorageMetricsError").error(e); + throw; + } + wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskPriority::DataDistribution)); + cx->invalidateCache(keys); + StorageMetrics m = wait(getStorageMetricsLargeKeyRange(cx, keys)); + return m; + } + } +} + +ACTOR Future getStorageMetricsLargeKeyRange(Database cx, KeyRangeRef keys) { + + vector>> locations = wait(getKeyRangeLocations( + cx, keys, std::numeric_limits::max(), false, &StorageServerInterface::waitMetrics, TransactionInfo(TaskPriority::DataDistribution))); + state int nLocs = locations.size(); + state vector> fx(nLocs); + state StorageMetrics total; + for (int i = 0; i < nLocs; i++) { + fx[i] = doGetStorageMetrics(cx, locations[i].first, locations[i].second); + } + wait(waitForAll(fx)); + for (int i = 0; i < nLocs; i++) { + total += fx[i].get(); + } + return total; +} ACTOR Future trackBoundedStorageMetrics( KeyRange keys, @@ -3189,14 +3229,11 @@ ACTOR Future trackBoundedStorageMetrics( } } -ACTOR Future< StorageMetrics > waitStorageMetricsMultipleLocations( - vector< pair> > locations, - StorageMetrics min, - StorageMetrics max, - StorageMetrics permittedError) -{ +ACTOR Future waitStorageMetricsMultipleLocations( + vector>> locations, StorageMetrics min, StorageMetrics max, + StorageMetrics permittedError) { state int nLocs = locations.size(); - state vector> fx( nLocs ); + state vector> fx(nLocs); state StorageMetrics total; state PromiseStream deltas; state vector> wx( fx.size() ); @@ -3204,17 +3241,17 @@ ACTOR Future< StorageMetrics > waitStorageMetricsMultipleLocations( state StorageMetrics maxPlus = max + halfErrorPerMachine * (nLocs-1); state StorageMetrics minMinus = min - halfErrorPerMachine * (nLocs-1); - for(int i=0; i, int> > waitStorageMetrics( try { Future fx; if (locations.size() > 1) { - fx = waitStorageMetricsMultipleLocations( locations, min, max, permittedError ); + fx = waitStorageMetricsMultipleLocations(locations, min, max, permittedError); } else { WaitMetricsRequest req( keys, min, max ); fx = loadBalance( locations[0].second, &StorageServerInterface::waitMetrics, req, TaskPriority::DataDistribution ); @@ -3294,9 +3331,13 @@ Future< std::pair, int> > Transaction::waitStorageMetri } Future< StorageMetrics > Transaction::getStorageMetrics( KeyRange const& keys, int shardLimit ) { - StorageMetrics m; - m.bytes = -1; - return extractMetrics( ::waitStorageMetrics( cx, keys, StorageMetrics(), m, StorageMetrics(), shardLimit, -1 ) ); + if (shardLimit > 0) { + StorageMetrics m; + m.bytes = -1; + return extractMetrics(::waitStorageMetrics(cx, keys, StorageMetrics(), m, StorageMetrics(), shardLimit, -1)); + } else { + return ::getStorageMetricsLargeKeyRange(cx, keys); + } } ACTOR Future< Standalone> > splitStorageMetrics( Database cx, KeyRange keys, StorageMetrics limit, StorageMetrics estimated ) diff --git a/fdbclient/NativeAPI.actor.h b/fdbclient/NativeAPI.actor.h index 0eb470969f2..12eb75fe656 100644 --- a/fdbclient/NativeAPI.actor.h +++ b/fdbclient/NativeAPI.actor.h @@ -244,6 +244,7 @@ class Transaction : NonCopyable { Future< Void > warmRange( Database cx, KeyRange keys ); Future< std::pair, int> > waitStorageMetrics( KeyRange const& keys, StorageMetrics const& min, StorageMetrics const& max, StorageMetrics const& permittedError, int shardLimit, int expectedShardCount ); + // Pass a negative value for `shardLimit` to indicate no limit on the shard number. Future< StorageMetrics > getStorageMetrics( KeyRange const& keys, int shardLimit ); Future< Standalone> > splitStorageMetrics( KeyRange const& keys, StorageMetrics const& limit, StorageMetrics const& estimated ); diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp index 611b0ea41b5..81f6f0e1728 100644 --- a/fdbclient/ReadYourWrites.actor.cpp +++ b/fdbclient/ReadYourWrites.actor.cpp @@ -1343,6 +1343,16 @@ Future< Standalone >> ReadYourWritesTransaction::getAddre return result; } +Future ReadYourWritesTransaction::getEstimatedRangeSizeBytes(const KeyRangeRef& keys) { + if(checkUsedDuringCommit()) { + throw used_during_commit(); + } + if( resetPromise.isSet() ) + return resetPromise.getFuture().getError(); + + return map(waitOrError(tr.getStorageMetrics(keys, -1), resetPromise.getFuture()), [](const StorageMetrics& m) { return m.bytes; }); +} + void ReadYourWritesTransaction::addReadConflictRange( KeyRangeRef const& keys ) { if(checkUsedDuringCommit()) { throw used_during_commit(); diff --git a/fdbclient/ReadYourWrites.h b/fdbclient/ReadYourWrites.h index fc766617bfb..f4ebd92e4b5 100644 --- a/fdbclient/ReadYourWrites.h +++ b/fdbclient/ReadYourWrites.h @@ -84,6 +84,7 @@ class ReadYourWritesTransaction : NonCopyable, public ReferenceCounted>> getAddressesForKey(const Key& key); + Future getEstimatedRangeSizeBytes( const KeyRangeRef& keys ); void addReadConflictRange( KeyRangeRef const& keys ); void makeSelfConflicting() { tr.makeSelfConflicting(); } diff --git a/fdbclient/ThreadSafeTransaction.actor.cpp b/fdbclient/ThreadSafeTransaction.actor.cpp index 18de332d902..afafa8747cd 100644 --- a/fdbclient/ThreadSafeTransaction.actor.cpp +++ b/fdbclient/ThreadSafeTransaction.actor.cpp @@ -157,6 +157,17 @@ ThreadFuture< Key > ThreadSafeTransaction::getKey( const KeySelectorRef& key, bo } ); } +ThreadFuture ThreadSafeTransaction::getEstimatedRangeSizeBytes( const KeyRangeRef& keys ) { + KeyRange r = keys; + + ReadYourWritesTransaction *tr = this->tr; + return onMainThread( [tr, r]() -> Future { + tr->checkDeferredError(); + return tr->getEstimatedRangeSizeBytes(r); + } ); +} + + ThreadFuture< Standalone > ThreadSafeTransaction::getRange( const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot, bool reverse ) { KeySelector b = begin; KeySelector e = end; diff --git a/fdbclient/ThreadSafeTransaction.h b/fdbclient/ThreadSafeTransaction.h index c5832cec458..61b64aa4b4e 100644 --- a/fdbclient/ThreadSafeTransaction.h +++ b/fdbclient/ThreadSafeTransaction.h @@ -71,6 +71,7 @@ class ThreadSafeTransaction : public ITransaction, ThreadSafeReferenceCounted>> getAddressesForKey(const KeyRef& key) override; ThreadFuture> getVersionstamp() override; + ThreadFuture getEstimatedRangeSizeBytes(const KeyRangeRef& keys) override; void addReadConflictRange( const KeyRangeRef& keys ) override; void makeSelfConflicting();