Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

file 295 lines (248 sloc) 9.604 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295
/* -----------------------------------------------------------------------------
*
* (c) The GHC Team, 2009
*
* Work-stealing Deque data structure
*
* The implementation uses a double-ended queue ("deque") with
* lock-free access, as described in
*
* D.Chase and Y.Lev, Dynamic Circular Work-Stealing Deque.
* SPAA'05, July 2005, Las Vegas, USA.
* ACM 1-58113-986-1/05/0007
*
* Author: Jost Berthold MSRC 07-09/2008
*
* The DeQue is held as a circular array with known length. Positions
* of top (read-end) and bottom (write-end) always increase, and the
* array is accessed with indices modulo array-size. While this bears
* the risk of overflow, we assume that (with 64 bit indices), a
* program must run very long to reach that point.
*
* The write end of the queue (position bottom) can only be used with
* mutual exclusion, i.e. by exactly one caller at a time. At this
* end, new items can be enqueued using pushBottom()/newSpark(), and
* removed using popBottom()/reclaimSpark() (the latter implying a cas
* synchronisation with potential concurrent readers for the case of
* just one element).
*
* Multiple readers can steal from the read end (position top), and
* are synchronised without a lock, based on a cas of the top
* position. One reader wins, the others return NULL for a failure.
*
* Both popWSDeque and stealWSDeque also return NULL when the queue is empty.
*
* Testing: see testsuite/tests/rts/testwsdeque.c. If
* there's anything wrong with the deque implementation, this test
* will probably catch it.
*
* ---------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"

#include "RtsUtils.h"
#include "WSDeque.h"

/* CASTOP(addr, old, new): run cas() on the word at addr and evaluate to
 * non-zero iff cas() returned `old` (i.e. the word still held the expected
 * value).  Used below to advance q->top from t to t+1.
 * NB. `old` is expanded twice, so only pass side-effect-free expressions.
 * Every argument is parenthesised so that compound expressions (e.g.
 * `base + offset`) are cast/compared as a whole. */
#define CASTOP(addr,old,new) ((old) == cas(((StgPtr)(addr)),(old),(new)))

/* -----------------------------------------------------------------------------
* newWSDeque
* -------------------------------------------------------------------------- */

/* internal helpers ... */

/* roundUp2(val): return the smallest power of two that is strictly greater
 * than val (1 -> 2, 3 -> 4, 4 -> 8).
 *
 * Calls barf() for val == 0, and for values whose top bit is set: for those
 * the result does not fit in a StgWord and the shift loop below would wrap
 * around to 0.  barf() is expected not to return.
 */
static StgWord
roundUp2(StgWord val)
{
    StgWord rounded = 1;

    /* StgWord is unsigned anyway, only catch 0 */
    if (val == 0) {
        barf("DeQue,roundUp2: invalid size 0 requested");
    }
    /* top bit set: the next power of two is not representable */
    if (val > (~(StgWord)0 >> 1)) {
        barf("DeQue,roundUp2: requested size too large");
    }
    /* at least 1 bit set: shift `rounded` one place past val's highest bit */
    do {
        rounded <<= 1;
        val >>= 1;
    } while (val != 0);
    return rounded;
}

/* newWSDeque(size): allocate and initialise an empty work-stealing deque.
 *
 * The element array gets roundUp2(size) slots, a power of two, so that the
 * ever-increasing top/bottom positions can be turned into array indices with
 * a bitwise AND against moduloSize.  Both the descriptor and the array are
 * obtained from stgMallocBytes(); release them with freeWSDeque().
 */
WSDeque *
newWSDeque (nat size)
{
    /* number of slots actually allocated */
    const StgWord capacity = roundUp2(size);
    WSDeque *deque = stgMallocBytes(sizeof(WSDeque), "newWSDeque");

    deque->elements = stgMallocBytes(capacity * sizeof(StgClosurePtr),
                                     "newWSDeque:data space");

    /* empty deque: both ends, and the writer's cached copy of top, start at 0 */
    deque->top = 0;
    deque->bottom = 0;
    deque->topBound = 0;

    deque->size = capacity;
    deque->moduloSize = capacity - 1; /* n % size == n & moduloSize */

    ASSERT_WSDEQUE_INVARIANTS(deque);
    return deque;
}

/* -----------------------------------------------------------------------------
* freeWSDeque
* -------------------------------------------------------------------------- */

/* freeWSDeque(q): release the element array and then the descriptor itself,
 * both via stgFree().  The values stored in the deque are not touched.
 * A NULL q is accepted and ignored, mirroring free().
 */
void
freeWSDeque (WSDeque *q)
{
    if (q == NULL) {
        return;
    }
    stgFree(q->elements);
    stgFree(q);
}

/* -----------------------------------------------------------------------------
*
* popWSDeque: remove an element from the write end of the queue.
* Returns the removed spark, and NULL if a race is lost or the pool
* empty.
*
* If only one spark is left in the pool, we synchronise with
* concurrently stealing threads by using cas to modify the top field.
* This routine should NEVER be called by a task which does not own
* this deque.
*
* -------------------------------------------------------------------------- */

/* popWSDeque(q): take the most recently pushed element from the bottom
 * (write) end of q.  Returns the element, or NULL if the deque was empty or
 * its last element was taken by a concurrent steal.  Must only be called by
 * the owner of q.
 */
void *
popWSDeque (WSDeque *q)
{
    /* also a bit tricky, has to avoid concurrent steal() calls by
       accessing top with cas, when there is only one element left */
    StgWord t, b;
    StgInt currSize; /* signed, same type as the size test in pushWSDeque();
                        `long` can be narrower than StgWord (e.g. LLP64) */
    void * removed;
    
    ASSERT_WSDEQUE_INVARIANTS(q);
    
    b = q->bottom;

    // "decrement b as a test, see what happens"
    b--;
    q->bottom = b;

    // very important that the following read of q->top does not occur
    // before the earlier write to q->bottom.
    store_load_barrier();

    t = q->top; /* using topBound would give an *upper* bound, we
                   need a lower bound. We use the real top here, but
                   can update the topBound value */
    q->topBound = t;
    currSize = (StgInt)b - (StgInt)t;
    if (currSize < 0) { /* was empty before decrementing b, set b
                           consistently and abort */
        q->bottom = t;
        return NULL;
    }

    // read the element at b
    removed = q->elements[b & q->moduloSize];

    if (currSize > 0) { /* no danger, still elements in buffer after b-- */
        // debugBelch("popWSDeque: t=%ld b=%ld = %ld\n", t, b, removed);
        return removed;
    }
    /* otherwise, has someone meanwhile stolen the same (last) element?
       Check and increment top value to know */
    if ( !(CASTOP(&(q->top),t,t+1)) ) {
        removed = NULL; /* no success, but continue adjusting bottom */
    }
    q->bottom = t+1; /* anyway, empty now. Adjust bottom consistently. */
    q->topBound = t+1; /* ...and cached top value as well */
    
    ASSERT_WSDEQUE_INVARIANTS(q);
    ASSERT(q->bottom >= q->top);
    
    // debugBelch("popWSDeque: t=%ld b=%ld = %ld\n", t, b, removed);

    return removed;
}

/* -----------------------------------------------------------------------------
* stealWSDeque
* -------------------------------------------------------------------------- */

/* stealWSDeque_(q): make ONE attempt to take the element at the top (read)
 * end of q.  Returns the element, or NULL if the deque looked empty or the
 * cas on q->top failed because top was changed in the meantime.  Callers
 * that want to retry after a lost race should use stealWSDeque().
 */
void *
stealWSDeque_ (WSDeque *q)
{
    void * stolen;
    StgWord b,t;
    
// Can't do this on someone else's spark pool:
// ASSERT_WSDEQUE_INVARIANTS(q);
    
    // NB. these loads must be ordered, otherwise there is a race
    // between steal and pop.
    t = q->top;
    load_load_barrier();
    b = q->bottom;
    
    // NB. b and t are unsigned; we need a signed value for the test
    // below, because it is possible that t > b during a
    // concurrent popWSDeque() operation.  StgInt is used (as in
    // pushWSDeque()) rather than long, which can be narrower than StgWord.
    if ((StgInt)b - (StgInt)t <= 0 ) {
        return NULL; /* already looks empty, abort */
    }
    
    /* now access array, see pushWSDeque() */
    stolen = q->elements[t & q->moduloSize];
    
    /* now decide whether we have won */
    if ( !(CASTOP(&(q->top),t,t+1)) ) {
        /* lost the race, someone else has changed top in the meantime */
        return NULL;
    } /* else: OK, top has been incremented by the cas call */

    // debugBelch("stealWSDeque_: t=%d b=%d\n", t, b);

// Can't do this on someone else's spark pool:
// ASSERT_WSDEQUE_INVARIANTS(q);
    
    return stolen;
}

/* stealWSDeque(q): steal one element from the top end of q, retrying
 * stealWSDeque_() for as long as an attempt returns NULL while
 * looksEmptyWSDeque(q) still reports elements.  Returns the stolen element,
 * or NULL once the deque looks empty.
 */
void *
stealWSDeque (WSDeque *q)
{
    for (;;) {
        void *item = stealWSDeque_(q);

        /* done on success, or when there is nothing left to compete for */
        if (item != NULL || looksEmptyWSDeque(q)) {
            return item;
        }
    }
}

/* -----------------------------------------------------------------------------
* pushWSDeque
* -------------------------------------------------------------------------- */

/* When defined, pushWSDeque() leaves a full deque untouched and returns
 * rtsFalse instead of storing the new element. */
#define DISCARD_NEW

/* pushWSDeque(q, elem): enqueue elem at the bottom (write) end of q.
 *
 * Returns rtsTrue once elem has been stored and q->bottom advanced.  If the
 * array is full and DISCARD_NEW is defined, nothing is stored and rtsFalse
 * is returned.  Ideally a push would always succeed by resizing the array,
 * but resizing is not implemented.
 *
 * The write end may only be used by one caller at a time (see the file
 * header), so this must not run concurrently with another push or pop on
 * the same deque.
 */
rtsBool
pushWSDeque (WSDeque* q, void * elem)
{
    StgWord t;
    StgWord b;
    StgWord sz = q->moduloSize; /* index mask, and also the "full" threshold */
    
    ASSERT_WSDEQUE_INVARIANTS(q);
    
    /* we try to avoid reading q->top (accessed by all) and use
       q->topBound (accessed only by writer) instead.
       This is why we do not just call empty(q) here.
    */
    b = q->bottom;
    t = q->topBound;
    if ( (StgInt)b - (StgInt)t >= (StgInt)sz ) {
        /* NB. 1. sz == q->size - 1, thus ">="
               2. signed comparison, it is possible that t > b
        */
        /* could be full, check the real top value in this case */
        t = q->top;
        q->topBound = t;
        /* NOTE(review): unlike the test above, this comparison is unsigned,
           so a t > b here would wrap and be treated as "full" -- confirm
           that this is intended. */
        if (b - t >= sz) { /* really no space left :-( */
            /* reallocate the array, copying the values. Concurrent steal()s
               will in the meantime use the old one and modify only top.
               This means: we cannot safely free the old space! Can keep it
               on a free list internally here...
               Potential bug in combination with steal(): if array is
               replaced, it is unclear which one concurrent steal operations
               use. Must read the array base address in advance in steal().
            */
#if defined(DISCARD_NEW)
            ASSERT_WSDEQUE_INVARIANTS(q);
            return rtsFalse; // we didn't push anything
#else
            /* could make room by incrementing the top position here. In
             * this case, should use CASTOP. If this fails, someone else has
             * removed something, and new room will be available.
             * (Not implemented: as written, no room is made here and
             * execution simply falls through to the store below.)
             */
            ASSERT_WSDEQUE_INVARIANTS(q);
#endif
        }
    }

    /* store the element first, then publish it by advancing bottom */
    q->elements[b & sz] = elem;
    /*
       KG: we need to put write barrier here since otherwise we might
       end with elem not added to q->elements, but q->bottom already
       modified (write reordering) and with stealWSDeque_ failing
       later when invoked from another thread since it thinks elem is
       there (in case there is just added element in the queue). This
       issue concretely hit me on ARMv7 multi-core CPUs
    */
    write_barrier();
    q->bottom = b + 1;
    
    ASSERT_WSDEQUE_INVARIANTS(q);
    return rtsTrue;
}
Something went wrong with that request. Please try again.