Skip to content

Commit

Permalink
Implement database drop operation
Browse files Browse the repository at this point in the history
Signed-off-by: Cole Miller <cole.miller@canonical.com>
  • Loading branch information
cole-miller committed Nov 15, 2022
1 parent 0c38457 commit 65a5f31
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 8 deletions.
14 changes: 8 additions & 6 deletions src/command.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include "lib/serialize.h"

/* Command type codes */
enum { COMMAND_OPEN = 1, COMMAND_FRAMES, COMMAND_UNDO, COMMAND_CHECKPOINT };
enum { COMMAND_OPEN = 1, COMMAND_FRAMES, COMMAND_UNDO, COMMAND_CHECKPOINT, COMMAND_DROP };

/* Hold information about an array of WAL frames. */
struct frames
Expand Down Expand Up @@ -44,12 +44,14 @@ typedef struct frames frames_t;
X(frames, frames, ##__VA_ARGS__)
#define COMMAND__UNDO(X, ...) X(uint64, tx_id, ##__VA_ARGS__)
#define COMMAND__CHECKPOINT(X, ...) X(text, filename, ##__VA_ARGS__)
#define COMMAND__DROP(X, ...) X(text, filename, ##__VA_ARGS__)

#define COMMAND__TYPES(X, ...) \
X(open, OPEN, __VA_ARGS__) \
X(frames, FRAMES, __VA_ARGS__) \
X(undo, UNDO, __VA_ARGS__) \
X(checkpoint, CHECKPOINT, __VA_ARGS__)
#define COMMAND__TYPES(X, ...) \
X(open, OPEN, __VA_ARGS__) \
X(frames, FRAMES, __VA_ARGS__) \
X(undo, UNDO, __VA_ARGS__) \
X(checkpoint, CHECKPOINT, __VA_ARGS__) \
X(drop, DROP, __VA_ARGS__)

COMMAND__TYPES(COMMAND__DEFINE);

Expand Down
1 change: 1 addition & 0 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ void db__init(struct db *db, struct config *config, const char *filename)
db->follower = NULL;
db->tx_id = 0;
db->read_lock = 0;
db->dropping = false;
QUEUE__INIT(&db->leaders);
}

Expand Down
1 change: 1 addition & 0 deletions src/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ struct db
unsigned tx_id; /* Current ongoing transaction ID, if any */
queue queue; /* Prev/next database, used by the registry */
int read_lock; /* Lock used by snapshots & checkpoints */
bool dropping; /* Set when a client has requested to drop this database */
};

/**
Expand Down
17 changes: 17 additions & 0 deletions src/fsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,20 @@ static int apply_checkpoint(struct fsm *f, const struct command_checkpoint *c)
return 0;
}

/* Drop or delete a database. */
static int apply_drop(struct fsm *f, const struct command_drop *c)
{
tracef("apply drop %s", c->filename);
int rc;
sqlite3_vfs *vfs;

/* Delete the database "file". This will also delete the associated WAL. */
vfs = sqlite3_vfs_find(f->registry->config->name);
assert(vfs != NULL);
rc = vfs->xDelete(vfs, c->filename, 0);
return rc;
}

static int fsm__apply(struct raft_fsm *fsm,
const struct raft_buffer *buf,
void **result)
Expand Down Expand Up @@ -332,6 +346,9 @@ static int fsm__apply(struct raft_fsm *fsm,
case COMMAND_CHECKPOINT:
rc = apply_checkpoint(f, command);
break;
case COMMAND_DROP:
rc = apply_drop(f, command);
break;
default:
rc = RAFT_MALFORMED;
goto err_after_command_decode;
Expand Down
114 changes: 114 additions & 0 deletions src/gateway.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "gateway.h"

#include "bind.h"
#include "command.h"
#include "protocol.h"
#include "query.h"
#include "request.h"
Expand Down Expand Up @@ -256,6 +257,12 @@ static int handle_open(struct handle *req)
tracef("registry db get failed %d", rc);
return rc;
}
if (db->dropping) {
tracef("db is being dropped");
failure(req, DQLITE_ERROR,
"another client has requested to drop this database");
return 0;
}
g->leader = sqlite3_malloc(sizeof *g->leader);
if (g->leader == NULL) {
tracef("malloc failed");
Expand Down Expand Up @@ -1268,6 +1275,113 @@ static int handle_weight(struct handle *req)
return 0;
}

struct drop
{
struct raft_apply apply;
struct gateway *gateway;
};

static void dropApplyCb(struct raft_apply *apply, int status, void *result)
{
(void)result;
tracef("drop apply cb status %d", status);
if (status != 0) {
/* TODO */
return;
}

/* Downcast (struct drop embeds struct raft_apply). */
struct drop *drop = (struct drop *)apply;
struct gateway *g;
struct leader *l;
struct db *db;
struct response_empty response = {0};
struct handle *req;

assert(drop != NULL);
g = drop->gateway;
assert(g != NULL);
l = g->leader;
assert(l != NULL);
db = l->db;
assert(db != NULL);
req = g->req;
assert(req != NULL);

g->req = NULL;

/* Sanity checks. */
assert(QUEUE__DATA(QUEUE__HEAD(&db->leaders), struct leader, queue) == l);
assert(QUEUE__NEXT(QUEUE__HEAD(&db->leaders)) == NULL);
assert(db->dropping);

/* Close everything up. */
g->leader = NULL;
leader__close(l);
sqlite3_free(l);
db__close(db);
sqlite3_free(db);
raft_free(drop);

/* We did it! */
SUCCESS(empty, EMPTY);
}

static int handle_drop(struct handle *req)
{
tracef("handle drop");
struct cursor *cursor = &req->cursor;
struct gateway *g = req->gateway;
struct db *db = g->leader->db;
struct raft_buffer buf;
struct command_drop c;
struct drop *drop;
int rc;

START_V0(drop, empty);
(void)response;
CHECK_LEADER(req);
LOOKUP_DB(request.db_id);
FAIL_IF_CHECKPOINTING;

/* Check that we're the sole connection. */
if (QUEUE__DATA(QUEUE__HEAD(&db->leaders), struct leader, queue) != g->leader ||
QUEUE__NEXT(QUEUE__HEAD(&db->leaders)) != NULL) {
tracef("drop db multiple leader connections");
failure(req, DQLITE_ERROR, "can't drop database with multiple leader connections");
return 0;
}

/* Put together an apply request. */
c.filename = db->filename;
rc = command__encode(COMMAND_DROP, &c, &buf);
if (rc != 0) {
tracef("command encode failed %d", rc);
return DQLITE_ERROR;
}
drop = raft_malloc(sizeof(*drop));
if (drop == NULL) {
tracef("raft_malloc failed");
return DQLITE_NOMEM;
}
drop->apply.data = drop;
drop->apply.type = COMMAND_DROP;
drop->gateway = g;
rc = raft_apply(g->leader->raft, &drop->apply, &buf, 1, dropApplyCb);
if (rc != 0) {
tracef("raft apply failed %d", rc);
raft_free(drop);
failure(req, DQLITE_ERROR, "failed to submit raft log entry");
return 0;
}

g->req = req;
/* Prevent db from being opened while drop is pending. */
db->dropping = true;

return 0;
}

int gateway__handle(struct gateway *g,
struct handle *req,
int type,
Expand Down
3 changes: 2 additions & 1 deletion src/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ enum {
DQLITE_REQUEST_CLUSTER,
DQLITE_REQUEST_TRANSFER,
DQLITE_REQUEST_DESCRIBE,
DQLITE_REQUEST_WEIGHT
DQLITE_REQUEST_WEIGHT,
DQLITE_REQUEST_DROP
};

#define DQLITE_REQUEST_CLUSTER_FORMAT_V0 0 /* ID and address */
Expand Down
4 changes: 3 additions & 1 deletion src/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#define REQUEST_TRANSFER(X, ...) X(uint64, id, ##__VA_ARGS__)
#define REQUEST_DESCRIBE(X, ...) X(uint64, format, ##__VA_ARGS__)
#define REQUEST_WEIGHT(X, ...) X(uint64, weight, ##__VA_ARGS__)
#define REQUEST_DROP(X, ...) X(uint64, db_id, ##__VA_ARGS__)

#define REQUEST__DEFINE(LOWER, UPPER, _) \
SERIALIZE__DEFINE(request_##LOWER, REQUEST_##UPPER);
Expand All @@ -64,7 +65,8 @@
X(cluster, CLUSTER, __VA_ARGS__) \
X(transfer, TRANSFER, __VA_ARGS__) \
X(describe, DESCRIBE, __VA_ARGS__) \
X(weight, WEIGHT, __VA_ARGS__)
X(weight, WEIGHT, __VA_ARGS__) \
X(drop, DROP, __VA_ARGS__)

REQUEST__TYPES(REQUEST__DEFINE);

Expand Down

0 comments on commit 65a5f31

Please sign in to comment.