Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Cleanup

  • Loading branch information...
commit 4b28c30b84614424f7e40d2f1d74e4a716c25547 1 parent 571d7aa
@simontaotw simontaotw authored
View
203 livecount/counter.py
@@ -15,13 +15,17 @@
#
-from google.appengine.ext import db
-from google.appengine.api.labs import taskqueue
-from google.appengine.api import memcache
-from google.appengine.ext import webapp
+from datetime import datetime, timedelta
import logging
+import time
import wsgiref.handlers
+from google.appengine.api import memcache
+from google.appengine.api.labs import taskqueue
+from google.appengine.ext import db
+from google.appengine.ext import webapp
+
+
"""
Livecount is a memcache-based counter api with asynchronous writes to persist counts to datastore.
@@ -30,110 +34,158 @@
- Read-Through
"""
+class PeriodType(object):
+ SECOND = "second"
+ MINUTE = "minute"
+ HOUR = "hour"
+ DAY = "day"
+ WEEK = "week"
+ MONTH = "month"
+ YEAR = "year"
+ ALL = "all"
+
+ @staticmethod
+ def find_scope(period_type, period):
+ if period_type == PeriodType.SECOND:
+ return str(period)[0:19] # 2011-06-13 18:11:32
+ elif period_type == PeriodType.MINUTE:
+ return str(period)[0:16] # 2011-06-13 18:11
+ elif period_type == PeriodType.HOUR:
+ return str(period)[0:13] # 2011-06-13 18
+ elif period_type == PeriodType.DAY:
+ return str(period)[0:10] # 2011-06-13
+ elif period_type == PeriodType.WEEK:
+ if not isinstance(period, datetime):
+ period = PeriodType.str_to_datetime(period)
+ return str(period- timedelta(period.weekday()))[0:10]+"week" # 2011-06-13week; use Monday as marker
+ elif period_type == PeriodType.MONTH:
+ return str(period)[0:7] # 2011-06
+ elif period_type == PeriodType.YEAR:
+ return str(period)[0:4] # 2011
+ else:
+ return "all"
+
+ @staticmethod
+ def str_to_datetime(datetime_str):
+ time_format = "%Y-%m-%d %H:%M:%S"
+ return datetime.fromtimestamp(time.mktime(time.strptime(datetime_str.split('.')[0], time_format)))
+
class LivecountCounter(db.Model):
+ name = db.StringProperty()
count = db.IntegerProperty()
+ period = db.StringProperty()
+ period_type = db.StringProperty(default=PeriodType.ALL)
namespace = db.StringProperty(default="default")
@staticmethod
- def KeyName(name, namespace):
- return namespace + ':' + name
+ def KeyName(namespace, period_type, period, name):
+ scoped_period = PeriodType.find_scope(period_type, period)
+ return namespace + ":" + period_type + ":" + scoped_period + ":" + name
+
+ @staticmethod
+ def PartialKeyName(period_type, period, name):
+ scoped_period = PeriodType.find_scope(period_type, period)
+ return period_type + ":" + scoped_period + ":" + name
-def load_and_get_count(name, namespace='default'):
- #logging.info("Getting counter, name = " + name + ", namespace = " + namespace)
- count = memcache.get(name, namespace=namespace)
+def load_and_get_count(name, period, period_type='day', namespace='default'):
+ partial_key = LivecountCounter.PartialKeyName(period_type, period, name)
+ # Try memcache first
+ count = memcache.get(partial_key, namespace=namespace)
if count is None:
# See if this counter already exists in the datastore
- key = LivecountCounter.KeyName(name, namespace)
- record = LivecountCounter.get_by_key_name(key)
+ full_key = LivecountCounter.KeyName(namespace, period_type, period, name)
+ record = LivecountCounter.get_by_key_name(full_key)
count = None
# If counter exists in the datastore, but is not currently in memcache, add it
if record:
count = record.count
- memcache.add(name, count, namespace=namespace)
+ memcache.add(partial_key, count, namespace=namespace)
return count
-def load_and_increment_counter(name, delta, namespace='default', batch_size=None):
+
+def load_and_increment_counter(name, period, period_types, delta, namespace='default', batch_size=None):
"""
Setting batch size allows control of how often a writeback worker is created.
By default, this happens at every increment to ensure maximum durability.
If there is already a worker waiting to write the value of a counter, another will not be created.
+ """
+ # Warning: There is a race condition here. If two processes try to load
+ # the same value from the datastore, one's update may be lost.
+ # TODO: Think more about whether we care about this...
- Warning: There is a potential race condition here. If two processes try to load
- the same value from the datastore, one's update may be lost.
-
- Thoughts: This could be avoided by wrapping the critical section in a transaction, at the cost of some performance.
- In practice, this is rarely a problem, since counters that are frequently updated usually stay resident
- in memcache and counters that are infrequently updated are unlikely to have two updates come in at the
- same time. For some applications, the potential for lost updates is unacceptable. In these cases, it would
- make sense to use AppEngine's transaction mechanism here.
-
- TODO: Think more about whether we care about this.
- """
- #logging.info("Incrementing counter, name = " + name)
- current_count = None
-
- incr_reslt = None
- if delta >= 0:
- incr_reslt = memcache.incr(name, delta, namespace=namespace)
- else: # Since increment by negative number is not supported, convert to decrement
- incr_reslt = memcache.decr(name, -delta, namespace=namespace)
-
- if incr_reslt is None:
- # See if this counter already exists in the datastore
- key = LivecountCounter.KeyName(name, namespace)
- record = LivecountCounter.get_by_key_name(key)
- if record:
- # Load last value from datastore
- new_counter_value = record.count + delta
- if new_counter_value < 0: new_counter_value = 0 # To match behavior of memcache.decr(), don't allow negative values
- memcache.add(name, new_counter_value, namespace=namespace)
- if batch_size: current_count = record.count
+ for period_type in period_types:
+ current_count = None
+
+ incr_result = None
+ partial_key = LivecountCounter.PartialKeyName(period_type, period, name)
+ if delta >= 0:
+ incr_result = memcache.incr(partial_key, delta, namespace=namespace)
+ else: # Since increment by negative number is not supported, convert to decrement
+ incr_result = memcache.decr(partial_key, -delta, namespace=namespace)
+
+ if incr_result is None:
+ # See if this counter already exists in the datastore
+ full_key = LivecountCounter.KeyName(namespace, period_type, period, name)
+ record = LivecountCounter.get_by_key_name(full_key)
+ if record:
+ # Load last value from datastore
+ new_counter_value = record.count + delta
+ if new_counter_value < 0: new_counter_value = 0 # To match behavior of memcache.decr(), don't allow negative values
+ memcache.add(partial_key, new_counter_value, namespace=namespace)
+ if batch_size: current_count = record.count
+ else:
+ # Start new counter
+ memcache.add(partial_key, delta, namespace=namespace)
+ if batch_size: current_count = delta
else:
- # Start new counter
- memcache.add(name, delta, namespace=namespace)
- if batch_size: current_count = delta
- else:
- if batch_size: current_count = memcache.get(name, namespace=namespace)
- # If batch_size is set, only try creating one worker per batch
- if not batch_size or (batch_size and current_count % batch_size == 0):
- if memcache.add(name + '_dirty', delta, namespace=namespace):
- #logging.info("Adding task to taskqueue. counter value = " + str(memcache.get(name, namespace=namespace)))
- taskqueue.add(queue_name='livecount-writebacks', url='/livecount/worker', params={'name': name, 'namespace': namespace})
-
-
-def load_and_decrement_counter(name, delta, namespace='default', batch_size=None):
- #logging.info("Decrementing counter, name = " + name)
- load_and_increment_counter(name, -delta, namespace, batch_size)
+ if batch_size: current_count = memcache.get(partial_key, namespace=namespace)
+ # If batch_size is set, only try creating one worker per batch
+ if not batch_size or (batch_size and current_count % batch_size == 0):
+ if memcache.add(partial_key + '_dirty', delta, namespace=namespace):
+ #logging.info("Adding task to taskqueue. counter value = " + str(memcache.get(partial_key, namespace=namespace)))
+ taskqueue.add(queue_name='livecount-writebacks', url='/livecount/worker', params={'name': name, 'period': period, 'period_type': period_type, 'namespace': namespace}) # post parameter
+
+
+def load_and_decrement_counter(name, period, period_types, delta, namespace='default', batch_size=None):
+ load_and_increment_counter(name, period, period_types, -delta, namespace, batch_size)
class LivecountCounterWorker(webapp.RequestHandler):
def post(self):
- #logging.info("Running LivecountCounterWorker...")
name = self.request.get('name')
+ period = self.request.get('period')
+ period_type = self.request.get('period_type')
namespace = self.request.get('namespace')
- key = LivecountCounter.KeyName(name, namespace)
- #logging.info("Worker for name = " + name + ", namespace = " + namespace + ", key = " + key)
- memcache.delete(name + '_dirty', namespace=namespace)
- value = memcache.get(name, namespace=namespace)
+
+ partial_key = LivecountCounter.PartialKeyName(period_type, period, name)
+ full_key = LivecountCounter.KeyName(namespace, period_type, period, name)
+
+ memcache.delete(partial_key + '_dirty', namespace=namespace)
+ value = memcache.get(partial_key, namespace=namespace)
if value is None:
- logging.error('LivecountCounterWorker: Failure for key=%s', key)
+ logging.error('LivecountCounterWorker: Failure for partial key=%s', partial_key)
return
- LivecountCounter(key_name=key, count=value, namespace=namespace).put()
+
+ # add new row in datastore
+ scoped_period = PeriodType.find_scope(period_type, period)
+ LivecountCounter(key_name=full_key, count=value, name=name, period=scoped_period, period_type=period_type, namespace=namespace).put()
class ClearEntireCacheHandler(webapp.RequestHandler):
- """ Clears entire memcache
+ """
+ Clears entire memcache
"""
def get(self):
logging.info("Deleting all counters in memcache. Any counts not previously flushed will be lost.")
- result=in_memory_counter.ClearEntireCache()
+ result = in_memory_counter.ClearEntireCache()
self.response.out.write("Done. ClearEntireCache succeeded = " + str(result))
class WritebackAllCountersHandler(webapp.RequestHandler):
- """ Writes back all counters from memory to the datastore
+ """
+ Writes back all counters from memory to the datastore
"""
def get(self):
@@ -146,19 +198,24 @@ def get(self):
self.response.out.write("Done. WritebackAllCounters succeeded = " + str(result))
-
+"""
class GetCountHandler(webapp.RequestHandler):
- """ Get counter value from memcache or datastore
- """
+
+ Get counter value from memcache or datastore
+
def get(self):
name = self.request.get('name')
+ period = self.request.get('period')
+ period_type = self.request.get('period_type')
namespace = self.request.get('namespace')
- count = counter.load_and_get_count(name, namespace)
+
+ count = counter.load_and_get_count(namespace, period_type, period, name)
if count:
self.response.set_status(200)
self.response.out.write(count)
else:
self.response.set_status(404)
+ """
class RedirectToCounterAdminHandler(webapp.RequestHandler):
@@ -179,7 +236,7 @@ def main():
('/livecount/worker', LivecountCounterWorker),
('/livecount/clear_entire_cache', ClearEntireCacheHandler),
('/livecount/writeback_all_counters', WritebackAllCountersHandler),
- ('/livecount/get_count', GetCountHandler),
+ #('/livecount/get_count', GetCountHandler),
('/', RedirectToCounterAdminHandler)
], debug=True)
wsgiref.handlers.CGIHandler().run(application)
View
127 livecount/counter_admin.html
@@ -1,46 +1,95 @@
<!DOCTYPE HTML>
<html>
- <head>
- <title> GAE-Livecount - Counter Admin </title>
- </head>
- <body>
+ <head>
+ <title> GAE-Livecount - Counter Admin </title>
+ </head>
+ <body>
- <form action="/livecount/counter_admin" method="get">
- <input type="hidden" name="counter" value="{{ counter_name }}" />
- <h2>Namespace: <input type="input" name="namespace" value="{{ namespace }}" /></h2>
- <p><input type="submit" value="Change Namespace" /></p>
- </form>
+ <h3>Modify Counter:</h3>
+ <form action="/livecount/counter_admin" method="post">
+ <div>
+ <div style="width: 250px; height: 30px;">
+ <label style="float: left">Namespace:</label>
+ <input style="float: right" type="input" name="namespace" value="{{ namespace }}" />
+ </div>
+ <div style="width: 250px; height: 30px;">
+ <label style="float: left">Period Type(s):</label>
+ <input style="float: right" type="input" name="period_types" value="{{ period_types }}" />
+ </div>
+ <div style="width: 250px; height: 30px;">
+ <label style="float: left">Period:</label>
+ <input style="float: right" type="input" name="period" value="{{ period }}" />
+ </div>
+ <div style="width: 250px; height: 30px;">
+ <label style="float: left">Counter Name:</label>
+ <input style="float: right" type="input" name="counter_name" value="{{ counter_name }}" />
+ </div>
+ <div style="width: 250px; height: 30px;">
+ <label style="float: left">Delta:</label>
+ <input style="float: right" type="input" name="delta" value="{{ delta }}" />
+ </div>
+
+ <div style="width: 250px">
+ <input style="float: left" type="submit" name="type" value="Increment Counter" />
+ <input style="float: left" type="submit" name="type" value="Decrement Counter" />
+ </div>
+ <div style="clear: both"></div>
+ </div>
+ </form>
+
+ {% if modified_counter %}
+ <h3>Updated counter:</h3>
+ <p>
+ {{ modified_counter.count }} = {{ modified_counter.key.name }}
+ <br>
+ </p>
+ {% endif %}
+ <br>
+ <br>
+
- </form>
+ <h3>Filter Top 20 LivecountCounter Values:</h3>
- <h2>Top 20 LivecountCounter Values:</h2>
- <p>
- {% for counter in counters %}
- {{ counter.key.name }} = {{ counter.count }}<br>
- {% endfor %}
- </p>
-
- <p><input type="button" value="Refresh List" onClick="location.href='/livecount/counter_admin?namespace={{ namespace }}&counter_name={{ counter_name }}&delta={{ delta }}'"/></p>
+ <form action="/livecount/counter_admin" method="get">
+ <div>
+ <div style="width: 250px; height: 30px;">
+ <label style="float: left">Namespace:</label>
+ <input style="float: right" type="input" name="namespace" value="{{ namespace }}" />
+ </div>
+ <div style="width: 250px; height: 30px;">
+ <label style="float: left">Period Type:</label>
+ <input style="float: right" type="input" name="period_type" value="{{ period_type }}" />
+ </div>
+ <div style="width: 250px; height: 30px;">
+ <label style="float: left">Period:</label>
+ <input style="float: right" type="input" name="period" value="{{ period }}" />
+ </div>
+ <div style="width: 250px; height: 30px;">
+ <label style="float: left">Counter Name:</label>
+ <input style="float: right" type="input" name="counter_name" value="{{ counter_name }}" />
+ </div>
+
+ <div style="width: 250px">
+ <input style="float: left" type="submit" value="Filter" />
+ </div>
+ <div style="clear: both"></div>
+ </div>
+ </form>
- {% if modified_counter %}
- <h2>Upadated counter:</h2>
- <p>
- {{ modified_counter.key.name }} = {{ modified_counter.count }}<br>
- </p>
- {% endif %}
-
- <h2>Memcache Stats:</h2>
- <p>{{ stats }}</p>
-
- <h2>Modify Counter:</h2>
- <form action="/livecount/counter_admin" method="post">
- <p>Counter Name: <input type="input" name="counter" value="{{ counter_name }}" /></p>
- <p>Namespace: <input type="input" name="namespace" value="{{ namespace }}" /></p>
- <p>Delta: <input type="input" name="delta" value="{{ delta }}" /></p>
- <p><input type="submit" name="type" value="Increment Counter" /></p>
- <p><input type="submit" name="type" value="Decrement Counter" /></p>
- </form>
+ <p>
+ {% for counter in counters %}
+ {{ counter.count }} = {{ counter.key.name }}
+ <br>
+ {% endfor %}
+ </p>
+ <br>
+ <br>
+
+
+ <h3>Memcache Stats:</h3>
+ <p>
+ {{ stats }}
+ </p>
-
- </body>
-</html>
+ </body>
+</html>
View
172 livecount/counter_admin.py
@@ -15,120 +15,106 @@
#
-
+from datetime import datetime
+import logging
import os
+import simplejson
import wsgiref.handlers
+
from google.appengine.ext import webapp
from google.appengine.ext.webapp import template
+
from livecount import counter
from livecount.counter import LivecountCounter
-import logging
-import simplejson
+from livecount.counter import PeriodType
-#counter_list = []
class CounterHandler(webapp.RequestHandler):
- """Handles displaying the values of the counters
- and requests to increment/decrement counters.
- """
-
- def get(self):
- counter_name = self.request.get('counter_name')
- namespace = self.request.get('namespace')
- if not namespace:
- namespace = "default"
- delta = self.request.get('delta')
- if not delta:
- delta = 0
- logging.info("getting LivecountCounters for namespace = " + str(namespace))
- modified_counter = None
- if counter_name:
- modified_counter = LivecountCounter.get_by_key_name(namespace + ":" + counter_name)
-
- counter_entities_query = LivecountCounter.all().filter("namespace = ", namespace).order('-count')
- counter_entities = counter_entities_query.fetch(20)
- logging.info("counter_entities: " + str(counter_entities))
-
- stats = counter.GetMemcacheStats()
- template_values = {
- 'namespace': namespace,
- 'counters': counter_entities,
- 'modified_counter': modified_counter,
- 'counter_name': counter_name,
- 'delta': delta,
- 'stats': stats
- }
- logging.info("template_values: " + str(template_values))
- template_file = os.path.join(os.path.dirname(__file__), 'counter_admin.html')
- self.response.out.write(template.render(template_file, template_values))
+ """
+ Handles displaying the values of the counters
+ and requests to increment/decrement counters.
+ """
- def post(self):
- global counter_list
- counter_name = self.request.get('counter')
- namespace = self.request.get('namespace')
- delta = self.request.get('delta')
- type = self.request.get('type')
-# if counter_name not in counter_list:
-# counter_list.append(counter_name)
-# logging.info("counter_list: " + str(counter_list))
- if type == "Increment Counter":
- counter.load_and_increment_counter(counter_name, long(delta), namespace=namespace)
- elif type == "Decrement Counter":
- counter.load_and_decrement_counter(counter_name, long(delta), namespace=namespace)
- self.redirect("/livecount/counter_admin?namespace=" + namespace + "&counter_name=" + counter_name + "&delta=" + delta)
-
-
-class GetCounterHandler(webapp.RequestHandler):
- """Handles displaying the values of the counters
- and requests to increment/decrement counters.
- """
-
- def get(self):
- counter_name = self.request.get('counter_name')
- namespace = self.request.get('namespace')
- if not namespace:
- namespace = "share_domain_count"
- fetch_limit = self.request.get('fetch_limit')
- if not fetch_limit:
- fetch_limit = "20"
+ def get(self):
+ namespace = self.request.get('namespace')
+ period_type = self.request.get('period_type')
+ period_types = self.request.get('period_types').replace(" ", "")
+ period = self.request.get('period')
+ name = self.request.get('counter_name')
+ delta = self.request.get('delta')
+ fetch_limit = self.request.get('fetch_limit')
- if counter_name:
- logging.info("querying counter directly for counter_name = " + str(counter_name) + ", namespace = " + str(namespace))
- count = counter.load_and_get_count(counter_name, namespace=namespace)
+ if not namespace:
+ namespace = "default"
+ if not period_type:
+ period_type = PeriodType.DAY
+ if not period_types:
+ period_types = PeriodType.DAY + "," + PeriodType.WEEK
+ if not period:
+ period = str(datetime.now()).split(".")[0]
+ if not delta:
+ delta = 1
+ if not fetch_limit:
+ fetch_limit = "20"
- self.response.set_status(200)
- self.response.out.write(count)
- else:
- logging.info("querying datastore for LivecountCounters for counter_name = " + str(counter_name) + ", namespace = " + str(namespace))
+ modified_counter = None
+ if name:
+ full_key = LivecountCounter.KeyName(namespace, period_type, period, name)
+ modified_counter = LivecountCounter.get_by_key_name(full_key)
+
counter_entities_query = LivecountCounter.all().order('-count')
- if counter_name:
- counter_entities_query.filter("counter_name = ", counter_name)
if namespace:
counter_entities_query.filter("namespace = ", namespace)
+ if period_type:
+ counter_entities_query.filter("period_type = ", period_type)
+ if period:
+ counter_entities_query.filter("period = ", period)
counter_entities = counter_entities_query.fetch(int(fetch_limit))
logging.info("counter_entities: " + str(counter_entities))
+
+ stats = counter.GetMemcacheStats()
+
+ template_values = {
+ 'namespace': namespace,
+ 'period_type': period_type,
+ 'period_types': period_types,
+ 'period': period,
+ 'counter_name': name,
+ 'delta': delta,
+ 'modified_counter': modified_counter,
+ 'counters': counter_entities,
+ 'stats': stats
+ }
+ logging.info("template_values: " + str(template_values))
+ template_file = os.path.join(os.path.dirname(__file__), 'counter_admin.html')
+ self.response.out.write(template.render(template_file, template_values))
- counters = []
- for entity in counter_entities:
- counter_data = {'key': str(entity.key().name()),
- 'count': str(entity.count)}
- counters.append(counter_data)
- json_counters_data = simplejson.dumps(counters)
-
- if json_counters_data:
- self.response.set_status(200)
- self.response.out.write(json_counters_data)
- return
+
+ def post(self):
+ namespace = self.request.get('namespace')
+ period_type = self.request.get('period_type')
+ period_types = self.request.get('period_types').replace(" ", "")
+ period = self.request.get('period')
+ name = self.request.get('counter_name')
+ delta = self.request.get('delta')
+ type = self.request.get('type')
+
+ if type == "Increment Counter":
+ counter.load_and_increment_counter(name, period, period_types.split(","), long(delta), namespace=namespace)
+ elif type == "Decrement Counter":
+ counter.load_and_decrement_counter(name, period, period_types.split(","), long(delta), namespace=namespace)
+
+ logging.info("Redirecting to: /livecount/counter_admin?namespace=" + namespace + "&period_type=" + period_type + "&period_types=" + period_types + "&period=" + period + "&counter_name=" + name + "&delta=" + delta)
+ self.redirect("/livecount/counter_admin?namespace=" + namespace + "&period_type=" + period_type + "&period_types=" + period_types + "&period=" + period + "&counter_name=" + name + "&delta=" + delta)
def main():
- application = webapp.WSGIApplication(
- [
- ('/livecount/counter_admin', CounterHandler),
- ('/livecount/get_counter', GetCounterHandler),
- ], debug=True)
- wsgiref.handlers.CGIHandler().run(application)
+ application = webapp.WSGIApplication(
+ [
+ ('/livecount/counter_admin', CounterHandler),
+ ], debug=True)
+ wsgiref.handlers.CGIHandler().run(application)
if __name__ == '__main__':
- main()
+ main()
Please sign in to comment.
Something went wrong with that request. Please try again.