Skip to content
Permalink
Browse files

Improve update handler (long unique keys on blobs)

- Move update_handler from TABLE to handler
- Move out initialization of update handler from ha_write_row() to
  prepare_for_insert()
- Fixed that INSERT DELAYED works with update handler
- Added handler function to check if table has long unique hash indexes
- Disable write cache in MyISAM and Aria when using update_handler as
  if cache is used, the row will not be inserted until end of statement
  and update_handler would not find conflicting rows.
- Removed not used handler argument from
  check_duplicate_long_entries_update()
- Syntax cleanups
  - Indentation fixes
  - Don't use single character identifiers for arguments
  • Loading branch information
montywi committed Jan 13, 2020
1 parent de80468 commit f6a18045f5e352b8efa17cc016a24dd3cd17253b
@@ -1477,4 +1477,15 @@ id select_type table type possible_keys key key_len ref rows Extra
SELECT t2.b FROM t1 JOIN t2 ON t1.d = t2.f WHERE t2.pk >= 20;
b
drop table t1,t2;
#
# Test insert delayed
#
create table t1(a blob unique) engine=myisam;
insert delayed into t1 values(1),(2),(3),(56),('sachin'),('maria'),(123456789034567891),(null),(null),(123456789034567890),('maria');
insert delayed into t1 values(1),(9);
flush tables t1;
select count(*) from t1;
count(*)
11
drop table t1;
set @@GLOBAL.max_allowed_packet= @allowed_packet;
@@ -556,4 +556,16 @@ SELECT t2.b FROM t1 JOIN t2 ON t1.d = t2.f WHERE t2.pk >= 20;
SELECT t2.b FROM t1 JOIN t2 ON t1.d = t2.f WHERE t2.pk >= 20;
drop table t1,t2;

--echo #
--echo # Test insert delayed
--echo #

create table t1(a blob unique) engine=myisam;
insert delayed into t1 values(1),(2),(3),(56),('sachin'),('maria'),(123456789034567891),(null),(null),(123456789034567890),('maria');
insert delayed into t1 values(1),(9);
flush tables t1;
select count(*) from t1;
drop table t1;

# Cleanup
set @@GLOBAL.max_allowed_packet= @allowed_packet;
@@ -2642,13 +2642,47 @@ handler *handler::clone(const char *name, MEM_ROOT *mem_root)
HA_OPEN_IGNORE_IF_LOCKED, mem_root))
goto err;

new_handler->update_handler= new_handler;
return new_handler;

err:
delete new_handler;
return NULL;
}

/**
@brief clone of current handler.
Creates a clone of handler used in update for
unique hash key.
*/
bool handler::clone_handler_for_update()
{
handler *tmp;
DBUG_ASSERT(table->s->long_unique_table);

if (update_handler != this)
return 0; // Already done
if (!(tmp= clone(table->s->normalized_path.str, table->in_use->mem_root)))
return 1;
update_handler= tmp;
update_handler->ha_external_lock(table->in_use, F_RDLCK);
return 0;
}

/**
  @brief Release the update handler clone, if one exists.

  Unlocks and closes the clone created by clone_handler_for_update()
  and resets update_handler to point back at this handler.
*/
void handler::delete_update_handler()
{
  if (update_handler == this)
    return;                                     // Nothing was cloned
  update_handler->ha_external_lock(table->in_use, F_UNLCK);
  update_handler->ha_close();
  delete update_handler;
  update_handler= this;
}

LEX_CSTRING *handler::engine_name()
{
return hton_name(ht);
@@ -2779,7 +2813,7 @@ int handler::ha_open(TABLE *table_arg, const char *name, int mode,
}
reset_statistics();
internal_tmp_table= MY_TEST(test_if_locked & HA_OPEN_INTERNAL_TABLE);

update_handler= this;
DBUG_RETURN(error);
}

@@ -6481,6 +6515,8 @@ int handler::ha_reset()
DBUG_ASSERT(inited == NONE);
/* reset the bitmaps to point to defaults */
table->default_column_bitmaps();
if (update_handler != this)
delete_update_handler();
pushed_cond= NULL;
tracker= NULL;
mark_trx_read_write_done= 0;
@@ -6515,33 +6551,39 @@ static int wsrep_after_row(THD *thd)
}
#endif /* WITH_WSREP */

static int check_duplicate_long_entry_key(TABLE *table, handler *h,

/**
Check if there is a conflicting unique hash key
*/

static int check_duplicate_long_entry_key(TABLE *table, handler *handler,
const uchar *new_rec, uint key_no)
{
Field *hash_field;
int result, error= 0;
KEY *key_info= table->key_info + key_no;
hash_field= key_info->key_part->field;
uchar ptr[HA_HASH_KEY_LENGTH_WITH_NULL];
DBUG_ENTER("check_duplicate_long_entry_key");

DBUG_ASSERT((key_info->flags & HA_NULL_PART_KEY &&
key_info->key_length == HA_HASH_KEY_LENGTH_WITH_NULL)
|| key_info->key_length == HA_HASH_KEY_LENGTH_WITHOUT_NULL);
key_info->key_length == HA_HASH_KEY_LENGTH_WITH_NULL) ||
key_info->key_length == HA_HASH_KEY_LENGTH_WITHOUT_NULL);

if (hash_field->is_real_null())
return 0;
DBUG_RETURN(0);

key_copy(ptr, new_rec, key_info, key_info->key_length, false);

if (!table->check_unique_buf)
table->check_unique_buf= (uchar *)alloc_root(&table->mem_root,
table->s->reclength);

result= h->ha_index_init(key_no, 0);
result= handler->ha_index_init(key_no, 0);
if (result)
return result;
DBUG_RETURN(result);
store_record(table, check_unique_buf);
result= h->ha_index_read_map(table->record[0],
result= handler->ha_index_read_map(table->record[0],
ptr, HA_WHOLE_KEY, HA_READ_KEY_EXACT);
if (!result)
{
@@ -6577,7 +6619,7 @@ static int check_duplicate_long_entry_key(TABLE *table, handler *h,
}
}
}
while (!is_same && !(result= h->ha_index_next_same(table->record[0],
while (!is_same && !(result= handler->ha_index_next_same(table->record[0],
ptr, key_info->key_length)));
if (is_same)
error= HA_ERR_FOUND_DUPP_KEY;
@@ -6589,36 +6631,38 @@ static int check_duplicate_long_entry_key(TABLE *table, handler *h,
if (error == HA_ERR_FOUND_DUPP_KEY)
{
table->file->errkey= key_no;
if (h->ha_table_flags() & HA_DUPLICATE_POS)
if (handler->ha_table_flags() & HA_DUPLICATE_POS)
{
h->position(table->record[0]);
memcpy(table->file->dup_ref, h->ref, h->ref_length);
handler->position(table->record[0]);
memcpy(table->file->dup_ref, handler->ref, handler->ref_length);
}
}
restore_record(table, check_unique_buf);
h->ha_index_end();
return error;
handler->ha_index_end();
DBUG_RETURN(error);
}

/** @brief
check whether inserted records breaks the
unique constraint on long columns.
@returns 0 if no duplicate else returns error
*/
static int check_duplicate_long_entries(TABLE *table, handler *h,

static int check_duplicate_long_entries(TABLE *table, handler *handler,
const uchar *new_rec)
{
table->file->errkey= -1;
int result;
for (uint i= 0; i < table->s->keys; i++)
{
int result;
if (table->key_info[i].algorithm == HA_KEY_ALG_LONG_HASH &&
(result= check_duplicate_long_entry_key(table, h, new_rec, i)))
(result= check_duplicate_long_entry_key(table, handler, new_rec, i)))
return result;
}
return 0;
}


/** @brief
check whether updated records breaks the
unique constraint on long columns.
@@ -6633,19 +6677,19 @@ static int check_duplicate_long_entries(TABLE *table, handler *h,
key as a parameter in normal insert key should be -1
@returns 0 if no duplicate else returns error
*/
static int check_duplicate_long_entries_update(TABLE *table, handler *h, uchar *new_rec)

static int check_duplicate_long_entries_update(TABLE *table, uchar *new_rec)
{
Field *field;
uint key_parts;
int error= 0;
KEY *keyinfo;
KEY_PART_INFO *keypart;
/*
Here we are comparing whether new record and old record are same
with respect to fields in hash_str
*/
uint reclength= (uint) (table->record[1] - table->record[0]);
table->clone_handler_for_update();
table->file->clone_handler_for_update();
for (uint i= 0; i < table->s->keys; i++)
{
keyinfo= table->key_info + i;
@@ -6655,13 +6699,15 @@ static int check_duplicate_long_entries_update(TABLE *table, handler *h, uchar *
keypart= keyinfo->key_part - key_parts;
for (uint j= 0; j < key_parts; j++, keypart++)
{
int error;
field= keypart->field;
/* Compare fields if they are different then check for duplicates*/
if(field->cmp_binary_offset(reclength))
/* Compare fields if they are different then check for duplicates */
if (field->cmp_binary_offset(reclength))
{
if((error= check_duplicate_long_entry_key(table, table->update_handler,
new_rec, i)))
goto exit;
if ((error= (check_duplicate_long_entry_key(table,
table->file->update_handler,
new_rec, i))))
return error;
/*
break because check_duplicate_long_entries_key will
take care of remaining fields
@@ -6671,10 +6717,30 @@ static int check_duplicate_long_entries_update(TABLE *table, handler *h, uchar *
}
}
}
exit:
return error;
return 0;
}


/*
  Do all initialization needed for INSERT.

  For tables with long unique hash keys we may need a second handler
  instance to check for duplicate keys while the main one is occupied.
*/

int handler::prepare_for_insert()
{
  /*
    While a table scan (RND) is active, the same handler cannot also be
    used to probe the hash indexes for duplicate rows, so a separate
    temporary handler has to be cloned for that purpose.
  */
  if (!(table->s->long_unique_table && inited == RND))
    return 0;
  return clone_handler_for_update() ? 1 : 0;
}


int handler::ha_write_row(const uchar *buf)
{
int error;
@@ -6690,10 +6756,8 @@ int handler::ha_write_row(const uchar *buf)

if (table->s->long_unique_table)
{
if (this->inited == RND)
table->clone_handler_for_update();
handler *h= table->update_handler ? table->update_handler : table->file;
if ((error= check_duplicate_long_entries(table, h, buf)))
DBUG_ASSERT(inited == NONE || update_handler != this);
if ((error= check_duplicate_long_entries(table, update_handler, buf)))
DBUG_RETURN(error);
}
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_WRITE_ROW, MAX_KEY, 0,
@@ -6736,10 +6800,8 @@ int handler::ha_update_row(const uchar *old_data, const uchar *new_data)
mark_trx_read_write();
increment_statistics(&SSV::ha_update_count);
if (table->s->long_unique_table &&
(error= check_duplicate_long_entries_update(table, table->file, (uchar *)new_data)))
{
(error= check_duplicate_long_entries_update(table, (uchar*) new_data)))
return error;
}

TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_UPDATE_ROW, active_index, 0,
{ error= update_row(old_data, new_data);})
@@ -6977,6 +7039,11 @@ void handler::set_lock_type(enum thr_lock_type lock)
table->reginfo.lock_type= lock;
}

/* Report whether the table has any long unique hash indexes */
bool handler::has_long_unique()
{
  return table->s->long_unique_table ? true : false;
}

#ifdef WITH_WSREP
/**
@details
@@ -3058,6 +3058,7 @@ class handler :public Sql_alloc
/** Length of ref (1-8 or the clustered key length) */
uint ref_length;
FT_INFO *ft_handler;
handler *update_handler; /* Handler used in case of update */
enum init_stat { NONE=0, INDEX, RND };
init_stat inited, pre_inited;

@@ -3192,6 +3193,8 @@ class handler :public Sql_alloc
DBUG_ASSERT(inited == NONE);
}
virtual handler *clone(const char *name, MEM_ROOT *mem_root);
bool clone_handler_for_update();
void delete_update_handler();
/** This is called after create to allow us to set up cached variables */
void init()
{
@@ -4535,6 +4538,7 @@ class handler :public Sql_alloc

public:
bool check_table_binlog_row_based(bool binlog_row);
int prepare_for_insert();

inline void clear_cached_table_binlog_row_based_flag()
{
@@ -4878,6 +4882,8 @@ class handler :public Sql_alloc
{
return 0;
}
/* If the table is using sql level unique constraints on some column */
bool has_long_unique();

protected:
Handler_share *get_ha_share_ptr();
@@ -891,9 +891,6 @@ void close_thread_tables(THD *thd)

for (table= thd->open_tables; table; table= table->next)
{
if (table->update_handler)
table->delete_update_handler();

/* Table might be in use by some outer statement. */
DBUG_PRINT("tcache", ("table: '%s' query_id: %lu",
table->s->table_name.str, (ulong) table->query_id));
@@ -8736,8 +8733,8 @@ fill_record(THD *thd, TABLE *table, Field **ptr, List<Item> &values,

if (unlikely(field->invisible))
continue;
else
value=v++;

value=v++;

bool vers_sys_field= table->versioned() && field->vers_sys_field();

0 comments on commit f6a1804

Please sign in to comment.
You can’t perform that action at this time.