Skip to content

Commit

Permalink
MDEV-16584 SP with a cursor inside a loop wastes THD memory aggressively
Browse files Browse the repository at this point in the history
Problem:

push_cursor() created sp_cursor instances on THD::main_mem_root,
which is freed only after the SP instructions loop.

Changes:
- Moving sp_cursor declaration from sp_rcontext.h to sql_class.h
- Deriving sp_instr_cpush from sp_cursor. So now sp_cursor is created
  only once (at the SP parse time) and then reused on all loop iterations
- Adding a new method reset() into sp_cursor (and its parent classes)
  to reset an sp_cursor instance before reuse.
- Moving former sp_cursor members m_fetch_count, m_row_count, m_found
  into a separate class sp_cursor_statistics. This helps to reuse
  the code in sp_cursor constructors, and in sp_cursor::reset()
- Adding a helper method sp_rcontext::pop_cursor().
- Adding "THD*" parameter to so_rcontext::pop_cursors() and pop_all_cursors()
- Removing "new" and "delete" from sp_rcontext::push_cursor() and
  sp_rconext::pop_cursor().
- Fixing sp_cursor not to derive from Sql_alloc, as it's now allocated
  only as a part of sp_instr_cpush (and not allocated separately).
- Moving lex_keeper->disable_query_cache() from sp_cursor::sp_cursor()
  to sp_instr_cpush::execute().
- Adding tests
  • Loading branch information
abarkov committed Jun 27, 2018
1 parent 1d6bc0f commit 56145be
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 116 deletions.
29 changes: 29 additions & 0 deletions mysql-test/main/sp-cursor.result
Original file line number Diff line number Diff line change
Expand Up @@ -684,3 +684,32 @@ rec.a
1
rec.a
1
#
# MDEV-16584 SP with a cursor inside a loop wastes THD memory aggressively
#
CREATE PROCEDURE p1()
BEGIN
DECLARE mem_used_old BIGINT UNSIGNED DEFAULT
(SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.SESSION_STATUS
WHERE VARIABLE_NAME='MEMORY_USED');
DECLARE i INT DEFAULT 1;
WHILE i <= 5000
DO
BEGIN
DECLARE msg TEXT;
DECLARE mem_used_cur BIGINT UNSIGNED DEFAULT
(SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.SESSION_STATUS
WHERE VARIABLE_NAME='MEMORY_USED');
DECLARE cur CURSOR FOR SELECT 1 FROM DUAL;
IF (mem_used_cur >= mem_used_old * 2) THEN
SHOW STATUS LIKE 'Memory_used';
SET msg=CONCAT('Memory leak detected: i=', i, ' mem_used_old=',mem_used_old,' mem_used_cur=', mem_used_cur);
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT=msg;
END IF;
END;
SET i=i+1;
END WHILE;
END;
$$
CALL p1;
DROP PROCEDURE p1;
34 changes: 34 additions & 0 deletions mysql-test/main/sp-cursor.test
Original file line number Diff line number Diff line change
Expand Up @@ -689,3 +689,37 @@ label2:
END;
$$
DELIMITER ;$$


--echo #
--echo # MDEV-16584 SP with a cursor inside a loop wastes THD memory aggressively
--echo #

DELIMITER $$;
CREATE PROCEDURE p1()
BEGIN
DECLARE mem_used_old BIGINT UNSIGNED DEFAULT
(SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.SESSION_STATUS
WHERE VARIABLE_NAME='MEMORY_USED');
DECLARE i INT DEFAULT 1;
WHILE i <= 5000
DO
BEGIN
DECLARE msg TEXT;
DECLARE mem_used_cur BIGINT UNSIGNED DEFAULT
(SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.SESSION_STATUS
WHERE VARIABLE_NAME='MEMORY_USED');
DECLARE cur CURSOR FOR SELECT 1 FROM DUAL;
IF (mem_used_cur >= mem_used_old * 2) THEN
SHOW STATUS LIKE 'Memory_used';
SET msg=CONCAT('Memory leak detected: i=', i, ' mem_used_old=',mem_used_old,' mem_used_cur=', mem_used_cur);
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT=msg;
END IF;
END;
SET i=i+1;
END WHILE;
END;
$$
DELIMITER ;$$
CALL p1;
DROP PROCEDURE p1;
10 changes: 6 additions & 4 deletions sql/sp_head.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1406,7 +1406,7 @@ sp_head::execute(THD *thd, bool merge_da_on_success)
/* Only pop cursors when we're done with group aggregate running. */
if (m_chistics.agg_type != GROUP_AGGREGATE ||
(m_chistics.agg_type == GROUP_AGGREGATE && thd->spcont->quit_func))
thd->spcont->pop_all_cursors(); // To avoid memory leaks after an error
thd->spcont->pop_all_cursors(thd); // To avoid memory leaks after an error

/* Restore all saved */
if (m_chistics.agg_type == GROUP_AGGREGATE)
Expand Down Expand Up @@ -4186,11 +4186,13 @@ sp_instr_cpush::execute(THD *thd, uint *nextp)
{
DBUG_ENTER("sp_instr_cpush::execute");

int ret= thd->spcont->push_cursor(thd, &m_lex_keeper);
sp_cursor::reset(thd, &m_lex_keeper);
m_lex_keeper.disable_query_cache();
thd->spcont->push_cursor(this);

*nextp= m_ip+1;

DBUG_RETURN(ret);
DBUG_RETURN(false);
}


Expand Down Expand Up @@ -4224,7 +4226,7 @@ int
sp_instr_cpop::execute(THD *thd, uint *nextp)
{
DBUG_ENTER("sp_instr_cpop::execute");
thd->spcont->pop_cursors(m_count);
thd->spcont->pop_cursors(thd, m_count);
*nextp= m_ip+1;
DBUG_RETURN(0);
}
Expand Down
3 changes: 2 additions & 1 deletion sql/sp_head.h
Original file line number Diff line number Diff line change
Expand Up @@ -1757,7 +1757,8 @@ class sp_instr_hreturn : public sp_instr_jump


/** This is DECLARE CURSOR */
class sp_instr_cpush : public sp_instr
class sp_instr_cpush : public sp_instr,
public sp_cursor
{
sp_instr_cpush(const sp_instr_cpush &); /**< Prevent use of these */
void operator=(sp_instr_cpush &);
Expand Down
43 changes: 12 additions & 31 deletions sql/sp_rcontext.cc
Original file line number Diff line number Diff line change
Expand Up @@ -425,28 +425,26 @@ bool sp_rcontext::set_return_value(THD *thd, Item **return_value_item)
}


bool sp_rcontext::push_cursor(THD *thd, sp_lex_keeper *lex_keeper)
void sp_rcontext::push_cursor(sp_cursor *c)
{
/*
We should create cursors in the callers arena, as
it could be (and usually is) used in several instructions.
*/
sp_cursor *c= new (callers_arena->mem_root) sp_cursor(thd, lex_keeper);
m_cstack[m_ccount++]= c;
}

if (c == NULL)
return true;

m_cstack[m_ccount++]= c;
return false;
void sp_rcontext::pop_cursor(THD *thd)
{
DBUG_ASSERT(m_ccount > 0);
if (m_cstack[m_ccount - 1]->is_open())
m_cstack[m_ccount - 1]->close(thd);
m_ccount--;
}


void sp_rcontext::pop_cursors(size_t count)
void sp_rcontext::pop_cursors(THD *thd, size_t count)
{
DBUG_ASSERT(m_ccount >= count);

while (count--)
delete m_cstack[--m_ccount];
pop_cursor(thd);
}


Expand Down Expand Up @@ -733,22 +731,6 @@ bool sp_rcontext::set_case_expr(THD *thd, int case_expr_id,
///////////////////////////////////////////////////////////////////////////


sp_cursor::sp_cursor(THD *thd_arg, sp_lex_keeper *lex_keeper):
result(thd_arg),
m_lex_keeper(lex_keeper),
server_side_cursor(NULL),
m_fetch_count(0),
m_row_count(0),
m_found(false)
{
/*
currsor can't be stored in QC, so we should prevent opening QC for
try to write results which are absent.
*/
lex_keeper->disable_query_cache();
}


/*
Open an SP cursor
Expand Down Expand Up @@ -811,8 +793,7 @@ int sp_cursor::close(THD *thd)
MYF(0));
return -1;
}
m_row_count= m_fetch_count= 0;
m_found= false;
sp_cursor_statistics::reset();
destroy();
return 0;
}
Expand Down
87 changes: 7 additions & 80 deletions sql/sp_rcontext.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,23 +293,20 @@ class sp_rcontext : public Sql_alloc
// Cursors.
/////////////////////////////////////////////////////////////////////////

/// Create a new sp_cursor instance and push it to the cursor stack.
/// Push a cursor to the cursor stack.
///
/// @param lex_keeper SP-instruction execution helper.
/// @param i Cursor-push instruction.
/// @param cursor The cursor
///
/// @return error flag.
/// @retval false on success.
/// @retval true on error.
bool push_cursor(THD *thd, sp_lex_keeper *lex_keeper);
void push_cursor(sp_cursor *cur);

void pop_cursor(THD *thd);
/// Pop and delete given number of sp_cursor instance from the cursor stack.
///
/// @param count Number of cursors to pop & delete.
void pop_cursors(size_t count);
void pop_cursors(THD *thd, size_t count);

void pop_all_cursors()
{ pop_cursors(m_ccount); }
void pop_all_cursors(THD *thd)
{ pop_cursors(thd, m_ccount); }

sp_cursor *get_cursor(uint i) const
{ return m_cstack[i]; }
Expand Down Expand Up @@ -429,74 +426,4 @@ class sp_rcontext : public Sql_alloc
Bounds_checked_array<Item_cache *> m_case_expr_holders;
}; // class sp_rcontext : public Sql_alloc

///////////////////////////////////////////////////////////////////////////
// sp_cursor declaration.
///////////////////////////////////////////////////////////////////////////

class Server_side_cursor;
typedef class st_select_lex_unit SELECT_LEX_UNIT;

/* A mediator between stored procedures and server side cursors */

class sp_cursor : public Sql_alloc
{
private:
/// An interceptor of cursor result set used to implement
/// FETCH <cname> INTO <varlist>.
class Select_fetch_into_spvars: public select_result_interceptor
{
List<sp_variable> *spvar_list;
uint field_count;
bool send_data_to_variable_list(List<sp_variable> &vars, List<Item> &items);
public:
Select_fetch_into_spvars(THD *thd_arg): select_result_interceptor(thd_arg) {}
uint get_field_count() { return field_count; }
void set_spvar_list(List<sp_variable> *vars) { spvar_list= vars; }

virtual bool send_eof() { return FALSE; }
virtual int send_data(List<Item> &items);
virtual int prepare(List<Item> &list, SELECT_LEX_UNIT *u);
};

public:
sp_cursor(THD *thd_arg, sp_lex_keeper *lex_keeper);

virtual ~sp_cursor()
{ destroy(); }

sp_lex_keeper *get_lex_keeper() { return m_lex_keeper; }

int open(THD *thd);

int open_view_structure_only(THD *thd);

int close(THD *thd);

my_bool is_open()
{ return MY_TEST(server_side_cursor); }

bool found() const
{ return m_found; }

ulonglong row_count() const
{ return m_row_count; }

ulonglong fetch_count() const
{ return m_fetch_count; }

int fetch(THD *, List<sp_variable> *vars, bool error_on_no_data);

bool export_structure(THD *thd, Row_definition_list *list);

private:
Select_fetch_into_spvars result;
sp_lex_keeper *m_lex_keeper;
Server_side_cursor *server_side_cursor;
ulonglong m_fetch_count; // Number of FETCH commands since last OPEN
ulonglong m_row_count; // Number of successful FETCH since last OPEN
bool m_found; // If last FETCH fetched a row
void destroy();

}; // class sp_cursor : public Sql_alloc

#endif /* _SP_RCONTEXT_H_ */
Loading

0 comments on commit 56145be

Please sign in to comment.