Skip to content

Commit

Permalink
Improve count(*) performance and fix index merge bug
Browse files Browse the repository at this point in the history
Summary:
Today in `SELECT count(*)` MyRocks would still decode every single column due to this check, despite the readset being empty:

```
 // bitmap is cleared on index merge, but it still needs to decode columns
    bool field_requested =
        decode_all_fields || m_verify_row_debug_checksums ||
        bitmap_is_set(field_map, m_table->field[i]->field_index);
```

As a result MyRocks is significantly slower than InnoDB in this particular scenario.

It turns out that in index merge, when it tries to reset, it calls ha_index_init with an empty column_bitmap, so our field decoders didn't know they needed to decode anything, and the entire query would return nothing. This is discussed in [this commit](70f2bcd), [issue 624](#624), and [PR 626](#626). So the workaround we had at that time was to simply treat an empty map as implicitly meaning "everything", and the side effect is a massively slowed-down count(*).

We have a few options to address this:
1. Fix index merge optimizer - looking at the code in QUICK_RANGE_SELECT::init_ror_merged_scan, it actually fixes up the column_bitmap properly, but after init/reset, so the fix would simply be moving the bitmap set code up. For secondary keys, prepare_for_position will automatically call `mark_columns_used_by_index_no_reset(s->primary_key, read_set)` if HA_PRIMARY_KEY_REQUIRED_FOR_POSITION is set (true for both InnoDB and MyRocks), so we would know correctly that we need to unpack PK when walking SK during index merge.
2. Overriding `column_bitmaps_signal` and setting up decoders whenever the bitmap changes - however, this doesn't work by itself. Because no storage engine today actually uses handler::column_bitmaps_signal, this path hasn't been tested properly in index merge. In this case, QUICK_RANGE_SELECT::init_ror_merged_scan should call set_column_bitmaps_no_signal to avoid resetting the correct read/write set of head, since head is used both as the first handler (reuse_handler=true) and as the placeholder for subsequent read/write set updates (reuse_handler=false).
3. Follow InnoDB's solution - InnoDB delays re-initializing its template until index_read is called for the 2nd time (relying on `prebuilt->sql_stat_start`); by the time index_read runs, `QUICK_RANGE_SELECT::column_bitmap` has already been fixed up and the table read/write set has been switched to it, so the new template is built correctly.

In order to make it easier to maintain and port, after discussing with Manuel, I'm going with a simplified version of #3 that delays decoder creation until the first read operation (index_*, rnd_*, range_read_*, multi_range_read_*), and setting the delay flag in index_init / rnd_init / multi_range_read_init.

Also, I ran into a bug with truncation_partition where Rdb_converter's tbl_def is stale (we only update ha_rocksdb::m_tbl_def). Previously this was harmless because it was not used after table open, but my change moves the lookup_bitmap initialization into Rdb_converter, which takes a dependency on Rdb_converter::m_tbl_def, so now we need to reset it properly.

Reviewed By: lth

Differential Revision: D25521518

fbshipit-source-id: a46ed3e71fa
  • Loading branch information
yizhang82 authored and facebook-github-bot committed Feb 4, 2021
1 parent 49338f1 commit 25b42b5
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 36 deletions.
72 changes: 52 additions & 20 deletions storage/rocksdb/ha_rocksdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6572,7 +6572,8 @@ ha_rocksdb::ha_rocksdb(my_core::handlerton *const hton,
mrr_used_cpk(false),
m_in_rpl_delete_rows(false),
m_in_rpl_update_rows(false),
m_force_skip_unique_check(false) {}
m_force_skip_unique_check(false),
m_need_build_decoder(false) {}

ha_rocksdb::~ha_rocksdb() {
int err MY_ATTRIBUTE((__unused__));
Expand Down Expand Up @@ -8126,7 +8127,7 @@ int ha_rocksdb::create(const char *const name, TABLE *const table_arg,
HA_EXIT_SUCCESS OK
other HA_ERR error code (can be SE-specific)
*/
int ha_rocksdb::truncate_table(Rdb_tbl_def *tbl_def_arg, const TABLE *table_arg,
int ha_rocksdb::truncate_table(Rdb_tbl_def *tbl_def_arg, TABLE *table_arg,
ulonglong auto_increment_value) {
DBUG_ENTER_FUNC();

Expand Down Expand Up @@ -8212,6 +8213,7 @@ int ha_rocksdb::truncate_table(Rdb_tbl_def *tbl_def_arg, const TABLE *table_arg,

/* Update the local m_tbl_def reference */
m_tbl_def = ddl_manager.find(orig_tablename);
m_converter.reset(new Rdb_converter(ha_thd(), m_tbl_def, table_arg));
DBUG_RETURN(err);
}

Expand Down Expand Up @@ -8516,7 +8518,7 @@ int ha_rocksdb::read_row_from_secondary_key(uchar *const buf,
DBUG_EXECUTE_IF("dbug.rocksdb.HA_EXTRA_KEYREAD", { m_keyread_only = true; });

bool covered_lookup = (m_keyread_only && kd.can_cover_lookup()) ||
kd.covers_lookup(&value, &m_lookup_bitmap);
kd.covers_lookup(&value, m_converter->get_lookup_bitmap());

#ifndef DBUG_OFF
m_keyread_only = save_keyread_only;
Expand Down Expand Up @@ -8647,7 +8649,8 @@ int ha_rocksdb::secondary_index_read(const int keyno, uchar *const buf) {
rocksdb::Slice value = m_scan_it->value();
bool covered_lookup =
(m_keyread_only && m_key_descr_arr[keyno]->can_cover_lookup()) ||
m_key_descr_arr[keyno]->covers_lookup(&value, &m_lookup_bitmap);
m_key_descr_arr[keyno]->covers_lookup(
&value, m_converter->get_lookup_bitmap());
if (covered_lookup && m_lock_rows == RDB_LOCK_NONE) {
rc = m_key_descr_arr[keyno]->unpack_record(
table, buf, &key, &value,
Expand Down Expand Up @@ -8685,6 +8688,8 @@ int ha_rocksdb::read_range_first(const key_range *const start_key,
bool eq_range_arg, bool sorted) {
DBUG_ENTER_FUNC();

check_build_decoder();

int result;

eq_range = eq_range_arg;
Expand Down Expand Up @@ -8735,6 +8740,8 @@ int ha_rocksdb::index_read_map(uchar *const buf, const uchar *const key,
enum ha_rkey_function find_flag) {
DBUG_ENTER_FUNC();

check_build_decoder();

DBUG_RETURN(index_read_map_impl(buf, key, keypart_map, find_flag, nullptr));
}

Expand Down Expand Up @@ -9040,6 +9047,8 @@ int ha_rocksdb::index_read_last_map(uchar *const buf, const uchar *const key,
key_part_map keypart_map) {
DBUG_ENTER_FUNC();

check_build_decoder();

DBUG_RETURN(index_read_map(buf, key, keypart_map, HA_READ_PREFIX_LAST));
}

Expand Down Expand Up @@ -9378,6 +9387,8 @@ int ha_rocksdb::get_row_by_rowid(uchar *const buf, const char *const rowid,
int ha_rocksdb::index_next(uchar *const buf) {
DBUG_ENTER_FUNC();

check_build_decoder();

bool moves_forward = true;
ha_statistic_increment(&SSV::ha_read_next_count);
if (m_key_descr_arr[active_index]->m_is_reverse_cf) {
Expand All @@ -9398,6 +9409,8 @@ int ha_rocksdb::index_next(uchar *const buf) {
int ha_rocksdb::index_prev(uchar *const buf) {
DBUG_ENTER_FUNC();

check_build_decoder();

bool moves_forward = false;
ha_statistic_increment(&SSV::ha_read_prev_count);
if (m_key_descr_arr[active_index]->m_is_reverse_cf) {
Expand Down Expand Up @@ -9458,6 +9471,8 @@ int ha_rocksdb::index_next_with_direction(uchar *const buf, bool move_forward) {
int ha_rocksdb::index_first(uchar *const buf) {
DBUG_ENTER_FUNC();

check_build_decoder();

m_sk_match_prefix = nullptr;
ha_statistic_increment(&SSV::ha_read_first_count);
int rc = m_key_descr_arr[active_index]->m_is_reverse_cf
Expand All @@ -9476,6 +9491,8 @@ int ha_rocksdb::index_first(uchar *const buf) {
int ha_rocksdb::index_last(uchar *const buf) {
DBUG_ENTER_FUNC();

check_build_decoder();

m_sk_match_prefix = nullptr;
ha_statistic_increment(&SSV::ha_read_last_count);
int rc = m_key_descr_arr[active_index]->m_is_reverse_cf
Expand Down Expand Up @@ -10901,17 +10918,16 @@ void ha_rocksdb::setup_iterator_for_rnd_scan() {
int ha_rocksdb::rnd_init(bool scan) {
DBUG_ENTER_FUNC();

m_need_build_decoder = true;
active_index = table->s->primary_key;

THD *thd = ha_thd();
if (thd && thd->killed) {
DBUG_RETURN(HA_ERR_QUERY_INTERRUPTED);
}

Rdb_transaction *const tx = get_or_create_tx(table->in_use);

// when this table is being updated, decode all fields
m_converter->setup_field_decoders(table->read_set,
m_lock_rows == RDB_LOCK_WRITE);

if (scan) {
m_rnd_scan_is_new_snapshot = !tx->has_snapshot();
setup_iterator_for_rnd_scan();
Expand All @@ -10935,6 +10951,8 @@ int ha_rocksdb::rnd_init(bool scan) {
int ha_rocksdb::rnd_next(uchar *const buf) {
DBUG_ENTER_FUNC();

check_build_decoder();

int rc;
ha_statistic_increment(&SSV::ha_read_rnd_next_count);
for (;;) {
Expand Down Expand Up @@ -11073,11 +11091,26 @@ int ha_rocksdb::rnd_next_with_direction(uchar *const buf, bool move_forward) {
int ha_rocksdb::rnd_end() {
  DBUG_ENTER_FUNC();

  // Tear down the scan iterator, and drop any pending request (set by
  // rnd_init()) to rebuild the field decoders; the scan is over, so no
  // further read will consume it.  The two actions are independent.
  release_scan_iterator();
  m_need_build_decoder = false;

  DBUG_RETURN(HA_EXIT_SUCCESS);
}

// Configure the converter's field decoders for the current statement.
// Decoding everything is forced when the row is opened for writing
// (m_lock_rows == RDB_LOCK_WRITE), since an update must materialize all
// columns.
void ha_rocksdb::build_decoder() {
  const bool decode_all_fields = (m_lock_rows == RDB_LOCK_WRITE);
  m_converter->setup_field_decoders(table->read_set, active_index,
                                    m_keyread_only, decode_all_fields);
}

// Lazily (re)build the field decoders on the first read operation after an
// init call armed m_need_build_decoder.  By this point the table's read set
// reflects the actual columns needed (e.g. after index merge fixes up its
// column_bitmap), so the decoders are built against correct information.
void ha_rocksdb::check_build_decoder() {
  if (!m_need_build_decoder) {
    return;
  }
  build_decoder();
  m_need_build_decoder = false;
}

/**
@return
HA_EXIT_SUCCESS OK
Expand All @@ -11086,6 +11119,9 @@ int ha_rocksdb::rnd_end() {
int ha_rocksdb::index_init(uint idx, bool sorted) {
DBUG_ENTER_FUNC();

m_need_build_decoder = true;
active_index = idx;

THD *thd = ha_thd();
if (thd && thd->killed) {
DBUG_RETURN(HA_ERR_QUERY_INTERRUPTED);
Expand All @@ -11094,21 +11130,11 @@ int ha_rocksdb::index_init(uint idx, bool sorted) {
Rdb_transaction *const tx = get_or_create_tx(table->in_use);
DBUG_ASSERT(tx != nullptr);

// when this table is being updated, decode all fields
m_converter->setup_field_decoders(table->read_set,
m_lock_rows == RDB_LOCK_WRITE);

if (!m_keyread_only) {
m_key_descr_arr[idx]->get_lookup_bitmap(table, &m_lookup_bitmap);
}

// If m_lock_rows is not RDB_LOCK_NONE then we will be doing a get_for_update
// when accessing the index, so don't acquire the snapshot right away.
// Otherwise acquire the snapshot immediately.
tx->acquire_snapshot(m_lock_rows == RDB_LOCK_NONE);

active_index = idx;

DBUG_RETURN(HA_EXIT_SUCCESS);
}

Expand All @@ -11119,9 +11145,9 @@ int ha_rocksdb::index_init(uint idx, bool sorted) {
int ha_rocksdb::index_end() {
DBUG_ENTER_FUNC();

release_scan_iterator();
m_need_build_decoder = false;

bitmap_free(&m_lookup_bitmap);
release_scan_iterator();

active_index = MAX_KEY;
in_range_check_pushed_down = FALSE;
Expand Down Expand Up @@ -11458,6 +11484,8 @@ void ha_rocksdb::position(const uchar *const record) {
int ha_rocksdb::rnd_pos(uchar *const buf, uchar *const pos) {
DBUG_ENTER_FUNC();

check_build_decoder();

int rc;
size_t len;

Expand Down Expand Up @@ -16030,6 +16058,8 @@ class Mrr_sec_key_rowid_source : public Mrr_rowid_source {
int ha_rocksdb::multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param,
uint n_ranges, uint mode,
HANDLER_BUFFER *buf) {
m_need_build_decoder = true;

int res;

if (!current_thd->optimizer_switch_flag(OPTIMIZER_SWITCH_MRR) ||
Expand Down Expand Up @@ -16254,6 +16284,8 @@ void ha_rocksdb::mrr_free_rows() {
}

int ha_rocksdb::multi_range_read_next(char **range_info) {
check_build_decoder();

if (mrr_uses_default_impl) {
return handler::multi_range_read_next(range_info);
}
Expand Down
15 changes: 7 additions & 8 deletions storage/rocksdb/ha_rocksdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -374,13 +374,6 @@ class ha_rocksdb : public my_core::handler {

void set_last_rowkey(const uchar *const old_data);

/*
For the active index, indicates which columns must be covered for the
current lookup to be covered. If the bitmap field is null, that means this
index does not cover the current lookup for any record.
*/
MY_BITMAP m_lookup_bitmap = {nullptr, 0, 0, nullptr, nullptr};

int alloc_key_buffers(const TABLE *const table_arg,
const Rdb_tbl_def *const tbl_def_arg,
bool alloc_alter_buffers = false)
Expand Down Expand Up @@ -956,7 +949,7 @@ class ha_rocksdb : public my_core::handler {
MY_ATTRIBUTE((__warn_unused_result__));
int create_table(const std::string &table_name, const TABLE *table_arg,
ulonglong auto_increment_value);
int truncate_table(Rdb_tbl_def *tbl_def, const TABLE *table_arg,
int truncate_table(Rdb_tbl_def *tbl_def, TABLE *table_arg,
ulonglong auto_increment_value);
bool check_if_incompatible_data(HA_CREATE_INFO *const info,
uint table_changes) override
Expand Down Expand Up @@ -1016,6 +1009,9 @@ class ha_rocksdb : public my_core::handler {

void update_row_read(ulonglong count);

void build_decoder();
void check_build_decoder();

public:
virtual void rpl_before_delete_rows() override;
virtual void rpl_after_delete_rows() override;
Expand All @@ -1030,6 +1026,9 @@ class ha_rocksdb : public my_core::handler {
bool m_in_rpl_update_rows;

bool m_force_skip_unique_check;

/* Need to build decoder on next read operation */
bool m_need_build_decoder;
};

/*
Expand Down
3 changes: 2 additions & 1 deletion storage/rocksdb/nosql_access.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1325,7 +1325,8 @@ bool INLINE_ATTR select_exec::run_query() {
bool ret = false;

if (m_unpack_value) {
m_converter->setup_field_decoders(m_table->read_set);
m_converter->setup_field_decoders(m_table->read_set, m_index,
false /* keyread_only */);
}

m_row_buf_prefix_end_pos = 0;
Expand Down
21 changes: 16 additions & 5 deletions storage/rocksdb/rdb_converter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -306,13 +306,15 @@ Rdb_converter::Rdb_converter(const THD *thd, const Rdb_tbl_def *tbl_def,
m_row_checksums_checked = 0;
m_null_bytes = nullptr;
setup_field_encoders();
m_lookup_bitmap = {nullptr, 0, 0, nullptr, nullptr};
}

Rdb_converter::~Rdb_converter() {
  // Release the per-field encoder array and null the pointer so a stale
  // reference cannot be double-freed.
  my_free(m_encoder_arr);
  m_encoder_arr = nullptr;
  // These are needed to suppress valgrind errors in rocksdb.partition
  m_storage_record.free();
  // m_lookup_bitmap is zero-initialized in the constructor, so freeing it
  // here is safe even when setup_field_decoders() never allocated it —
  // presumably bitmap_free() tolerates an unallocated map; verify against
  // the MY_BITMAP API.
  bitmap_free(&m_lookup_bitmap);
}

/*
Expand All @@ -337,31 +339,30 @@ void Rdb_converter::get_storage_type(Rdb_field_encoder *const encoder,
Setup which fields will be unpacked when reading rows
@detail
Three special cases when we still unpack all fields:
Two special cases when we still unpack all fields:
- When client requires decode_all_fields, such as this table is being
updated (m_lock_rows==RDB_LOCK_WRITE).
- When @@rocksdb_verify_row_debug_checksums is ON (In this mode, we need
to read all fields to find whether there is a row checksum at the end. We
could skip the fields instead of decoding them, but currently we do
decoding.)
- On index merge as bitmap is cleared during that operation
@seealso
Rdb_converter::setup_field_encoders()
Rdb_converter::convert_record_from_storage_format()
*/
void Rdb_converter::setup_field_decoders(const MY_BITMAP *field_map,
uint active_index, bool keyread_only,
bool decode_all_fields) {
m_key_requested = false;
m_decoders_vect.clear();
bitmap_free(&m_lookup_bitmap);
int last_useful = 0;
int skip_size = 0;

for (uint i = 0; i < m_table->s->fields; i++) {
// bitmap is cleared on index merge, but it still needs to decode columns
bool field_requested =
decode_all_fields || m_verify_row_debug_checksums ||
bitmap_is_clear_all(field_map) ||
bitmap_is_set(field_map, m_table->field[i]->field_index);

// We only need the decoder if the whole record is stored.
Expand Down Expand Up @@ -397,6 +398,11 @@ void Rdb_converter::setup_field_decoders(const MY_BITMAP *field_map,
// skipping. Remove them.
m_decoders_vect.erase(m_decoders_vect.begin() + last_useful,
m_decoders_vect.end());

if (!keyread_only && active_index != m_table->s->primary_key) {
m_tbl_def->m_key_descr_arr[active_index]->get_lookup_bitmap(
m_table, &m_lookup_bitmap);
}
}

void Rdb_converter::setup_field_encoders() {
Expand Down Expand Up @@ -583,6 +589,11 @@ int Rdb_converter::convert_record_from_storage_format(
const rocksdb::Slice *const key_slice,
const rocksdb::Slice *const value_slice, uchar *const dst,
bool decode_value = true) {
bool skip_value = !decode_value || get_decode_fields()->size() == 0;
if (!m_key_requested && skip_value) {
return HA_EXIT_SUCCESS;
}

int err = HA_EXIT_SUCCESS;

Rdb_string_reader value_slice_reader(value_slice);
Expand All @@ -604,7 +615,7 @@ int Rdb_converter::convert_record_from_storage_format(
}
}

if (!decode_value || get_decode_fields()->size() == 0) {
if (skip_value) {
// We are done
return HA_EXIT_SUCCESS;
}
Expand Down
Loading

0 comments on commit 25b42b5

Please sign in to comment.