Skip to content

Commit

Permalink
MDEV-18973 CLIENT_FOUND_ROWS wrong in spider
Browse files Browse the repository at this point in the history
Get count from last_used_con->info
Contributed by willhan at Tencent Games
Reviewed by Monty
  • Loading branch information
Kentoku committed Nov 29, 2019
2 parents 7955e19 + e066723 commit a3b63b8
Show file tree
Hide file tree
Showing 27 changed files with 846 additions and 43 deletions.
51 changes: 48 additions & 3 deletions sql/ha_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4822,6 +4822,42 @@ ha_rows ha_partition::guess_bulk_insert_rows()
}


void ha_partition::sum_copy_info(handler *file)
{
copy_info.records+= file->copy_info.records;
copy_info.touched+= file->copy_info.touched;
copy_info.copied+= file->copy_info.copied;
copy_info.deleted+= file->copy_info.deleted;
copy_info.updated+= file->copy_info.updated;
}


void ha_partition::sum_copy_infos()
{
handler **file_array;
bzero(&copy_info, sizeof(copy_info));
file_array= m_file;
do
{
if (bitmap_is_set(&(m_opened_partitions), (uint)(file_array - m_file)))
sum_copy_info(*file_array);
} while (*(++file_array));
}

void ha_partition::reset_copy_info()
{
handler **file_array;
bzero(&copy_info, sizeof(copy_info));
file_array= m_file;
do
{
if (bitmap_is_set(&(m_opened_partitions), (uint)(file_array - m_file)))
bzero(&(*file_array)->copy_info, sizeof(copy_info));
} while (*(++file_array));
}



/*
Finish a large batch of insert rows
Expand Down Expand Up @@ -4853,6 +4889,7 @@ int ha_partition::end_bulk_insert()
int tmp;
if ((tmp= m_file[i]->ha_end_bulk_insert()))
error= tmp;
sum_copy_info(m_file[i]);
}
bitmap_clear_all(&m_bulk_insert_started);
DBUG_RETURN(error);
Expand Down Expand Up @@ -11164,6 +11201,7 @@ bool ha_partition::start_bulk_update()

do
{
bzero(&(*file)->copy_info, sizeof((*file)->copy_info));
if ((*file)->start_bulk_update())
DBUG_RETURN(TRUE);
} while (*(++file));
Expand Down Expand Up @@ -11221,6 +11259,7 @@ int ha_partition::end_bulk_update()
if ((tmp= (*file)->end_bulk_update()))
error= tmp;
} while (*(++file));
sum_copy_infos();
DBUG_RETURN(error);
}

Expand Down Expand Up @@ -11317,6 +11356,7 @@ int ha_partition::end_bulk_delete()
if ((tmp= (*file)->end_bulk_delete()))
error= tmp;
} while (*(++file));
sum_copy_infos();
DBUG_RETURN(error);
}

Expand Down Expand Up @@ -11433,11 +11473,13 @@ int ha_partition::pre_direct_update_rows_init(List<Item> *update_fields)
0 Success
*/

int ha_partition::direct_update_rows(ha_rows *update_rows_result)
int ha_partition::direct_update_rows(ha_rows *update_rows_result,
ha_rows *found_rows_result)
{
int error;
bool rnd_seq= FALSE;
ha_rows update_rows= 0;
ha_rows found_rows= 0;
uint32 i;
DBUG_ENTER("ha_partition::direct_update_rows");

Expand All @@ -11449,6 +11491,7 @@ int ha_partition::direct_update_rows(ha_rows *update_rows_result)
}

*update_rows_result= 0;
*found_rows_result= 0;
for (i= m_part_spec.start_part; i <= m_part_spec.end_part; i++)
{
handler *file= m_file[i];
Expand All @@ -11464,7 +11507,8 @@ int ha_partition::direct_update_rows(ha_rows *update_rows_result)
}
if (unlikely((error= (m_pre_calling ?
(file)->pre_direct_update_rows() :
(file)->ha_direct_update_rows(&update_rows)))))
(file)->ha_direct_update_rows(&update_rows,
&found_rows)))))
{
if (rnd_seq)
{
Expand All @@ -11476,6 +11520,7 @@ int ha_partition::direct_update_rows(ha_rows *update_rows_result)
DBUG_RETURN(error);
}
*update_rows_result+= update_rows;
*found_rows_result+= found_rows;
}
if (rnd_seq)
{
Expand Down Expand Up @@ -11511,7 +11556,7 @@ int ha_partition::pre_direct_update_rows()
DBUG_ENTER("ha_partition::pre_direct_update_rows");
save_m_pre_calling= m_pre_calling;
m_pre_calling= TRUE;
error= direct_update_rows(&not_used);
error= direct_update_rows(&not_used, &not_used);
m_pre_calling= save_m_pre_calling;
DBUG_RETURN(error);
}
Expand Down
5 changes: 4 additions & 1 deletion sql/ha_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,9 @@ class ha_partition :public handler
part_share->next_auto_inc_val= part_share->prev_auto_inc_val;
handler::restore_auto_increment();
}
void sum_copy_info(handler *file);
void sum_copy_infos();
void reset_copy_info();
/** Temporary storage for new partitions Handler_shares during ALTER */
List<Parts_share_refs> m_new_partitions_share_refs;
/** Sorted array of partition ids in descending order of number of rows. */
Expand Down Expand Up @@ -657,7 +660,7 @@ class ha_partition :public handler
virtual int update_row(const uchar * old_data, const uchar * new_data);
virtual int direct_update_rows_init(List<Item> *update_fields);
virtual int pre_direct_update_rows_init(List<Item> *update_fields);
virtual int direct_update_rows(ha_rows *update_rows);
virtual int direct_update_rows(ha_rows *update_rows, ha_rows *found_rows);
virtual int pre_direct_update_rows();
virtual bool start_bulk_delete();
virtual int end_bulk_delete();
Expand Down
4 changes: 2 additions & 2 deletions sql/handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6812,14 +6812,14 @@ int handler::ha_delete_row(const uchar *buf)
@retval != 0 Failure.
*/

int handler::ha_direct_update_rows(ha_rows *update_rows)
int handler::ha_direct_update_rows(ha_rows *update_rows, ha_rows *found_rows)
{
int error;

MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str);
mark_trx_read_write();

error = direct_update_rows(update_rows);
error = direct_update_rows(update_rows, found_rows);
MYSQL_UPDATE_ROW_DONE(error);
return error;
}
Expand Down
24 changes: 22 additions & 2 deletions sql/handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,16 @@ enum tablespace_access_mode
TS_NOT_ACCESSIBLE = 2
};

/* Statistics about batch operations like bulk_insert */
struct ha_copy_info
{
ha_rows records; /* Used to check if rest of variables can be used */
ha_rows touched;
ha_rows copied;
ha_rows deleted;
ha_rows updated;
};

struct handlerton;
class st_alter_tablespace : public Sql_alloc
{
Expand Down Expand Up @@ -3061,6 +3071,7 @@ class handler :public Sql_alloc
ulonglong rows_changed;
/* One bigger than needed to avoid to test if key == MAX_KEY */
ulonglong index_rows_read[MAX_KEY+1];
ha_copy_info copy_info;

private:
/* ANALYZE time tracker, if present */
Expand Down Expand Up @@ -3276,6 +3287,7 @@ class handler :public Sql_alloc
{
DBUG_ENTER("handler::ha_start_bulk_insert");
estimation_rows_to_insert= rows;
bzero(&copy_info,sizeof(copy_info));
start_bulk_insert(rows, flags);
DBUG_VOID_RETURN;
}
Expand Down Expand Up @@ -3347,6 +3359,13 @@ class handler :public Sql_alloc
{
rows_read= rows_changed= rows_tmp_read= 0;
bzero(index_rows_read, sizeof(index_rows_read));
bzero(&copy_info, sizeof(copy_info));
}
virtual void reset_copy_info() {}
void ha_reset_copy_info()
{
bzero(&copy_info, sizeof(copy_info));
reset_copy_info();
}
virtual void change_table_ptr(TABLE *table_arg, TABLE_SHARE *share)
{
Expand Down Expand Up @@ -4599,7 +4618,7 @@ class handler :public Sql_alloc

/* Perform initialization for a direct update request */
public:
int ha_direct_update_rows(ha_rows *update_rows);
int ha_direct_update_rows(ha_rows *update_rows, ha_rows *found_rows);
virtual int direct_update_rows_init(List<Item> *update_fields)
{
return HA_ERR_WRONG_COMMAND;
Expand All @@ -4609,7 +4628,8 @@ class handler :public Sql_alloc
{
return HA_ERR_WRONG_COMMAND;
}
virtual int direct_update_rows(ha_rows *update_rows __attribute__((unused)))
virtual int direct_update_rows(ha_rows *update_rows __attribute__((unused)),
ha_rows *found_rows __attribute__((unused)))
{
return HA_ERR_WRONG_COMMAND;
}
Expand Down
30 changes: 24 additions & 6 deletions sql/sql_insert.cc
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,8 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
using_bulk_insert= 1;
table->file->ha_start_bulk_insert(values_list.elements);
}
else
table->file->ha_reset_copy_info();
}

thd->abort_on_warning= !ignore && thd->is_strict_mode();
Expand Down Expand Up @@ -1107,11 +1109,23 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
auto_inc values from the delayed_insert thread as they share TABLE.
*/
table->file->ha_release_auto_increment();
if (using_bulk_insert && unlikely(table->file->ha_end_bulk_insert()) &&
!error)
if (using_bulk_insert)
{
if (unlikely(table->file->ha_end_bulk_insert()) &&
!error)
{
table->file->print_error(my_errno,MYF(0));
error=1;
}
}
/* Get better status from handler if handler supports it */
if (table->file->copy_info.records)
{
table->file->print_error(my_errno,MYF(0));
error=1;
DBUG_ASSERT(info.copied >= table->file->copy_info.copied);
info.touched= table->file->copy_info.touched;
info.copied= table->file->copy_info.copied;
info.deleted= table->file->copy_info.deleted;
info.updated= table->file->copy_info.updated;
}
if (duplic != DUP_ERROR || ignore)
{
Expand Down Expand Up @@ -1234,8 +1248,12 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
retval= thd->lex->explain->send_explain(thd);
goto abort;
}
if ((iteration * values_list.elements) == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) ||
!thd->cuted_fields))
DBUG_PRINT("info", ("touched: %llu copied: %llu updated: %llu deleted: %llu",
(ulonglong) info.touched, (ulonglong) info.copied,
(ulonglong) info.updated, (ulonglong) info.deleted));

if ((iteration * values_list.elements) == 1 &&
(!(thd->variables.option_bits & OPTION_WARNINGS) || !thd->cuted_fields))
{
my_ok(thd, info.copied + info.deleted +
((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
Expand Down
14 changes: 12 additions & 2 deletions sql/sql_update.cc
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,11 @@ int mysql_update(THD *thd,

Later we also ensure that we are only using one table (no sub queries)
*/
DBUG_PRINT("info", ("HA_CAN_DIRECT_UPDATE_AND_DELETE: %s", (table->file->ha_table_flags() & HA_CAN_DIRECT_UPDATE_AND_DELETE) ? "TRUE" : "FALSE"));
DBUG_PRINT("info", ("using_io_buffer: %s", query_plan.using_io_buffer ? "TRUE" : "FALSE"));
DBUG_PRINT("info", ("ignore: %s", ignore ? "TRUE" : "FALSE"));
DBUG_PRINT("info", ("virtual_columns_marked_for_read: %s", table->check_virtual_columns_marked_for_read() ? "TRUE" : "FALSE"));
DBUG_PRINT("info", ("virtual_columns_marked_for_write: %s", table->check_virtual_columns_marked_for_write() ? "TRUE" : "FALSE"));
if ((table->file->ha_table_flags() & HA_CAN_DIRECT_UPDATE_AND_DELETE) &&
!has_triggers && !binlog_is_row &&
!query_plan.using_io_buffer && !ignore &&
Expand Down Expand Up @@ -928,11 +933,16 @@ int mysql_update(THD *thd,
if (do_direct_update)
{
/* Direct updating is supported */
ha_rows update_rows= 0, found_rows= 0;
DBUG_PRINT("info", ("Using direct update"));
table->reset_default_fields();
if (unlikely(!(error= table->file->ha_direct_update_rows(&updated))))
if (unlikely(!(error= table->file->ha_direct_update_rows(&update_rows,
&found_rows))))
error= -1;
found= updated;
updated= update_rows;
found= found_rows;
if (found < updated)
found= updated;
goto update_end;
}

Expand Down
Loading

0 comments on commit a3b63b8

Please sign in to comment.