Skip to content
Permalink
Browse files

WIP Kudu queue hack

  • Loading branch information...
frew committed Jan 23, 2016
1 parent 5489476 commit c3009733d3c977f2f26888079459446836098efa
Showing with 29 additions and 8 deletions.
  1. +26 −5 src/kudu/tablet/tablet.cc
  2. +2 −2 src/kudu/tablet/tablet.h
  3. +1 −1 src/kudu/tablet/tablet_bootstrap.cc
@@ -357,7 +357,7 @@ void Tablet::StartTransaction(WriteTransactionState* tx_state) {
}

Status Tablet::InsertUnlocked(WriteTransactionState *tx_state,
RowOp* insert) {
RowOp* insert, int offset) {
const TabletComponents* comps = DCHECK_NOTNULL(tx_state->tablet_components());

CHECK(state_ == kOpen || state_ == kBootstrapping);
@@ -367,6 +367,26 @@ Status Tablet::InsertUnlocked(WriteTransactionState *tx_state,
DCHECK_EQ(tx_state->schema_at_decode_time(), schema()) << "Raced against schema change";
DCHECK(tx_state->op_id().IsInitialized()) << "TransactionState OpId needed for anchoring";

int key_col_size = schema()->num_key_columns();
if (key_col_size >= 3
&& schema()->column(key_col_size - 1).name() == "op_id_offset"
&& schema()->column(key_col_size - 2).name() == "op_id_index"
&& schema()->column(key_col_size - 3).name() == "op_id_term") {
// LOG(INFO) << "Replacing term/index cols with val: " << tx_state->op_id().ShortDebugString() << ":" << offset;
ContiguousRow row_to_modify(schema(), (uint8_t *)(insert->decoded_op.row_data));

// Substitute in op_id info
*reinterpret_cast<int64_t *>(row_to_modify.mutable_cell_ptr(key_col_size - 3)) = tx_state->op_id().term();
*reinterpret_cast<int64_t *>(row_to_modify.mutable_cell_ptr(key_col_size - 2)) = tx_state->op_id().index();
*reinterpret_cast<int32_t *>(row_to_modify.mutable_cell_ptr(key_col_size - 1)) = offset;

// Reset key_probe with new values
ConstContiguousRow row_key(&key_schema_, insert->decoded_op.row_data);
insert->key_probe.reset(new tablet::RowSetKeyProbe(row_key));
} else {
// LOG(INFO) << "No col to replace with val: " << tx_state->op_id().ShortDebugString() << ":" << offset;
}

ProbeStats stats;

// Submit the stats before returning from this function
@@ -495,16 +515,17 @@ void Tablet::StartApplying(WriteTransactionState* tx_state) {

void Tablet::ApplyRowOperations(WriteTransactionState* tx_state) {
StartApplying(tx_state);
for (RowOp* row_op : tx_state->row_ops()) {
ApplyRowOperation(tx_state, row_op);
for (int offset = 0; offset < tx_state->row_ops().size(); offset++) {
RowOp* row_op = tx_state->row_ops()[offset];
ApplyRowOperation(tx_state, row_op, offset);
}
}

void Tablet::ApplyRowOperation(WriteTransactionState* tx_state,
RowOp* row_op) {
RowOp* row_op, int offset) {
switch (row_op->decoded_op.type) {
case RowOperationsPB::INSERT:
ignore_result(InsertUnlocked(tx_state, row_op));
ignore_result(InsertUnlocked(tx_state, row_op, offset));
return;

case RowOperationsPB::UPDATE:
@@ -182,7 +182,7 @@ class Tablet {
// Apply a single row operation, which must already be prepared.
// The result is set back into row_op->result
void ApplyRowOperation(WriteTransactionState* tx_state,
RowOp* row_op);
RowOp* row_op, int offset);

// Create a new row iterator which yields the rows as of the current MVCC
// state of this tablet.
@@ -382,7 +382,7 @@ class Tablet {
// they were already acquired. Requires that handles for the relevant locks
// and MVCC transaction are present in the transaction state.
Status InsertUnlocked(WriteTransactionState *tx_state,
RowOp* insert);
RowOp* insert, int offset);

// A version of MutateRow that does not acquire locks and instead assumes
// they were already acquired. Requires that handles for the relevant locks
@@ -1324,7 +1324,7 @@ Status TabletBootstrap::FilterAndApplyOperations(WriteTransactionState* tx_state
}

// Actually apply it.
tablet_->ApplyRowOperation(tx_state, op);
tablet_->ApplyRowOperation(tx_state, op, op_idx - 1);
DCHECK(op->result != NULL);

// We expect that the above Apply() will always succeed, because we're

0 comments on commit c300973

Please sign in to comment.
You can’t perform that action at this time.