Skip to content

Commit

Permalink
MW-309 Fix wsrep_max_ws_rows so that it does not affect queries
Browse files Browse the repository at this point in the history
Option wsrep_max_ws_rows is intended to limit the maximum number of rows
in a writeset. To enforce this limit, we increment THD::wsrep_affected_rows
on every INSERT, UPDATE or DELETE. The problem is that we do so even on
insertion to internal temporary tables used for SELECTs and such.
THD::wsrep_affected_rows is now incremented only for rows that are actually
replicated.

Signed-off-by: Sachin Setiya <sachinsetia1001@gmail.com>
  • Loading branch information
SachinSetiya committed Mar 13, 2017
1 parent 4573924 commit 9be994b
Showing 1 changed file with 19 additions and 50 deletions.
69 changes: 19 additions & 50 deletions sql/handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5933,15 +5933,26 @@ static int binlog_log_row(TABLE* table,
THD *const thd= table->in_use;

#ifdef WITH_WSREP
/* only InnoDB tables will be replicated through binlog emulation */
if (WSREP_EMULATE_BINLOG(thd) &&
table->file->ht->db_type != DB_TYPE_INNODB &&
!(table->file->ht->db_type == DB_TYPE_PARTITION_DB &&
(((ha_partition*)(table->file))->wsrep_db_type() == DB_TYPE_INNODB)))
// !strcmp(table->file->table_type(), "InnoDB"))
if (WSREP_EMULATE_BINLOG(thd))
{
return 0;
}
/* only InnoDB tables will be replicated through binlog emulation */
if (table->file->ht->db_type != DB_TYPE_INNODB &&
!(table->file->ht->db_type == DB_TYPE_PARTITION_DB &&
(((ha_partition*)(table->file))->wsrep_db_type() == DB_TYPE_INNODB)))
{
return 0;
}

thd->wsrep_affected_rows++;
if (wsrep_max_ws_rows &&
thd->wsrep_exec_mode != REPL_RECV &&
thd->wsrep_affected_rows > wsrep_max_ws_rows)
{
trans_rollback_stmt(thd) || trans_rollback(thd);
my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0));
return ER_ERROR_DURING_COMMIT;
}
}
#endif /* WITH_WSREP */
if (check_table_binlog_row_based(thd, table))
{
Expand Down Expand Up @@ -6108,20 +6119,6 @@ int handler::ha_write_row(uchar *buf)
rows_changed++;
if (unlikely(error= binlog_log_row(table, 0, buf, log_func)))
DBUG_RETURN(error); /* purecov: inspected */
#ifdef WITH_WSREP
if (WSREP(current_thd))
{
current_thd->wsrep_affected_rows++;
if (wsrep_max_ws_rows &&
current_thd->wsrep_exec_mode != REPL_RECV &&
current_thd->wsrep_affected_rows > wsrep_max_ws_rows)
{
trans_rollback_stmt(current_thd) || trans_rollback(current_thd);
my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0));
DBUG_RETURN(ER_ERROR_DURING_COMMIT);
}
}
#endif /* WITH_WSREP */

DEBUG_SYNC_C("ha_write_row_end");
DBUG_RETURN(0);
Expand Down Expand Up @@ -6155,20 +6152,6 @@ int handler::ha_update_row(const uchar *old_data, uchar *new_data)
rows_changed++;
if (unlikely(error= binlog_log_row(table, old_data, new_data, log_func)))
return error;
#ifdef WITH_WSREP
if (WSREP(current_thd))
{
current_thd->wsrep_affected_rows++;
if (wsrep_max_ws_rows &&
current_thd->wsrep_exec_mode != REPL_RECV &&
current_thd->wsrep_affected_rows > wsrep_max_ws_rows)
{
trans_rollback_stmt(current_thd) || trans_rollback(current_thd);
my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0));
return ER_ERROR_DURING_COMMIT;
}
}
#endif /* WITH_WSREP */
return 0;
}

Expand Down Expand Up @@ -6196,20 +6179,6 @@ int handler::ha_delete_row(const uchar *buf)
rows_changed++;
if (unlikely(error= binlog_log_row(table, buf, 0, log_func)))
return error;
#ifdef WITH_WSREP
if (WSREP(current_thd))
{
current_thd->wsrep_affected_rows++;
if (wsrep_max_ws_rows &&
current_thd->wsrep_exec_mode != REPL_RECV &&
current_thd->wsrep_affected_rows > wsrep_max_ws_rows)
{
trans_rollback_stmt(current_thd) || trans_rollback(current_thd);
my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0));
return ER_ERROR_DURING_COMMIT;
}
}
#endif /* WITH_WSREP */
return 0;
}

Expand Down

0 comments on commit 9be994b

Please sign in to comment.