Skip to content

Commit

Permalink
BUG#30301356 - SOME EVENTS ARE DELAYED AFTER DROPPING EVENT
Browse files Browse the repository at this point in the history
queues.c cleanup and refactoring.

Restore old version of _downhead() (from before cd483c5)
that works well in an average case. Use it for queue_fix().

Move existing specialized version of _downhead() to queue_replace()
where it'll be handling the case it was specifically optimized for
(moving the element to the end of the queue).
And correct it to fix the heap not only down, but also up
(this fixes BUG#30301356).

Add unit tests.

Collateral cosmetic fixes.
  • Loading branch information
vuvova committed Apr 30, 2020
1 parent 69bd731 commit 6a31aea
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 85 deletions.
10 changes: 5 additions & 5 deletions include/queues.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,31 +54,31 @@ typedef struct st_queue {
#define queue_top(queue) ((queue)->root[1])
#define queue_element(queue,index) ((queue)->root[index])
#define queue_end(queue) ((queue)->root[(queue)->elements])
#define queue_replace_top(queue) _downheap(queue, 1, (queue)->root[1])
#define queue_replace_top(queue) _downheap(queue, 1)
#define queue_set_cmp_arg(queue, set_arg) (queue)->first_cmp_arg= set_arg
#define queue_set_max_at_top(queue, set_arg) \
(queue)->max_at_top= set_arg ? -1 : 1
#define queue_remove_top(queue_arg) queue_remove((queue_arg), queue_first_element(queue_arg))
typedef int (*queue_compare)(void *,uchar *, uchar *);

int init_queue(QUEUE *queue,uint max_elements,uint offset_to_key,
pbool max_at_top, queue_compare compare,
my_bool max_at_top, queue_compare compare,
void *first_cmp_arg, uint offset_to_queue_pos,
uint auto_extent);
int reinit_queue(QUEUE *queue,uint max_elements,uint offset_to_key,
pbool max_at_top, queue_compare compare,
my_bool max_at_top, queue_compare compare,
void *first_cmp_arg, uint offset_to_queue_pos,
uint auto_extent);
int resize_queue(QUEUE *queue, uint max_elements);
void delete_queue(QUEUE *queue);
void queue_insert(QUEUE *queue,uchar *element);
void queue_insert(QUEUE *queue, uchar *element);
int queue_insert_safe(QUEUE *queue, uchar *element);
uchar *queue_remove(QUEUE *queue,uint idx);
void queue_replace(QUEUE *queue,uint idx);

#define queue_remove_all(queue) { (queue)->elements= 0; }
#define queue_is_full(queue) (queue->elements == queue->max_elements)
void _downheap(QUEUE *queue, uint idx, uchar *element);
void _downheap(QUEUE *queue, uint idx);
void queue_fix(QUEUE *queue);
#define is_queue_inited(queue) ((queue)->root != 0)

Expand Down
169 changes: 90 additions & 79 deletions mysys/queues.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,9 @@
*/

int init_queue(QUEUE *queue, uint max_elements, uint offset_to_key,
pbool max_at_top, int (*compare) (void *, uchar *, uchar *),
my_bool max_at_top, int (*compare) (void *, uchar *, uchar *),
void *first_cmp_arg, uint offset_to_queue_pos,
uint auto_extent)

{
DBUG_ENTER("init_queue");
if ((queue->root= (uchar **) my_malloc((max_elements + 1) * sizeof(void*),
Expand Down Expand Up @@ -109,7 +108,7 @@ int init_queue(QUEUE *queue, uint max_elements, uint offset_to_key,
*/

int reinit_queue(QUEUE *queue, uint max_elements, uint offset_to_key,
pbool max_at_top, int (*compare) (void *, uchar *, uchar *),
my_bool max_at_top, int (*compare) (void *, uchar *, uchar *),
void *first_cmp_arg, uint offset_to_queue_pos,
uint auto_extent)
{
Expand Down Expand Up @@ -182,6 +181,28 @@ void delete_queue(QUEUE *queue)
}


static void insert_at(QUEUE *queue, uchar *element, uint idx)
{
uint next_index, offset_to_key= queue->offset_to_key;
uint offset_to_queue_pos= queue->offset_to_queue_pos;
/* max_at_top swaps the comparison if we want to order by desc */
while ((next_index= idx >> 1) > 0 &&
queue->compare(queue->first_cmp_arg,
element + offset_to_key,
queue->root[next_index] + offset_to_key) *
queue->max_at_top < 0)
{
queue->root[idx]= queue->root[next_index];
if (offset_to_queue_pos)
(*(uint*) (queue->root[idx] + offset_to_queue_pos-1))= idx;
idx= next_index;
}
queue->root[idx]= element;
if (offset_to_queue_pos)
(*(uint*) (element + offset_to_queue_pos-1))= idx;
}


/*
Insert element in queue
Expand All @@ -191,28 +212,10 @@ void delete_queue(QUEUE *queue)
element Element to insert
*/

void queue_insert(register QUEUE *queue, uchar *element)
void queue_insert(QUEUE *queue, uchar *element)
{
reg2 uint idx, next;
uint offset_to_queue_pos= queue->offset_to_queue_pos;
DBUG_ASSERT(queue->elements < queue->max_elements);

idx= ++queue->elements;
/* max_at_top swaps the comparison if we want to order by desc */
while (idx > 1 &&
(queue->compare(queue->first_cmp_arg,
element + queue->offset_to_key,
queue->root[(next= idx >> 1)] +
queue->offset_to_key) * queue->max_at_top) < 0)
{
queue->root[idx]= queue->root[next];
if (offset_to_queue_pos)
(*(uint*) (queue->root[idx] + offset_to_queue_pos-1))= idx;
idx= next;
}
queue->root[idx]= element;
if (offset_to_queue_pos)
(*(uint*) (element+ offset_to_queue_pos-1))= idx;
insert_at(queue, element, ++queue->elements);
}


Expand All @@ -230,7 +233,7 @@ void queue_insert(register QUEUE *queue, uchar *element)
2 auto_extend is 0; No insertion done
*/

int queue_insert_safe(register QUEUE *queue, uchar *element)
int queue_insert_safe(QUEUE *queue, uchar *element)
{

if (queue->elements == queue->max_elements)
Expand All @@ -240,7 +243,7 @@ int queue_insert_safe(register QUEUE *queue, uchar *element)
if (resize_queue(queue, queue->max_elements + queue->auto_extent))
return 1;
}

queue_insert(queue, element);
return 0;
}
Expand All @@ -259,81 +262,55 @@ int queue_insert_safe(register QUEUE *queue, uchar *element)
pointer to removed element
*/

uchar *queue_remove(register QUEUE *queue, uint idx)
uchar *queue_remove(QUEUE *queue, uint idx)
{
uchar *element;
DBUG_ASSERT(idx >= 1 && idx <= queue->elements);
DBUG_ASSERT(idx >= 1);
DBUG_ASSERT(idx <= queue->elements);
element= queue->root[idx];
_downheap(queue, idx, queue->root[queue->elements--]);
queue->root[idx]= queue->root[queue->elements--];
queue_replace(queue, idx);
return element;
}


/*
Add element to fixed position and update heap
Restores the heap property from idx down the heap
SYNOPSIS
_downheap()
queue Queue to use
idx Index of element to change
element Element to store at 'idx'
NOTE
This only works if element is >= all elements <= start_idx
*/

void _downheap(register QUEUE *queue, uint start_idx, uchar *element)
void _downheap(QUEUE *queue, uint idx)
{
uint elements,half_queue,offset_to_key, next_index, offset_to_queue_pos;
register uint idx= start_idx;
my_bool first= TRUE;

offset_to_key=queue->offset_to_key;
offset_to_queue_pos= queue->offset_to_queue_pos;
half_queue= (elements= queue->elements) >> 1;
uchar *element= queue->root[idx];
uint next_index,
elements= queue->elements,
half_queue= elements >> 1,
offset_to_key= queue->offset_to_key,
offset_to_queue_pos= queue->offset_to_queue_pos;

while (idx <= half_queue)
{
next_index=idx+idx;
next_index= idx+idx;
if (next_index < elements &&
(queue->compare(queue->first_cmp_arg,
queue->root[next_index]+offset_to_key,
queue->root[next_index+1]+offset_to_key) *
queue->max_at_top) > 0)
(queue->compare(queue->first_cmp_arg,
queue->root[next_index]+offset_to_key,
queue->root[next_index+1]+offset_to_key) *
queue->max_at_top) > 0)
next_index++;
if (first &&
(((queue->compare(queue->first_cmp_arg,
queue->root[next_index]+offset_to_key,
element+offset_to_key) * queue->max_at_top) >= 0)))
{
queue->root[idx]= element;
if (offset_to_queue_pos)
(*(uint*) (element + offset_to_queue_pos-1))= idx;
return;
}
first= FALSE;
queue->root[idx]= queue->root[next_index];
if (offset_to_queue_pos)
(*(uint*) (queue->root[idx] + offset_to_queue_pos-1))= idx;
idx=next_index;
}

/*
Insert the element into the right position. This is the same code
as we have in queue_insert()
*/
while ((next_index= (idx >> 1)) > start_idx &&
queue->compare(queue->first_cmp_arg,
element+offset_to_key,
queue->root[next_index]+offset_to_key)*
queue->max_at_top < 0)
{
if ((queue->compare(queue->first_cmp_arg,
queue->root[next_index]+offset_to_key,
element+offset_to_key) * queue->max_at_top) >= 0)
break;
queue->root[idx]= queue->root[next_index];
if (offset_to_queue_pos)
(*(uint*) (queue->root[idx] + offset_to_queue_pos-1))= idx;
idx= next_index;
}
queue->root[idx]= element;
queue->root[idx]=element;
if (offset_to_queue_pos)
(*(uint*) (element + offset_to_queue_pos-1))= idx;
}
Expand All @@ -351,7 +328,7 @@ void queue_fix(QUEUE *queue)
{
uint i;
for (i= queue->elements >> 1; i > 0; i--)
_downheap(queue, i, queue_element(queue, i));
_downheap(queue, i);
}


Expand All @@ -362,13 +339,47 @@ void queue_fix(QUEUE *queue)
queue_replace()
queue Queue to use
idx Index of element to change
element Element to store at 'idx'
NOTE
optimized for the case when the new position is close to the end of the
heap (typical for queue_remove() replacements).
*/

void queue_replace(QUEUE *queue, uint idx)
{
uchar *element= queue->root[idx];
DBUG_ASSERT(idx >= 1 && idx <= queue->elements);
queue_remove(queue, idx);
queue_insert(queue, element);
uint next_index,
elements= queue->elements,
half_queue= elements>>1,
offset_to_key= queue->offset_to_key,
offset_to_queue_pos= queue->offset_to_queue_pos;
my_bool first= TRUE;

while (idx <= half_queue)
{
next_index= idx + idx;
if (next_index < elements &&
queue->compare(queue->first_cmp_arg,
queue->root[next_index]+offset_to_key,
queue->root[next_index+1]+offset_to_key) *
queue->max_at_top > 0)
next_index++;
if (first &&
queue->compare(queue->first_cmp_arg,
queue->root[next_index]+offset_to_key,
element+offset_to_key) * queue->max_at_top >= 0)
{
queue->root[idx]= element;
if (offset_to_queue_pos)
(*(uint*) (element + offset_to_queue_pos-1))= idx;
break;
}
first= FALSE;
queue->root[idx]= queue->root[next_index];
if (offset_to_queue_pos)
(*(uint*) (queue->root[idx] + offset_to_queue_pos-1))= idx;
idx=next_index;
}

insert_at(queue, element, idx);
}
2 changes: 1 addition & 1 deletion unittest/mysys/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA

MY_ADD_TESTS(bitmap base64 my_atomic my_rdtsc lf my_malloc my_getopt dynstring
LINK_LIBRARIES mysys)
queues LINK_LIBRARIES mysys)
MY_ADD_TESTS(my_vsnprintf LINK_LIBRARIES strings mysys)

IF(WIN32)
Expand Down
Loading

0 comments on commit 6a31aea

Please sign in to comment.