Permalink
Browse files

Closes #11: Create lock(), acquire_lock() and release_lock() methods.

This patch creates a nd_service_registry.Lock object that can be used in
two ways. In the first method, you can use the object in a 'with' block
-- explicitly acquiring the Lock only while your 'with' block is
running.

The second model allows you to call acquire_lock() directly to lock a
path, but relies on you actively calling release_lock() (or your
connection expiring) before the lock will be free again.

At this time it is not possible to add a 'wait' timer in on the Locks
because of the way Kazoo is written. I'll work with the developers to
understand the behavior and see if we can add that model in later.

Change-Id: I4d361ee5a25236fb9fcb3168a310c9184312a961
  • Loading branch information...
1 parent e84cf3f commit ab271a1d4135e6b1eec623b94b56cb3c817016c5 @diranged diranged committed Mar 15, 2013
Showing with 229 additions and 31 deletions.
  1. +24 −3 README.rst
  2. +116 −28 nd_service_registry/__init__.py
  3. +89 −0 nd_service_registry/lock.py
View
@@ -47,7 +47,7 @@ To create your initial connection object::
>>> from nd_service_registry import KazooServiceRegistry
>>> nd = KazooServiceRegistry()
-The KazooServiceRegistry object is a child of nd_service_registry that conforms
+The KazooServiceRegistry object is a child of nd_service_registry that conforms
to our ServiceRegistry specs, whlie leveraging Kazoo as the backend. The
object handles all of your connection states - there is no need to start/stop
or monitor the connection state at all.
@@ -72,6 +72,27 @@ Getting a list of servers at a path::
aversion=0, ephemeralOwner=0, dataLength=0,
numChildren=1, pzxid=7)}
+
+Locks
+-----
+
+One of Zookeepers great features is using it as a global lock manager. We provide
+two models for getting a lock. In one model, your lock is only active as long as
+your code is running::
+
+ >>> with nd.get_lock('/foo', simultaneous=1):
+ ... <do some work>
+ ...
+ >>>
+
+Another example is explicitly locking a path for some period of time, then
+releasing it explicitly (eg, locking during one method, and waiting for an
+entirely different method to handle the unlock)::
+
+ >>> nd.acquire_lock('/foo', simultaneous=1)
+ >>> <do your work... >
+ >>> nd.release_lock('/foo')
+
Django use
----------
@@ -116,7 +137,7 @@ Example use in your code::
>>> from nextdoor import service_registry_utils
>>> def do_something(data):
... print "New server data: %s" % data
- ...
+ ...
>>> service_registry_utils.get('/services/staging/uswest2/memcache',
... callback=do_something)
New server data: { 'path': '/services/staging/uswest2/memcache',
@@ -144,7 +165,7 @@ Warning: LC_ALL and LANG settings
Running the Django shell::
- # unset LC_ALL; LANG=en_US:UTF-8 python manage.py shell
+ # unset LC_ALL; LANG=en_US:UTF-8 python manage.py shell
Connection Handling
@@ -85,42 +85,35 @@
u'created': u'2012-12-12 15:26:24'}
}
+Example of doing some work with a lock in place:
+
+ >>> with nd.get_lock('/myjob'):
+ ... print "I got a lock!"
+ ...
+ I got a lock!
+
Copyright 2012 Nextdoor Inc.
"""
__author__ = 'matt@nextdoor.com (Matt Wise)'
-# For nd_service_registry Class
-import os
-import logging
-import exceptions
-import atexit
-
-from os.path import split
from functools import wraps
+import atexit
+import logging
+import os
-# Our own classes
-from nd_service_registry.registration import EphemeralNode
-from nd_service_registry.watcher import Watcher
-from nd_service_registry.watcher import DummyWatcher
-from nd_service_registry import funcs
-from nd_service_registry import exceptions
-
-# For KazooServiceRegistry Class
-import kazoo.security
-from nd_service_registry.shims import ZookeeperClient
from kazoo.client import KazooState
-
-# Use Gevent as our threader (DOES NOT WORK RIGHT NOW)
-#from kazoo.handlers.gevent import SequentialGeventHandler as EventHandler
-#from gevent.event import Timeout as TimeoutException
-
-# Use standard threading library as our threader
from kazoo.handlers.threading import SequentialThreadingHandler as EventHandler
-from kazoo.handlers.threading import TimeoutError as TimeoutException
+import kazoo.security
-# Our default variables
-from version import __version__ as VERSION
+from nd_service_registry import shims
+from nd_service_registry import exceptions
+from nd_service_registry import funcs
+from nd_service_registry.lock import Lock
+from nd_service_registry.registration import EphemeralNode
+from nd_service_registry.shims import ZookeeperClient
+from nd_service_registry.watcher import DummyWatcher
+from nd_service_registry.watcher import Watcher
# Defaults
TIMEOUT = 5 # seconds
@@ -173,6 +166,66 @@ def get(self, path, callback=None):
"""Retrieves a Watcher.get() dict for a given path."""
raise NotImplementedError('Not implemented. Use one of my subclasses.')
+ def get_lock(self, path, name=None, simultaneous=1):
+ """Retrieves a lock from the supplied path.
+
+ Lock objects can be used either directly with their own methods, or as
+ a semaphore-style locking object. Eg:
+
+ >>> with lock_object:
+ >>> <do something>
+
+ args:
+ path: String representing the lock path
+ name: Optional string representing the name of the client
+ simultaneous: Optional integer indicating how many clients can
+ lock this path at once. (default: 1)
+ returns:
+ nd_service_registry.Lock object
+ """
+ return self._get_lock(path, name, simultaneous)
+
+ def acquire_lock(self, path, name=None, simultaneous=1, wait=0):
+ """Asynchronously starts a lock and holds it.
+
+ Retrieves a Lock object for a particular path and 'acquires' the lock.
+ It returns True if the lock is successful. If the lock path is busy,
+ waits until the the lock can be acquired.
+
+ args:
+ path: String representing the lock path
+ name: Optional string representing the name of the client
+ simultaneous: Optional integer indicating how many clients can lock
+ this path at once. (default: 1)
+ returns:
+ True: The lock has been acquired.
+ False: Could not acquire the lock.
+ """
+ lock = self._get_lock(path, name, simultaneous)
+ lock.acquire()
+ return lock.status()
+
+ def release_lock(self, path):
+ """Releases a lock.
+
+ Retrieves a Lock object for a particular path and releases the lock on
+ that path.
+
+ args:
+ path: String representing the lock path
+
+ returns:
+ True: The lock has been released
+ False: The lock is still in place
+ """
+ lock = self._get_lock(path)
+ lock.release()
+
+ if not lock.status():
+ return True
+ else:
+ return False
+
def username(self):
"""Returns self._username"""
return self._username
@@ -261,7 +314,7 @@ def _get_dummywatcher(self, path, callback=None):
log.debug('Getting cachefile data...')
try:
cache = funcs.load_dict(self._cachefile)
- except (IOError, EOFError), e:
+ except (IOError, EOFError):
raise exceptions.ServiceRegistryException(
'Unable to load cachefile.')
@@ -460,6 +513,9 @@ def __init__(self, server=SERVER, readonly=False, timeout=TIMEOUT,
# Store all of our Watcher objects here
self._watchers = {}
+ # Store our Lock objects here
+ self._locks = {}
+
# Create a local 'dict' that we'll use to store the results of our
# get_nodes/get_node_data calls.
self._cachefile = cachefile
@@ -590,7 +646,7 @@ def unset(self, node):
def _create_connection(self):
"""Create our 'zk' connection object.
- If the object does not exist, create it. If it does exist, just move on.
+ If the object does not exist, create it. If it does exist, move on.
"""
if self._zk:
@@ -832,3 +888,35 @@ def _get_watcher(self, path, callback=None):
# whatever our user has supplied
watcher.add_callback(self._save_watcher_to_dict)
return watcher
+
+ @_health_check
+ def _get_lock(self, path, name=None, simultaneous=1):
+ """Retrieves a lock semaphore-style object from the supplied path.
+
+ This method creates our Lock object and returns it. It is not meant
+ to be used directly though.
+
+ Args:
+ path: A string representing the path for the lock.
+ name: Optional string representing the server identifier.
+ simultaneous: Int representing how many concurrent locks can
+ occur on this path.
+ Returns:
+ nd_service_registry.Lock object
+ """
+
+ # Return the object from our cache, if it's there
+ log.debug('[%s] Checking for existing object...' % path)
+ if path in self._locks:
+ log.debug('Found [%s] in cache: %s' %
+ (path, str(self._locks[path])))
+ return self._locks[path]
+
+ # Go create a Lock object and store it
+ log.debug('[%s] Creating Lock object...' % path)
+ self._locks[path] = Lock(self._zk,
+ path,
+ name,
+ simultaneous)
+
+ return self._locks[path]
@@ -0,0 +1,89 @@
+# Copyright 2013 Nextdoor.com, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Kazoo Zookeeper Lock Object
+
+Copyright 2013 Nextdoor Inc."""
+
+__author__ = 'matt@nextdoor.com (Matt Wise)'
+
+import logging
+
+from kazoo import exceptions
+
+log = logging.getLogger(__name__)
+
+
+class Lock(object):
+ """Creates a Kazoo Lock object for a given path."""
+
+ def __init__(self, zk, path, name=None, simultaneous=1):
+ # Set our local variables
+ self._zk = zk
+ self._path = path
+ self._name = name
+ self._simultaneous = simultaneous
+
+ # Create the Lock object in Kazoo. This lock object is used later
+ # by our methods, but does not actively lock anything right away.
+ self._lock = self._zk.Semaphore(self._path, self._name, self._simultaneous)
+
+ def acquire(self):
+ """Returns the actual Lock object for direct use by a client.
+
+ Note: This is a blocking call that may never return
+
+ returns:
+ True: Lock was acquired
+ False: Lock was unable to be acquired
+ """
+
+ log.debug('[%s] Acquiring the lock... (waiting indefinitely)' %
+ self._path)
+ try:
+ return self._lock.acquire()
+ except exceptions.CancelledError:
+ # This means that the Lock was canceled somewhere manually,
+ # which can happen for a lot of reasons. Simplest thing here
+ # is to just return False
+ return False
+
+ def release(self):
+ """Request to release the Lock
+
+ returns:
+ True: Lock was acquired
+ False: Lock was unable to be acquired
+ """
+
+ log.debug('[%s] Releasing the lock...' % self._path)
+ return self._lock.release()
+
+ def status(self):
+ """Return lock status
+
+ returns:
+ True: Lock was acquired
+ False: Lock was unable to be acquired
+ """
+
+ return self._lock.is_acquired
+
+ def __enter__(self):
+ """Provides 'with <lock object>:' support"""
+ self.acquire()
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ """Provides 'with <lock object>:' support"""
+ self.release()

0 comments on commit ab271a1

Please sign in to comment.