Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added necessary plumbings to expose byte sample collected by storage … #2537

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions bindings/c/fdb_c.cpp
Expand Up @@ -627,6 +627,13 @@ fdb_error_t fdb_transaction_add_conflict_range( FDBTransaction*tr, uint8_t const

}

extern "C" DLLEXPORT
FDBFuture* fdb_transaction_get_estimated_range_size_bytes( FDBTransaction* tr, uint8_t const* begin_key_name,
int begin_key_name_length, uint8_t const* end_key_name, int end_key_name_length ) {
KeyRangeRef range(KeyRef(begin_key_name, begin_key_name_length), KeyRef(end_key_name, end_key_name_length));
return (FDBFuture*)(TXN(tr)->getEstimatedRangeSizeBytes(range).extractPtr());
}

#include "fdb_c_function_pointers.g.h"

#define FDB_API_CHANGED(func, ver) if (header_version < ver) fdb_api_ptr_##func = (void*)&(func##_v##ver##_PREV); else if (fdb_api_ptr_##func == (void*)&fdb_api_ptr_unimpl) fdb_api_ptr_##func = (void*)&(func##_impl);
Expand Down
4 changes: 4 additions & 0 deletions bindings/c/foundationdb/fdb_c.h
Expand Up @@ -256,6 +256,10 @@ extern "C" {
int end_key_name_length,
FDBConflictRangeType type);

DLLEXPORT WARN_UNUSED_RESULT FDBFuture*
fdb_transaction_get_estimated_range_size_bytes( FDBTransaction* tr, uint8_t const* begin_key_name,
int begin_key_name_length, uint8_t const* end_key_name, int end_key_name_length);

#define FDB_KEYSEL_LAST_LESS_THAN(k, l) k, l, 0, 0
#define FDB_KEYSEL_LAST_LESS_OR_EQUAL(k, l) k, l, 1, 0
#define FDB_KEYSEL_FIRST_GREATER_THAN(k, l) k, l, 1, 1
Expand Down
10 changes: 10 additions & 0 deletions bindings/flow/fdb_flow.actor.cpp
Expand Up @@ -131,6 +131,8 @@ namespace FDB {
GetRangeLimits limits = GetRangeLimits(), bool snapshot = false,
bool reverse = false,
FDBStreamingMode streamingMode = FDB_STREAMING_MODE_SERIAL) override;

Future<int64_t> getEstimatedRangeSizeBytes(const KeyRange& keys) override;

void addReadConflictRange(KeyRangeRef const& keys) override;
void addReadConflictKey(KeyRef const& key) override;
Expand Down Expand Up @@ -345,6 +347,14 @@ namespace FDB {
} );
}

Future<int64_t> TransactionImpl::getEstimatedRangeSizeBytes(const KeyRange& keys) {
return backToFuture<int64_t>(fdb_transaction_get_estimated_range_size_bytes(tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size()), [](Reference<CFuture> f) {
int64_t bytes;
throw_on_error(fdb_future_get_int64(f->f, &bytes));
return bytes;
});
}

void TransactionImpl::addReadConflictRange(KeyRangeRef const& keys) {
throw_on_error( fdb_transaction_add_conflict_range( tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size(), FDB_CONFLICT_RANGE_TYPE_READ ) );
}
Expand Down
2 changes: 2 additions & 0 deletions bindings/flow/fdb_flow.h
Expand Up @@ -89,6 +89,8 @@ namespace FDB {
streamingMode);
}

virtual Future<int64_t> getEstimatedRangeSizeBytes(const KeyRange& keys) = 0;

virtual void addReadConflictRange(KeyRangeRef const& keys) = 0;
virtual void addReadConflictKey(KeyRef const& key) = 0;

Expand Down
8 changes: 8 additions & 0 deletions bindings/go/src/fdb/snapshot.go
Expand Up @@ -86,3 +86,11 @@ func (s Snapshot) GetReadVersion() FutureInt64 {
func (s Snapshot) GetDatabase() Database {
return s.transaction.db
}

func (s Snapshot) GetEstimatedRangeSizeBytes(r ExactRange) FutureInt64 {
beginKey, endKey := r.FDBRangeKeys()
return s.getEstimatedRangeSizeBytes(
beginKey.FDBKey(),
endKey.FDBKey(),
)
}
23 changes: 23 additions & 0 deletions bindings/go/src/fdb/transaction.go
Expand Up @@ -39,6 +39,7 @@ type ReadTransaction interface {
GetReadVersion() FutureInt64
GetDatabase() Database
Snapshot() Snapshot
GetEstimatedRangeSizeBytes(r ExactRange) FutureInt64

ReadTransactor
}
Expand Down Expand Up @@ -305,6 +306,28 @@ func (t Transaction) GetRange(r Range, options RangeOptions) RangeResult {
return t.getRange(r, options, false)
}

func (t *transaction) getEstimatedRangeSizeBytes(beginKey Key, endKey Key) FutureInt64 {
return &futureInt64{
future: newFuture(C.fdb_transaction_get_estimated_range_size_bytes(
t.ptr,
byteSliceToPtr(beginKey),
C.int(len(beginKey)),
byteSliceToPtr(endKey),
C.int(len(endKey)),
)),
}
}

// GetEstimatedRangeSizeBytes will get the byte size of the key range based on the
// byte sample collected by FDB
func (t Transaction) GetEstimatedRangeSizeBytes(r ExactRange) FutureInt64 {
beginKey, endKey := r.FDBRangeKeys()
return t.getEstimatedRangeSizeBytes(
beginKey.FDBKey(),
endKey.FDBKey(),
)
}

func (t *transaction) getReadVersion() FutureInt64 {
return &futureInt64{
future: newFuture(C.fdb_transaction_get_read_version(t.ptr)),
Expand Down
29 changes: 29 additions & 0 deletions bindings/java/fdbJNI.cpp
Expand Up @@ -646,6 +646,35 @@ JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1
return (jlong)f;
}

JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1getEstimatedRangeSizeBytes(JNIEnv *jenv, jobject, jlong tPtr,
jbyteArray beginKeyBytes, jbyteArray endKeyBytes) {
if( !tPtr || !beginKeyBytes || !endKeyBytes) {
throwParamNotNull(jenv);
return 0;
}
FDBTransaction *tr = (FDBTransaction *)tPtr;

uint8_t *startKey = (uint8_t *)jenv->GetByteArrayElements( beginKeyBytes, JNI_NULL );
if(!startKey) {
if( !jenv->ExceptionOccurred() )
throwRuntimeEx( jenv, "Error getting handle to native resources" );
return 0;
}

uint8_t *endKey = (uint8_t *)jenv->GetByteArrayElements(endKeyBytes, JNI_NULL);
if (!endKey) {
jenv->ReleaseByteArrayElements( beginKeyBytes, (jbyte *)startKey, JNI_ABORT );
if( !jenv->ExceptionOccurred() )
throwRuntimeEx( jenv, "Error getting handle to native resources" );
return 0;
}

FDBFuture *f = fdb_transaction_get_estimated_range_size_bytes( tr, startKey, jenv->GetArrayLength( beginKeyBytes ), endKey, jenv->GetArrayLength( endKeyBytes ) );
jenv->ReleaseByteArrayElements( beginKeyBytes, (jbyte *)startKey, JNI_ABORT );
jenv->ReleaseByteArrayElements( endKeyBytes, (jbyte *)endKey, JNI_ABORT );
return (jlong)f;
}

JNIEXPORT void JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1set(JNIEnv *jenv, jobject, jlong tPtr, jbyteArray keyBytes, jbyteArray valueBytes) {
if( !tPtr || !keyBytes || !valueBytes ) {
throwParamNotNull(jenv);
Expand Down
26 changes: 26 additions & 0 deletions bindings/java/src/main/com/apple/foundationdb/FDBTransaction.java
Expand Up @@ -70,6 +70,16 @@ public CompletableFuture<byte[]> getKey(KeySelector selector) {
return getKey_internal(selector, true);
}

@Override
public CompletableFuture<Long> getEstimatedRangeSizeBytes(byte[] begin, byte[] end) {
ajbeamon marked this conversation as resolved.
Show resolved Hide resolved
return FDBTransaction.this.getEstimatedRangeSizeBytes(begin, end);
}

@Override
public CompletableFuture<Long> getEstimatedRangeSizeBytes(Range range) {
return FDBTransaction.this.getEstimatedRangeSizeBytes(range);
}

///////////////////
// getRange -> KeySelectors
///////////////////
Expand Down Expand Up @@ -257,6 +267,21 @@ private CompletableFuture<byte[]> getKey_internal(KeySelector selector, boolean
}
}

@Override
public CompletableFuture<Long> getEstimatedRangeSizeBytes(byte[] begin, byte[] end) {
pointerReadLock.lock();
try {
return new FutureInt64(Transaction_getEstimatedRangeSizeBytes(getPtr(), begin, end), executor);
} finally {
pointerReadLock.unlock();
}
}

@Override
public CompletableFuture<Long> getEstimatedRangeSizeBytes(Range range) {
return this.getEstimatedRangeSizeBytes(range.begin, range.end);
}

///////////////////
// getRange -> KeySelectors
///////////////////
Expand Down Expand Up @@ -659,4 +684,5 @@ private native void Transaction_addConflictRange(long cPtr,
private native long Transaction_watch(long ptr, byte[] key) throws FDBException;
private native void Transaction_cancel(long cPtr);
private native long Transaction_getKeyLocations(long cPtr, byte[] key);
private native long Transaction_getEstimatedRangeSizeBytes(long cPtr, byte[] keyBegin, byte[] keyEnd);
}
20 changes: 20 additions & 0 deletions bindings/java/src/main/com/apple/foundationdb/ReadTransaction.java
Expand Up @@ -412,6 +412,26 @@ AsyncIterable<KeyValue> getRange(Range range,
AsyncIterable<KeyValue> getRange(Range range,
int limit, boolean reverse, StreamingMode mode);


/**
* Gets an estimate for the number of bytes stored in the given range.
*
* @param begin the beginning of the range (inclusive)
* @param end the end of the range (exclusive)
*
* @return a handle to access the results of the asynchronous call
*/
CompletableFuture<Long> getEstimatedRangeSizeBytes(byte[] begin, byte[] end);
ajbeamon marked this conversation as resolved.
Show resolved Hide resolved

/**
* Gets an estimate for the number of bytes stored in the given range.
*
* @param range the range of the keys
*
* @return a handle to access the results of the asynchronous call
*/
CompletableFuture<Long> getEstimatedRangeSizeBytes(Range range);

/**
* Returns a set of options that can be set on a {@code Transaction}
*
Expand Down
14 changes: 14 additions & 0 deletions bindings/python/fdb/impl.py
Expand Up @@ -449,6 +449,17 @@ def __getitem__(self, key):
if isinstance(key, slice):
return self.get_range(key.start, key.stop, reverse=(key.step == -1))
return self.get(key)

def get_estimated_range_size_bytes(self, beginKey, endKey):
if beginKey is None:
beginKey = b''
if endKey is None:
endKey = b'\xff'
return FutureInt64(self.capi.fdb_transaction_get_estimated_range_size_bytes(
self.tpointer,
beginKey, len(beginKey),
endKey, len(endKey)
))


class Transaction(TransactionRead):
Expand Down Expand Up @@ -1424,6 +1435,9 @@ def init_c_api():
ctypes.c_int, ctypes.c_int]
_capi.fdb_transaction_get_range.restype = ctypes.c_void_p

_capi.fdb_transaction_get_estimated_range_size_bytes.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_int]
_capi.fdb_transaction_get_estimated_range_size_bytes.restype = ctypes.c_void_p

_capi.fdb_transaction_add_conflict_range.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_int]
_capi.fdb_transaction_add_conflict_range.restype = ctypes.c_int
_capi.fdb_transaction_add_conflict_range.errcheck = check_error_code
Expand Down
8 changes: 8 additions & 0 deletions bindings/ruby/lib/fdbimpl.rb
Expand Up @@ -108,6 +108,7 @@ def self.init_c_api
attach_function :fdb_transaction_get, [ :pointer, :pointer, :int, :int ], :pointer
attach_function :fdb_transaction_get_key, [ :pointer, :pointer, :int, :int, :int, :int ], :pointer
attach_function :fdb_transaction_get_range, [ :pointer, :pointer, :int, :int, :int, :pointer, :int, :int, :int, :int, :int, :int, :int, :int, :int ], :pointer
attach_function :fdb_transaction_get_estimated_range_size_bytes, [ :pointer, :pointer, :int, :pointer, :int ], :pointer
attach_function :fdb_transaction_set, [ :pointer, :pointer, :int, :pointer, :int ], :void
attach_function :fdb_transaction_clear, [ :pointer, :pointer, :int ], :void
attach_function :fdb_transaction_clear_range, [ :pointer, :pointer, :int, :pointer, :int ], :void
Expand Down Expand Up @@ -817,6 +818,13 @@ def get_range_start_with(prefix, options={}, &block)
prefix = prefix.dup.force_encoding "BINARY"
get_range(prefix, FDB.strinc(prefix), options, &block)
end

def get_estimated_range_size_bytes(beginKey, endKey)
bkey = FDB.key_to_bytes(beginKey)
ekey = FDB.key_to_bytes(endKey)
Int64Future.new(FDBC.fdb_transaction_get_estimated_range_size_bytes(@tpointer, bkey, bkey.bytesize, ekey, ekey.bytesize))
end

end

TransactionRead.class_variable_set("@@StreamingMode", @@StreamingMode)
Expand Down
1 change: 1 addition & 0 deletions fdbclient/IClientApi.h
Expand Up @@ -48,6 +48,7 @@ class ITransaction {
virtual ThreadFuture<Standalone<StringRef>> getVersionstamp() = 0;

virtual void addReadConflictRange(const KeyRangeRef& keys) = 0;
virtual ThreadFuture<int64_t> getEstimatedRangeSizeBytes(const KeyRangeRef& keys) = 0;

virtual void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) = 0;
virtual void set(const KeyRef& key, const ValueRef& value) = 0;
Expand Down
21 changes: 21 additions & 0 deletions fdbclient/MultiVersionTransaction.actor.cpp
Expand Up @@ -145,6 +145,20 @@ ThreadFuture<Standalone<StringRef>> DLTransaction::getVersionstamp() {
});
}

ThreadFuture<int64_t> DLTransaction::getEstimatedRangeSizeBytes(const KeyRangeRef& keys) {
if (!api->transactionGetEstimatedRangeSizeBytes) {
return unsupported_operation();
}
FdbCApi::FDBFuture *f = api->transactionGetEstimatedRangeSizeBytes(tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size());

return toThreadFuture<int64_t>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
int64_t sampledSize;
FdbCApi::fdb_error_t error = api->futureGetInt64(f, &sampledSize);
ASSERT(!error);
return sampledSize;
});
}

void DLTransaction::addReadConflictRange(const KeyRangeRef& keys) {
throwIfError(api->transactionAddConflictRange(tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size(), FDBConflictRangeTypes::READ));
}
Expand Down Expand Up @@ -307,6 +321,7 @@ void DLApi::init() {
loadClientFunction(&api->transactionReset, lib, fdbCPath, "fdb_transaction_reset");
loadClientFunction(&api->transactionCancel, lib, fdbCPath, "fdb_transaction_cancel");
loadClientFunction(&api->transactionAddConflictRange, lib, fdbCPath, "fdb_transaction_add_conflict_range");
loadClientFunction(&api->transactionGetEstimatedRangeSizeBytes, lib, fdbCPath, "fdb_transaction_get_estimated_range_size_bytes", headerVersion >= 700);

loadClientFunction(&api->futureGetInt64, lib, fdbCPath, headerVersion >= 620 ? "fdb_future_get_int64" : "fdb_future_get_version");
loadClientFunction(&api->futureGetError, lib, fdbCPath, "fdb_future_get_error");
Expand Down Expand Up @@ -547,6 +562,12 @@ void MultiVersionTransaction::addReadConflictRange(const KeyRangeRef& keys) {
}
}

ThreadFuture<int64_t> MultiVersionTransaction::getEstimatedRangeSizeBytes(const KeyRangeRef& keys) {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->getEstimatedRangeSizeBytes(keys) : ThreadFuture<int64_t>(Never());
return abortableFuture(f, tr.onChange);
}

void MultiVersionTransaction::atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) {
auto tr = getTransaction();
if(tr.transaction) {
Expand Down
5 changes: 5 additions & 0 deletions fdbclient/MultiVersionTransaction.h
Expand Up @@ -81,6 +81,9 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
void (*transactionClear)(FDBTransaction *tr, uint8_t const *keyName, int keyNameLength);
void (*transactionClearRange)(FDBTransaction *tr, uint8_t const *beginKeyName, int beginKeyNameLength, uint8_t const *endKeyName, int endKeyNameLength);
void (*transactionAtomicOp)(FDBTransaction *tr, uint8_t const *keyName, int keyNameLength, uint8_t const *param, int paramLength, FDBMutationTypes::Option operationType);

FDBFuture* (*transactionGetEstimatedRangeSizeBytes)(FDBTransaction* tr, uint8_t const* begin_key_name,
int begin_key_name_length, uint8_t const* end_key_name, int end_key_name_length);

FDBFuture* (*transactionCommit)(FDBTransaction *tr);
fdb_error_t (*transactionGetCommittedVersion)(FDBTransaction *tr, int64_t *outVersion);
Expand Down Expand Up @@ -129,6 +132,7 @@ class DLTransaction : public ITransaction, ThreadSafeReferenceCounted<DLTransact
ThreadFuture<Standalone<RangeResultRef>> getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot=false, bool reverse=false) override;
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key) override;
ThreadFuture<Standalone<StringRef>> getVersionstamp() override;
ThreadFuture<int64_t> getEstimatedRangeSizeBytes(const KeyRangeRef& keys) override;

void addReadConflictRange(const KeyRangeRef& keys) override;

Expand Down Expand Up @@ -228,6 +232,7 @@ class MultiVersionTransaction : public ITransaction, ThreadSafeReferenceCounted<
ThreadFuture<Standalone<StringRef>> getVersionstamp() override;

void addReadConflictRange(const KeyRangeRef& keys) override;
ThreadFuture<int64_t> getEstimatedRangeSizeBytes(const KeyRangeRef& keys) override;

void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) override;
void set(const KeyRef& key, const ValueRef& value) override;
Expand Down