Skip to content

Commit

Permalink
Merge pull request #6664 from sfc-gh-jfu/jfu-grv-cache-multi-threaded
Browse files Browse the repository at this point in the history
Introduce multi-threaded/multi-version client support for GRV caching
  • Loading branch information
sfc-gh-vgasiunas committed Apr 7, 2022
2 parents 3d32594 + b0ae22f commit 2ff1198
Show file tree
Hide file tree
Showing 14 changed files with 324 additions and 21 deletions.
4 changes: 3 additions & 1 deletion bindings/c/CMakeLists.txt
@@ -1,6 +1,8 @@
set(FDB_C_SRCS
fdb_c.cpp
foundationdb/fdb_c.h)
foundationdb/fdb_c.h
foundationdb/fdb_c_internal.h
foundationdb/fdb_c_types.h)

file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/foundationdb)

Expand Down
17 changes: 17 additions & 0 deletions bindings/c/fdb_c.cpp
Expand Up @@ -19,13 +19,15 @@
*/

#include "fdbclient/FDBTypes.h"
#include "flow/ProtocolVersion.h"
#include <cstdint>
#define FDB_API_VERSION 710
#define FDB_INCLUDE_LEGACY_TYPES

#include "fdbclient/MultiVersionTransaction.h"
#include "fdbclient/MultiVersionAssignmentVars.h"
#include "foundationdb/fdb_c.h"
#include "foundationdb/fdb_c_internal.h"

int g_api_version = 0;

Expand Down Expand Up @@ -293,6 +295,10 @@ extern "C" DLLEXPORT fdb_error_t fdb_future_get_mappedkeyvalue_array(FDBFuture*
*out_more = rrr.more;);
}

extern "C" DLLEXPORT fdb_error_t fdb_future_get_shared_state(FDBFuture* f, DatabaseSharedState** outPtr) {
CATCH_AND_RETURN(*outPtr = (DatabaseSharedState*)((TSAV(DatabaseSharedState*, f)->get())););
}

extern "C" DLLEXPORT fdb_error_t fdb_future_get_string_array(FDBFuture* f, const char*** out_strings, int* out_count) {
CATCH_AND_RETURN(Standalone<VectorRef<const char*>> na = TSAV(Standalone<VectorRef<const char*>>, f)->get();
*out_strings = (const char**)na.begin();
Expand Down Expand Up @@ -426,6 +432,17 @@ extern "C" DLLEXPORT FDBFuture* fdb_database_create_snapshot(FDBDatabase* db,
.extractPtr());
}

extern "C" DLLEXPORT FDBFuture* fdb_database_create_shared_state(FDBDatabase* db) {
return (FDBFuture*)(DB(db)->createSharedState().extractPtr());
}

extern "C" DLLEXPORT void fdb_database_set_shared_state(FDBDatabase* db, DatabaseSharedState* p) {
try {
DB(db)->setSharedState(p);
} catch (...) {
}
}

// Get network thread busyness (updated every 1s)
// A value of 0 indicates that the client is more or less idle
// A value of 1 (or more) indicates that the client is saturated
Expand Down
11 changes: 1 addition & 10 deletions bindings/c/foundationdb/fdb_c.h
Expand Up @@ -58,21 +58,12 @@
#include <stdint.h>

#include "fdb_c_options.g.h"
#include "fdb_c_types.h"

#ifdef __cplusplus
extern "C" {
#endif

/* Pointers to these opaque types represent objects in the FDB API */
typedef struct FDB_future FDBFuture;
typedef struct FDB_result FDBResult;
typedef struct FDB_database FDBDatabase;
typedef struct FDB_tenant FDBTenant;
typedef struct FDB_transaction FDBTransaction;

typedef int fdb_error_t;
typedef int fdb_bool_t;

DLLEXPORT const char* fdb_get_error(fdb_error_t code);

DLLEXPORT fdb_bool_t fdb_error_predicate(int predicate_test, fdb_error_t code);
Expand Down
52 changes: 52 additions & 0 deletions bindings/c/foundationdb/fdb_c_internal.h
@@ -0,0 +1,52 @@
/*
* fdb_c_internal.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef FDB_C_INTERNAL_H
#define FDB_C_INTERNAL_H
#include "flow/ProtocolVersion.h"
#pragma once

#ifndef DLLEXPORT
#define DLLEXPORT
#endif

#ifndef WARN_UNUSED_RESULT
#define WARN_UNUSED_RESULT
#endif

#include "fdb_c_types.h"

#ifdef __cplusplus
extern "C" {
#endif

// forward declaration and typedef
typedef struct DatabaseSharedState DatabaseSharedState;

DLLEXPORT FDBFuture* fdb_database_create_shared_state(FDBDatabase* db);

DLLEXPORT void fdb_database_set_shared_state(FDBDatabase* db, DatabaseSharedState* p);

DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_get_shared_state(FDBFuture* f, DatabaseSharedState** outPtr);

#ifdef __cplusplus
}
#endif
#endif
47 changes: 47 additions & 0 deletions bindings/c/foundationdb/fdb_c_types.h
@@ -0,0 +1,47 @@
/*
* fdb_c_types.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef FDB_C_TYPES_H
#define FDB_C_TYPES_H
#pragma once

#ifndef DLLEXPORT
#define DLLEXPORT
#endif

#ifdef __cplusplus
extern "C" {
#endif

/* Pointers to these opaque types represent objects in the FDB API */
typedef struct FDB_future FDBFuture;
typedef struct FDB_result FDBResult;
typedef struct FDB_cluster FDBCluster;
typedef struct FDB_database FDBDatabase;
typedef struct FDB_tenant FDBTenant;
typedef struct FDB_transaction FDBTransaction;

typedef int fdb_error_t;
typedef int fdb_bool_t;

#ifdef __cplusplus
}
#endif
#endif
3 changes: 1 addition & 2 deletions bindings/c/test/apitester/TesterCorrectnessWorkload.cpp
Expand Up @@ -83,8 +83,7 @@ class ApiCorrectnessWorkload : public ApiWorkload {
auto results = std::make_shared<std::vector<std::optional<std::string>>>();
execTransaction(
[kvPairs, results](auto ctx) {
// TODO: Enable after merging with GRV caching
// ctx->tx()->setOption(FDB_TR_OPTION_USE_GRV_CACHE);
ctx->tx()->setOption(FDB_TR_OPTION_USE_GRV_CACHE);
auto futures = std::make_shared<std::vector<Future>>();
for (const auto& kv : *kvPairs) {
futures->push_back(ctx->tx()->get(kv.key, false));
Expand Down
6 changes: 6 additions & 0 deletions fdbclient/DatabaseContext.h
Expand Up @@ -29,6 +29,7 @@
#include <unordered_map>
#pragma once

#include "fdbclient/FDBTypes.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/CommitProxyInterface.h"
Expand Down Expand Up @@ -519,6 +520,11 @@ class DatabaseContext : public ReferenceCounted<DatabaseContext>, public FastAll
int outstandingWatches;
int maxOutstandingWatches;

// Manage any shared state that may be used by MVC
DatabaseSharedState* sharedStatePtr;
Future<DatabaseSharedState*> initSharedState();
void setSharedState(DatabaseSharedState* p);

// GRV Cache
// Database-level read version cache storing the most recent successful GRV as well as the time it was requested.
double lastGrvTime;
Expand Down
25 changes: 25 additions & 0 deletions fdbclient/FDBTypes.h
Expand Up @@ -28,6 +28,8 @@
#include <unordered_set>

#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/ProtocolVersion.h"
#include "flow/flow.h"

enum class TraceFlags : uint8_t { unsampled = 0b00000000, sampled = 0b00000001 };
Expand Down Expand Up @@ -1350,6 +1352,29 @@ struct TenantMode {

uint32_t mode;
};
struct GRVCacheSpace {
Version cachedReadVersion;
double lastGrvTime;

GRVCacheSpace() : cachedReadVersion(Version(0)), lastGrvTime(0.0) {}
};

// This structure can be extended in the future to include additional features that required a shared state
struct DatabaseSharedState {
// These two members should always be listed first, in this order.
// This is to preserve compatibility with future updates of this shared state
// and ensures the MVC does not attempt to access methods incorrectly
// due to newly introduced offsets in the structure.
const ProtocolVersion protocolVersion;
void (*delRef)(DatabaseSharedState*);

Mutex mutexLock;
GRVCacheSpace grvCacheSpace;
std::atomic<int> refCount;

DatabaseSharedState()
: protocolVersion(currentProtocolVersion), mutexLock(Mutex()), grvCacheSpace(GRVCacheSpace()), refCount(0) {}
};

inline bool isValidPerpetualStorageWiggleLocality(std::string locality) {
int pos = locality.find(':');
Expand Down
5 changes: 5 additions & 0 deletions fdbclient/IClientApi.h
Expand Up @@ -20,6 +20,7 @@

#ifndef FDBCLIENT_ICLIENTAPI_H
#define FDBCLIENT_ICLIENTAPI_H
#include "flow/ProtocolVersion.h"
#pragma once

#include "fdbclient/FDBOptions.g.h"
Expand Down Expand Up @@ -151,6 +152,10 @@ class IDatabase {
// Management API, create snapshot
virtual ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) = 0;

// Interface to manage shared state across multiple connections to the same Database
virtual ThreadFuture<DatabaseSharedState*> createSharedState() = 0;
virtual void setSharedState(DatabaseSharedState* p) = 0;

// used in template functions as the Transaction type that can be created through createTransaction()
using TransactionT = ITransaction;
};
Expand Down

0 comments on commit 2ff1198

Please sign in to comment.