public
Description: Sharded, memcached counter for Google App Engine
Homepage:
Clone URL: git://github.com/DocSavage/sharded_counter.git
sharded_counter / counter.py
100644 147 lines (122 sloc) 5.11 kb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# Copyright 2008 William T Katz
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# THIS LICENSE INFORMATION/ATTRIBUTION must be left in place.
 
import string
import random
import logging
 
from google.appengine.api import memcache
from google.appengine.ext import db
from google.appengine.runtime import apiproxy_errors
 
 
class MemcachedCount(object):
    """Signed integer counter kept under a single memcache key.

    The value is stored offset by ``DELTA_ZERO`` so that negative counts can
    be represented with memcache's unsigned incr/decr arithmetic: a stored
    ``DELTA_ZERO`` means a count of 0.  A key that is absent from memcache
    reads as a count of 0.
    """

    DELTA_ZERO = 500000 # Allows negative numbers in unsigned memcache

    def __init__(self, name):
        """Bind this counter to the memcache key derived from ``name``."""
        self.key = 'MemcachedCount' + name

    def get_count(self):
        """Return the current count, or 0 when the key is not in memcache."""
        value = memcache.get(self.key)
        if value is None:
            return 0
        # int() replaces the deprecated string.atoi() and accepts both the
        # decimal string written by set_count() and an integer value seeded
        # by increment().
        return int(value) - MemcachedCount.DELTA_ZERO

    def set_count(self, value):
        """Overwrite the count with ``value`` (which may be negative)."""
        memcache.set(self.key, str(MemcachedCount.DELTA_ZERO + value))

    def delete_count(self):
        """Remove the key from memcache; the count then reads as 0."""
        memcache.delete(self.key)

    count = property(get_count, set_count, delete_count)

    def increment(self, incr=1):
        """Add ``incr`` to the count; a negative ``incr`` subtracts.

        ``initial_value`` lets memcache create a missing key at the zero
        offset and apply the delta in one operation.  A separate
        get-then-set would let two concurrent first increments overwrite
        each other.  An ``incr`` of 0 changes nothing.
        """
        if incr > 0:
            memcache.incr(self.key, incr,
                          initial_value=MemcachedCount.DELTA_ZERO)
        elif incr < 0:
            memcache.decr(self.key, -incr,
                          initial_value=MemcachedCount.DELTA_ZERO)
 
class Counter(object):
    """A counter using sharded writes to prevent contentions.

    Should be used for counters that handle a lot of concurrent use.
    Follows pattern described in Google I/O talk:
    http://sites.google.com/site/io/building-scalable-web-applications-with-google-app-engine

    Memcache is used for caching counts and if a cached count is available,
    it is the most correct.  When no cached count is available the total is
    rebuilt from the datastore shards.  If there are datastore put issues, we
    store the un-put values into a delayed_incr memcache that will be applied
    as soon as the next shard put is successful.  Changes will only be lost
    if we lose memcache before a successful datastore shard put or there's a
    failure/error in memcache.

    Usage:
        hits = Counter('hits')
        hits.increment()
        my_hits = hits.count
        hits.get_count(nocache=True)  # Forces non-cached count of all shards
        hits.count = 6                # Set the counter to arbitrary value
        hits.increment(incr=-1)       # Decrement
        hits.increment(10)
    """
    NUM_SHARDS = 20

    def __init__(self, name):
        """Create a handle for the counter called ``name``."""
        self.name = name
        # Cached total of all shards plus any pending (un-put) increments.
        self.memcached = MemcachedCount('Counter' + name)
        # Increments whose shard put failed and still have to be applied.
        self.delayed_incr = MemcachedCount('DelayedIncr' + name)

    def _cached_count(self):
        """Return the cached total, or None when memcache has no entry.

        MemcachedCount.count reports a missing key as 0, which cannot be
        told apart from a real total of 0, so the raw entry is read here.
        """
        raw = memcache.get(self.memcached.key)
        if raw is None:
            return None
        return int(raw) - MemcachedCount.DELTA_ZERO

    def delete(self):
        """Delete the counter's shards and its memcache entries."""
        q = db.Query(CounterShard).filter('name =', self.name)
        shards = q.fetch(limit=Counter.NUM_SHARDS)
        db.delete(shards)
        # Without this the old total would keep being served from the cache
        # and stale pending increments would be applied to a later shard.
        self.memcached.delete_count()
        self.delayed_incr.delete_count()

    def get_count_and_cache(self):
        """Sum the shards and pending increments, cache and return the total."""
        q = db.Query(CounterShard).filter('name =', self.name)
        shards = q.fetch(limit=Counter.NUM_SHARDS)
        datastore_count = sum(shard.count for shard in shards)
        count = datastore_count + self.delayed_incr.count
        self.memcached.count = count
        return count

    def get_count(self, nocache=False):
        """Return the counter total.

        The cached total is used when present; with ``nocache=True`` or on
        a cache miss the total is recomputed from the datastore and cached.
        """
        if not nocache:
            cached = self._cached_count()
            if cached is not None:
                return cached
        return self.get_count_and_cache()

    def set_count(self, value):
        """Set the counter to ``value`` by applying the difference to a shard."""
        cur_value = self.get_count()
        self.memcached.count = value
        delta = value - cur_value
        if delta != 0:
            CounterShard.increment(self, incr=delta)

    count = property(get_count, set_count)

    def increment(self, incr=1, refresh=False):
        """Add ``incr`` (negative to decrement) to the counter.

        ``refresh`` is accepted for interface compatibility and is not used.
        """
        CounterShard.increment(self, incr)
        # Only bump a total that is already cached.  incr/decr without an
        # initial value leave a missing key untouched, so an evicted cache
        # stays empty and the next read rebuilds it from the datastore
        # instead of being seeded with just this delta.
        cache_key = self.memcached.key
        if incr > 0:
            memcache.incr(cache_key, incr)
        elif incr < 0:
            memcache.decr(cache_key, -incr)
 
 
class CounterShard(db.Model):
    """One datastore shard holding part of a named counter's total."""

    # Name of the Counter this shard belongs to; used to query all shards.
    name = db.StringProperty(required=True)
    # This shard's share of the counter total.
    count = db.IntegerProperty(default=0)

    @classmethod
    def increment(cls, counter, incr=1):
        """Add ``incr`` plus any pending delayed increments to a random shard.

        Args:
            counter: the Counter whose shard is updated; its ``name`` and
                ``delayed_incr`` attributes are used.
            incr: amount to add; may be negative.

        Returns:
            True when the shard was written.  False when the datastore put
            failed, in which case ``incr`` is added to the counter's
            delayed increments so a later successful put can apply it.
        """
        index = random.randint(1, Counter.NUM_SHARDS)
        counter_name = counter.name
        delayed_incr = counter.delayed_incr.count
        shard_key_name = 'Shard' + counter_name + str(index)

        def get_or_create_shard():
            shard = CounterShard.get_by_key_name(shard_key_name)
            if shard is None:
                shard = CounterShard(key_name=shard_key_name, name=counter_name)
            shard.count += incr + delayed_incr
            shard.put()

        try:
            db.run_in_transaction(get_or_create_shard)
        except (db.Error, apiproxy_errors.Error) as e:
            counter.delayed_incr.increment(incr)
            logging.error("CounterShard (%s) delayed increment %d: %s",
                          counter_name, incr, e)
            return False
        # Subtract exactly the pending amount that was just written instead
        # of resetting the pending count to 0: increments recorded by other
        # requests since the read above must survive.  incr/decr without an
        # initial value leave a missing key untouched, so an evicted entry
        # is not recreated with a bogus negative value.
        pending_key = counter.delayed_incr.key
        if delayed_incr > 0:
            memcache.decr(pending_key, delayed_incr)
        elif delayed_incr < 0:
            memcache.incr(pending_key, -delayed_incr)
        return True