Skip to content

Commit

Permalink
GG-33549 Add raft snapshot commands to Native Storage (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
ademakov committed Oct 1, 2021
1 parent 3c11636 commit 6420101
Show file tree
Hide file tree
Showing 8 changed files with 230 additions and 32 deletions.
36 changes: 27 additions & 9 deletions modules/native-storage/src/main/cpp/Table.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

/** A merge-tree based table. */
class Table : public AbstractTable {
/** Capacity of insertion cache. */
static constexpr size_t INSERTION_CACHE_CAPACITY = 1000000;

MergeTree mergeTree; /**< Data storage container. */

std::vector<std::byte> insertionCache; /**< Cache used to speedup insertions of lots single tuples. */
Expand All @@ -29,15 +32,15 @@ class Table : public AbstractTable {

public:
/**
* @brief Construct a Table based om existing MergeTree.
* @brief Constructs a Table based om existing MergeTree.
*
* @param storageDir Directory for merge tree folders and files.
*/
Table(const fs::path &storageDir)
: mergeTree(storageDir) {}

/**
* @brief Construct a new empty Table.
* @brief Constructs a new empty Table.
*
* @param schema Description of schema used for merge tree data.
* @param storageDir Directory for merge tree folders and files.
Expand All @@ -47,11 +50,11 @@ class Table : public AbstractTable {

~Table() = default;

/** Get schema descriptor. */
/** Gets schema descriptor. */
const SchemaDescriptor &getSchema() const noexcept override { return mergeTree.getSchema(); }

/**
* @brief Get iterator for selected columns of storage.
* @brief Gets iterator for selected columns of storage.
*
* @param requiredColumns Indices of iterated columns.
* @return MergeTreeIterator Merge Tree iterator.
Expand All @@ -63,7 +66,7 @@ class Table : public AbstractTable {
}

/**
* @brief Appends tuple to the table.
* @brief Appends a tuple to the table.
*
* @param tuple Tuple binary in IEP-54 format.
* @param size Size of tuple binary, in bytes.
Expand All @@ -78,6 +81,24 @@ class Table : public AbstractTable {
flushInsertionCache();
}

/**
* @brief Makes a snapshot of the table.
*
* @param dir Directory to place snapshot to.
*/
void makeSnapshot(const fs::path &dir) {
flushInsertionCache();

mergeTree.makeSnapshot(dir);
}

/**
* @brief Loads a snapshot of the table.
*
* @param dir Directory to load snapshot from.
*/
void loadSnapshot(const fs::path &dir) { mergeTree.loadSnapshot(dir); }

/**
* @brief Totally erases the merge tree.
*/
Expand All @@ -86,15 +107,12 @@ class Table : public AbstractTable {
}

private:
/** Write insertion cache to merge tree storage. */
/** Writes insertion cache to merge tree storage. */
void flushInsertionCache() {
cachedTuplesCount = 0;

mergeTree.insert(insertionCache.data(), insertionCache.size());

insertionCache.clear();
}

/** Capacity of insertion cache. */
static constexpr size_t INSERTION_CACHE_CAPACITY = 1000000;
};
22 changes: 22 additions & 0 deletions modules/native-storage/src/main/cpp/javaInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,28 @@ JNIEXPORT jobject JNICALL Java_org_gridgain_internal_cpp_NativeInterface_process

break;
}
case org_gridgain_internal_cpp_NativeInterface_CMD_MAKE_SNAPSHOT: {
uint64_t tableId = readFromMemory<uint64_t>(ptr);

Table *table = reinterpret_cast<Table *>(tableId);

std::string path = readString(ptr);

table->makeSnapshot(path);

break;
}
case org_gridgain_internal_cpp_NativeInterface_CMD_LOAD_SNAPSHOT: {
uint64_t tableId = readFromMemory<uint64_t>(ptr);

Table *table = reinterpret_cast<Table *>(tableId);

std::string path = readString(ptr);

table->loadSnapshot(path);

break;
}
case org_gridgain_internal_cpp_NativeInterface_CMD_DROP_TABLE: {
uint64_t storageId = readFromMemory<uint64_t>(ptr);

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ class NativeInterface {
/** Make native storage instance and return its id. */
static final int CMD_CREATE_STORAGE = 0;

/** Make a table with given schema and return its id. */
/** Make a new table with given schema and return its id. */
static final int CMD_CREATE_TABLE = 1;

/** Make a table with given schema and return its id. */
/** Get an existing table and return its id. */
static final int CMD_GET_TABLE = 2;

/** Release native storage instance with given id. */
Expand All @@ -32,29 +32,35 @@ class NativeInterface {
/** Release a table with given id. */
static final int CMD_RELEASE_TABLE = 4;

/** Make a table snapshot. */
static final int CMD_MAKE_SNAPSHOT = 5;

/** Load a table snapshot. */
static final int CMD_LOAD_SNAPSHOT = 6;

/** Destroy a table with given id. */
static final int CMD_DROP_TABLE = 5;
static final int CMD_DROP_TABLE = 7;

/** Insert single tuple to native storage. */
static final int CMD_INSERT = 6;
static final int CMD_INSERT = 8;

/** Make native cursor instance for given storage and return its id. */
static final int CMD_CREATE_CURSOR = 7;
static final int CMD_CREATE_CURSOR = 9;

/** Checks if there is still data left for reading by cursor. */
static final int CMD_CURSOR_HAS_NEXT_PAGE = 8;
/** Check if there is still data left for reading by cursor. */
static final int CMD_CURSOR_HAS_NEXT_PAGE = 10;

/** Gets Direct Byte Buffer with memory managed by C++ Cursor object. */
static final int CMD_CURSOR_GET_BUF = 9;
static final int CMD_CURSOR_GET_BUF = 11;

/** Gets next set of tuples from storage. */
static final int CMD_CURSOR_GATHER_TUPLES = 10;
static final int CMD_CURSOR_GATHER_TUPLES = 12;

/** Get schema of cursor output tuples. */
static final int CMD_CURSOR_GET_OUTPUT_SCHEMA = 11;
static final int CMD_CURSOR_GET_OUTPUT_SCHEMA = 13;

/** Release native cursor instance with given id. */
static final int CMD_CURSOR_RELEASE = 12;
static final int CMD_CURSOR_RELEASE = 14;

/** Load C++ library. */
static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Path;

import org.apache.ignite.internal.schema.SchemaDescriptor;

Expand Down Expand Up @@ -112,6 +113,80 @@ public NativeCursor query(IgniteTableScanStub tableScan) {
return new NativeCursor(cursorId);
}

/**
* Makes a table snapshot.
*
* @param path Directory to place snapshot to.
*/
public void saveSnapshot(Path path) {
byte[] pathBytes = path.toString().getBytes();

// 1. Calculate the whole command size.

int bufSize = Integer.BYTES; // Command code.

bufSize += Long.BYTES; // Native table ID.

bufSize += Integer.BYTES; // Path length.

bufSize += pathBytes.length; // Path.

// 2. Serialize the command.

ByteBuffer buf = ByteBuffer.allocateDirect(bufSize);

buf.order(ByteOrder.LITTLE_ENDIAN);

buf.putInt(NativeInterface.CMD_MAKE_SNAPSHOT);

buf.putLong(id);

buf.putInt(pathBytes.length);

buf.put(pathBytes);

// 3. Execute the command.

NativeInterface.process(buf);
}

/**
* Loads a table snapshot.
*
* @param path Directory to load snapshot from.
*/
public void loadSnapshot(Path path) {
byte[] pathBytes = path.toString().getBytes();

// 1. Calculate the whole command size.

int bufSize = Integer.BYTES; // Command code.

bufSize += Long.BYTES; // Native table ID.

bufSize += Integer.BYTES; // Path length.

bufSize += pathBytes.length; // Path.

// 2. Serialize the command.

ByteBuffer buf = ByteBuffer.allocateDirect(bufSize);

buf.order(ByteOrder.LITTLE_ENDIAN);

buf.putInt(NativeInterface.CMD_LOAD_SNAPSHOT);

buf.putLong(id);

buf.putInt(pathBytes.length);

buf.put(pathBytes);

// 3. Execute the command.

NativeInterface.process(buf);
}

/** Release native table instance. */
@Override public void close() {
if (id != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,23 @@ else if (clo.command() instanceof GetAndUpsertCommand) {

/** {@inheritDoc} */
@Override public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
// Not implemented yet.
// TODO: do it asynchronously
try {
table.saveSnapshot(path);
doneClo.accept(null);
} catch (Exception e) {
doneClo.accept(e);
}
}

/** {@inheritDoc} */
@Override public boolean onSnapshotLoad(Path path) {
// Not implemented yet.
return false;
try {
table.loadSnapshot(path);
return true;
} catch (Exception e) {
return false;
}
}

/** {@inheritDoc} */
Expand Down

0 comments on commit 6420101

Please sign in to comment.