Skip to content

Commit

Permalink
Fixed hang in Aria page cache with concurrent SELECT
Browse files Browse the repository at this point in the history
MDEV-20302 Server hangs upon concurrent SELECT from partitioned S3
  • Loading branch information
sanja-byelkin authored and montywi committed Jun 14, 2020
1 parent b3179b7 commit d7a9cdc
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 34 deletions.
27 changes: 27 additions & 0 deletions mysql-test/suite/s3/alter2.result
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,30 @@ connection con1;
disconnect con1;
connection default;
DROP TABLE t1;
#
# MDEV-20302 Server hangs upon concurrent SELECT from partitioned S3
# table
#
CREATE TABLE t1 (
pk INT AUTO_INCREMENT,
c CHAR(12),
PRIMARY KEY(pk),
KEY(c)
) ENGINE=Aria
PARTITION BY KEY(pk) PARTITIONS 2;
CREATE VIEW v1 AS SELECT * FROM t1;
INSERT INTO t1 VALUES (NULL,'ill'),(NULL,'loop');
ALTER TABLE t1 ENGINE=S3;
connect con1,localhost,root,,test;
SELECT * FROM t1 WHERE c BETWEEN 'bar' AND 'foo';
connection default;
SELECT pk FROM v1;
pk
1
2
connection con1;
pk c
disconnect con1;
connection default;
DROP VIEW v1;
DROP TABLE t1;
33 changes: 33 additions & 0 deletions mysql-test/suite/s3/alter2.test
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
--source include/have_s3.inc
--source include/have_partition.inc
--source create_database.inc

--echo #
Expand Down Expand Up @@ -26,6 +27,38 @@ SELECT * FROM t1;
--connection default
DROP TABLE t1;

--echo #
--echo # MDEV-20302 Server hangs upon concurrent SELECT from partitioned S3
--echo # table
--echo #

CREATE TABLE t1 (
pk INT AUTO_INCREMENT,
c CHAR(12),
PRIMARY KEY(pk),
KEY(c)
) ENGINE=Aria
PARTITION BY KEY(pk) PARTITIONS 2;

CREATE VIEW v1 AS SELECT * FROM t1;
INSERT INTO t1 VALUES (NULL,'ill'),(NULL,'loop');
ALTER TABLE t1 ENGINE=S3;

--connect (con1,localhost,root,,test)
--send
SELECT * FROM t1 WHERE c BETWEEN 'bar' AND 'foo';

--connection default
SELECT pk FROM v1;

--connection con1
--reap

--disconnect con1
--connection default
DROP VIEW v1;
DROP TABLE t1;

#
# clean up
#
Expand Down
124 changes: 90 additions & 34 deletions storage/maria/ma_pagecache.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ my_bool my_disable_flush_pagecache_blocks= 0;
#define COND_FOR_REQUESTED 0 /* queue of thread waiting for read operation */
#define COND_FOR_SAVED 1 /* queue of thread waiting for flush */
#define COND_FOR_WRLOCK 2 /* queue of write lock */
#define COND_FOR_BIG_BLOCK 3 /* queue of waiting fo big block read */
#define COND_FOR_BIG_BLOCK 3 /* queue of waiting for big block read */
#define COND_SIZE 4 /* number of COND_* queues */

typedef mysql_cond_t KEYCACHE_CONDVAR;
Expand Down Expand Up @@ -178,7 +178,9 @@ struct st_pagecache_hash_link
#define PCBLOCK_CHANGED 32 /* block buffer contains a dirty page */
#define PCBLOCK_DIRECT_W 64 /* possible direct write to the block */
#define PCBLOCK_DEL_WRITE 128 /* should be written on delete */
#define PCBLOCK_BIG_READ 256 /* the first block of the big read in progress */
#define PCBLOCK_BIG_READ 256 /* the first block of the big read in progress
or not first block which other thread wait
to be read in big read operation */

/* page status, returned by find_block */
#define PAGE_READ 0
Expand Down Expand Up @@ -2770,7 +2772,7 @@ static my_bool make_lock_and_pin(PAGECACHE *pagecache,
*/

#ifdef WITH_S3_STORAGE_ENGINE
static my_bool read_big_block(PAGECACHE *pagecache,
static void read_big_block(PAGECACHE *pagecache,
PAGECACHE_BLOCK_LINK *block)
{
int page_st;
Expand Down Expand Up @@ -2809,25 +2811,26 @@ static my_bool read_big_block(PAGECACHE *pagecache,
if (block_to_read->status & PCBLOCK_ERROR)
{
/* We get first block with an error so all operation failed */
block->status|= PCBLOCK_ERROR;
block->error= block_to_read->error;
DBUG_RETURN(FALSE); // no retry
goto error;
}
// only primary request here, PAGE_WAIT_TO_BE_READ is impossible
DBUG_ASSERT(page_st != PAGE_WAIT_TO_BE_READ);
if (block_to_read->status & PCBLOCK_BIG_READ)
{
/*
Another thread is reading the big block, so we wait until it has
read our block for us
*/
struct st_my_thread_var *thread;
DBUG_ASSERT(page_st == PAGE_WAIT_TO_BE_READ);
DBUG_ASSERT(page_st != PAGE_TO_BE_READ);
block->status|= PCBLOCK_BIG_READ; // will be read by other thread
/*
Block read failed because somebody else is reading the first block
(and all other blocks part of this one).
Wait until block is available.
*/
unreg_request(pagecache, block, 1);
thread= my_thread_var;
/* Put the request into a queue and wait until it can be processed */
wqueue_add_to_queue(&block->wqueue[COND_FOR_BIG_BLOCK], thread);
wqueue_add_to_queue(&block_to_read->wqueue[COND_FOR_BIG_BLOCK], thread);
do
{
DBUG_PRINT("wait",
Expand All @@ -2837,7 +2840,21 @@ static my_bool read_big_block(PAGECACHE *pagecache,
&pagecache->cache_lock);
}
while (thread->next);
DBUG_RETURN(TRUE);
// page should be read by the other thread
DBUG_ASSERT(block->status & PCBLOCK_READ ||
block->status & PCBLOCK_ERROR);
DBUG_ASSERT(block->status & PCBLOCK_BIG_READ);
block->status&= ~PCBLOCK_BIG_READ;
// all is read => let's finish nicely
DBUG_ASSERT(block_to_read != block);
remove_reader(block_to_read);
unreg_request(pagecache, block_to_read, 1);
DBUG_VOID_RETURN;
}
else
{
// only primary request here, PAGE_WAIT_TO_BE_READ is impossible
DBUG_ASSERT(page_st != PAGE_WAIT_TO_BE_READ);
}
}
else
Expand All @@ -2863,18 +2880,9 @@ static my_bool read_big_block(PAGECACHE *pagecache,
{
pagecache_pthread_mutex_lock(&pagecache->cache_lock);
block_to_read->status|= PCBLOCK_ERROR;
block->status|= PCBLOCK_ERROR;
block_to_read->error= block->error= (int16) my_errno;
block_to_read->error= (int16) my_errno;
pagecache->big_block_free(&data);
if (block_to_read != block)
{
remove_reader(block_to_read);
unreg_request(pagecache, block_to_read, 1);
}
/* Signal that all pending requests for this page now can be processed */
if (block->wqueue[COND_FOR_REQUESTED].last_thread)
wqueue_release_queue(&block->wqueue[COND_FOR_REQUESTED]);
DBUG_RETURN(FALSE); // no retry
goto error;
}

/*
Expand Down Expand Up @@ -2914,12 +2922,18 @@ static my_bool read_big_block(PAGECACHE *pagecache,
TRUE /*register*/, TRUE /*fast*/, &page_st);
if (!bl)
{
// we run out of easy avaliable pages in the cache
break;
/*
We cannot get this page easily.
Maybe we will be lucky with the other pages; among them there may
also be a page that another thread is waiting for
*/
continue;
}
DBUG_ASSERT(bl == bl->hash_link->block);
if ((bl->status & PCBLOCK_ERROR) == 0 &&
page_st == PAGE_TO_BE_READ)
(page_st == PAGE_TO_BE_READ || // page shoud be read
(page_st == PAGE_WAIT_TO_BE_READ &&
(bl->status & PCBLOCK_BIG_READ)))) // or page waited by other thread
{
memcpy(bl->buffer, data.str + offset, pagecache->block_size);
bl->status|= PCBLOCK_READ;
Expand All @@ -2940,17 +2954,65 @@ static my_bool read_big_block(PAGECACHE *pagecache,
pagecache->big_block_free(&data);

block_to_read->status&= ~PCBLOCK_BIG_READ;

end:
if (block_to_read != block)
{
remove_reader(block_to_read);
unreg_request(pagecache, block_to_read, 1);
}
if (block->wqueue[COND_FOR_BIG_BLOCK].last_thread)
wqueue_release_queue(&block->wqueue[COND_FOR_BIG_BLOCK]);
/* Signal that all pending requests for this page now can be processed */
if (block->wqueue[COND_FOR_REQUESTED].last_thread)
wqueue_release_queue(&block->wqueue[COND_FOR_REQUESTED]);
DBUG_VOID_RETURN;

DBUG_RETURN(FALSE);
error:
/*
The read failed. Notify all readers waiting for a block covered by
the big block that the read failed.
*/
for (offset= pagecache->block_size, page= page_to_read + 1;
offset < data.length;
offset+= pagecache->block_size, page++)
{
DBUG_ASSERT(offset + pagecache->block_size <= data.length);
if (page == our_page)
{
DBUG_ASSERT(!(block->status & PCBLOCK_READ));
block->status|= PCBLOCK_ERROR;
block->error= (int16) my_errno;
}
else
{
PAGECACHE_BLOCK_LINK *bl;
bl= find_block(pagecache, &block->hash_link->file, page, 1,
FALSE, TRUE /* copy under protection (?)*/,
TRUE /*register*/, TRUE /*fast*/, &page_st);
if (!bl)
{
/*
We cannot get this page easily.
Maybe we will be lucky with the other pages; among them there may
also be a page that another thread is waiting for
*/
continue;
}
DBUG_ASSERT(bl == bl->hash_link->block);
if ((bl->status & PCBLOCK_ERROR) == 0 &&
(page_st == PAGE_TO_BE_READ || // page shoud be read
(page_st == PAGE_WAIT_TO_BE_READ &&
(bl->status & PCBLOCK_BIG_READ)))) // or page waited by other thread
{
bl->status|= PCBLOCK_ERROR;
bl->error= (int16) my_errno;
}
remove_reader(bl);
unreg_request(pagecache, bl, 1);
}
}
goto end;
}
#endif /* WITH_S3_STORAGE_ENGINE */

Expand Down Expand Up @@ -3706,14 +3768,8 @@ uchar *pagecache_read(PAGECACHE *pagecache,
/* It is big read and this thread should read */
DBUG_ASSERT(page_st == PAGE_TO_BE_READ);

if (read_big_block(pagecache, block))
{
/* block is unregistered in read_big_block */
pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
dec_counter_for_resize_op(pagecache);
DBUG_PRINT("restart", ("big block fail, restarting..."));
goto restart;
}
read_big_block(pagecache, block);

if (!((new_pin == PAGECACHE_PIN_LEFT_UNPINNED) ||
(new_pin == PAGECACHE_PIN)))
{
Expand Down

0 comments on commit d7a9cdc

Please sign in to comment.