/*
* Copyright 2012 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Higher performance (up to 10x) atomic increment using thread caching.
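 *
 * A minimal usage sketch (variable names are illustrative):
 *
 *   folly::ThreadCachedInt<int64_t> counter;
 *   counter += 10;                        // usually just a thread-local add
 *   int64_t approx = counter.readFast();  // cheap, may miss cached updates
 *   int64_t exact = counter.readFull();   // slower, sums all thread caches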
*
* @author Spencer Ahrens (sahrens)
*/

#ifndef FOLLY_THREADCACHEDINT_H
#define FOLLY_THREADCACHEDINT_H

#include <atomic>
#include <cstdint>

#include <boost/noncopyable.hpp>

#include "folly/Likely.h"
#include "folly/ThreadLocal.h"

namespace folly {


// Note that readFull requires holding a lock and iterating through all of the
// thread-local objects with the same Tag, so if you have a lot of
// ThreadCachedInts you should consider breaking up the Tag space even
// further.
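//
// A sketch of breaking up the Tag space (FooTag and BarTag are hypothetical):
//
//   struct FooTag {};
//   struct BarTag {};
//   ThreadCachedInt<int64_t, FooTag> fooCount;
//   ThreadCachedInt<int64_t, BarTag> barCount;
//
// Counters with different Tags keep their IntCaches in separate thread-local
// sets, so readFull() on fooCount only iterates the caches tagged FooTag.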
template <class IntT, class Tag=IntT>
class ThreadCachedInt : boost::noncopyable {
  struct IntCache;

 public:
  explicit ThreadCachedInt(IntT initialVal = 0, uint32_t cacheSize = 1000)
    : target_(initialVal), cacheSize_(cacheSize) {
  }

  void increment(IntT inc) {
    auto cache = cache_.get();
    if (UNLIKELY(cache == nullptr || cache->parent_ == nullptr)) {
      cache = new IntCache(*this);
      cache_.reset(cache);
    }
    cache->increment(inc);
  }

  // Quickly grabs the current value, which may not include increments still
  // cached in other threads (each thread flushes its cache only after roughly
  // cacheSize_ updates).
  IntT readFast() const {
    return target_.load(std::memory_order_relaxed);
  }

  // Reads the current value plus all the cached increments. Requires grabbing
  // a lock, so this is significantly slower than readFast().
  IntT readFull() const {
    IntT ret = readFast();
    for (const auto& cache : cache_.accessAllThreads()) {
      if (!cache.reset_.load(std::memory_order_acquire)) {
        ret += cache.val_.load(std::memory_order_relaxed);
      }
    }
    return ret;
  }

  // Quickly reads and resets current value (doesn't reset cached increments).
  IntT readFastAndReset() {
    return target_.exchange(0, std::memory_order_release);
  }

  // This function is designed for accumulating into another counter, where you
  // only want to count each increment once. It can still leave the count a
  // little off, but it is much better than calling readFull() and set(0)
  // sequentially.
  IntT readFullAndReset() {
    IntT ret = readFastAndReset();
    for (auto& cache : cache_.accessAllThreads()) {
      if (!cache.reset_.load(std::memory_order_acquire)) {
        ret += cache.val_.load(std::memory_order_relaxed);
        cache.reset_.store(true, std::memory_order_release);
      }
    }
    return ret;
  }
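
  // A sketch of the intended pattern (exportedTotal is hypothetical): a
  // single collector periodically folds the counter into a running total,
  // consuming each cached increment at most once:
  //
  //   exportedTotal += counter.readFullAndReset();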

  void setCacheSize(uint32_t newSize) {
    cacheSize_.store(newSize, std::memory_order_release);
  }

  uint32_t getCacheSize() const {
    return cacheSize_.load();
  }

  ThreadCachedInt& operator+=(IntT inc) { increment(inc); return *this; }
  ThreadCachedInt& operator-=(IntT inc) { increment(-inc); return *this; }
  // pre-increment (we don't support post-increment)
  ThreadCachedInt& operator++() { increment(1); return *this; }
  ThreadCachedInt& operator--() { increment(-1); return *this; }

  // Thread-safe set function.
  // This is a best-effort implementation: in some edge cases counts can be
  // lost, e.g. increments that a thread flushes into target_ concurrently
  // with set() are overwritten by newVal.
  void set(IntT newVal) {
    for (auto& cache : cache_.accessAllThreads()) {
      cache.reset_.store(true, std::memory_order_release);
    }
    target_.store(newVal, std::memory_order_release);
  }

  // This is a little tricky - it's possible that our IntCaches are still alive
  // in another thread and will get destroyed after this destructor runs, so we
  // need to make sure we signal that this parent is dead.
  ~ThreadCachedInt() {
    for (auto& cache : cache_.accessAllThreads()) {
      cache.parent_ = nullptr;
    }
  }

 private:
  std::atomic<IntT> target_;
  std::atomic<uint32_t> cacheSize_;
  ThreadLocalPtr<IntCache,Tag> cache_; // Must be last for dtor ordering

  // This should only ever be modified by one thread
  struct IntCache {
    ThreadCachedInt* parent_;
    mutable std::atomic<IntT> val_;
    mutable uint32_t numUpdates_;
    std::atomic<bool> reset_;

    explicit IntCache(ThreadCachedInt& parent)
        : parent_(&parent), val_(0), numUpdates_(0), reset_(false) {}

    void increment(IntT inc) {
      if (LIKELY(!reset_.load(std::memory_order_acquire))) {
        // This thread is the only writer to val_, so it's fine to do
        // a relaxed load and do the addition non-atomically.
        val_.store(
          val_.load(std::memory_order_relaxed) + inc,
          std::memory_order_release
        );
      } else {
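        // A reader (set() or readFullAndReset()) marked this cache as reset,
        // meaning the old val_ was already consumed or deliberately
        // discarded, so start a fresh count from this increment.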
        val_.store(inc, std::memory_order_relaxed);
        reset_.store(false, std::memory_order_release);
      }
      ++numUpdates_;
      if (UNLIKELY(numUpdates_ >
                   parent_->cacheSize_.load(std::memory_order_acquire))) {
        flush();
      }
    }

    void flush() const {
      parent_->target_.fetch_add(val_, std::memory_order_release);
      val_.store(0, std::memory_order_release);
      numUpdates_ = 0;
    }

    ~IntCache() {
      if (parent_) {
        flush();
      }
    }
  };
};

}  // namespace folly

#endif