Skip to content

Commit

Permalink
Added LockManager to batteries
Browse files Browse the repository at this point in the history
  • Loading branch information
bakwc committed Jan 7, 2017
1 parent 75ef90d commit db51e65
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 1 deletion.
95 changes: 95 additions & 0 deletions pysyncobj/batteries.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
import threading
import weakref
import time
import socket
import os
from syncobj import SyncObjConsumer, replicated


Expand Down Expand Up @@ -188,3 +193,93 @@ def __len__(self):

def __contains__(self, item):
return item in self.__data


class _LockManagerImpl(SyncObjConsumer):
def __init__(self, autoUnlockTime):
super(_LockManagerImpl, self).__init__()
self.__locks = {}
self.__autoUnlockTime = autoUnlockTime

@replicated
def acquire(self, lockPath, clientID, currentTime):
existingLock = self.__locks.get(lockPath, None)
# Auto-unlock old lock
if existingLock is not None:
if currentTime - existingLock[1] > self.__autoUnlockTime:
existingLock = None
# Acquire lock if possible
if existingLock is None or existingLock[0] == clientID:
self.__locks[lockPath] = (clientID, currentTime)
return True
# Lock already acquired by someone else
return False

@replicated
def prolongate(self, clientID, currentTime):
for lockPath in self.__locks.keys():
lockClientID, lockTime = self.__locks[lockPath]

if currentTime - lockTime > self.__autoUnlockTime:
del self.__locks[lockPath]
continue

if lockClientID == clientID:
self.__locks[lockPath] = (clientID, currentTime)

@replicated
def release(self, lockPath, clientID):
existingLock = self.__locks.get(lockPath, None)
if existingLock is not None and existingLock[0] == clientID:
del self.__locks[lockPath]

def isAcquired(self, lockPath, clientID, currentTime):
existingLock = self.__locks.get(lockPath, None)
if existingLock is not None:
if existingLock[0] == clientID:
if currentTime - existingLock[1] < self.__autoUnlockTime:
return True
return False


class LockManager(object):

def __init__(self, autoUnlockTime, selfID = None):
self.__lockImpl = _LockManagerImpl(autoUnlockTime)
if selfID is None:
selfID = '%s:%d:%d' % (socket.gethostname(), os.getpid(), id(self))
self.__selfID = selfID
self.__autoUnlockTime = autoUnlockTime
self.__mainThread = threading.current_thread()
self.__initialised = threading.Event()
self.__thread = threading.Thread(target=LockManager._autoAcquireThread, args=(weakref.proxy(self),))
self.__thread.start()
while not self.__initialised.is_set():
pass

def _consumer(self):
return self.__lockImpl

def _autoAcquireThread(self):
self.__initialised.set()
try:
while True:
if not self.__mainThread.is_alive():
break
time.sleep(float(self.__autoUnlockTime) / 4.0)
syncObj = self.__lockImpl._syncObj
if syncObj is None:
continue
if syncObj._getLeader() is not None:
self.__lockImpl.prolongate(self.__selfID, time.time())
except ReferenceError:
pass

def tryAcquireLock(self, path, callback=None, sync=False, timeout=None):
return self.__lockImpl.acquire(path, self.__selfID, time.time(), callback=callback, sync=sync, timeout=timeout)

def isAcquired(self, path):
return self.__lockImpl.isAcquired(path, self.__selfID, time.time())

def release(self, path, callback=None, sync=False, timeout=None):
self.__lockImpl.release(path, self.__selfID, callback=callback, sync=sync, timeout=timeout)
2 changes: 1 addition & 1 deletion pysyncobj/revision.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
REVISION = '32fc67cf0feee8c9578fe7e0e1ac592c280d2523'
REVISION = '75ef90d24fc9e6ce6ff2a8b436209a87d1d510bb'
9 changes: 9 additions & 0 deletions pysyncobj/syncobj.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,15 @@ def __init__(self, selfNodeAddr, otherNodesAddrs, conf=None, consumers=None):
self.__encryptor = None

consumers = consumers or []
newConsumers = []
for c in consumers:
if not isinstance(c, SyncObjConsumer) and getattr(c, '_consumer', None):
c = c._consumer()
if not isinstance(c, SyncObjConsumer):
raise SyncObjException('Consumers must be inherited from SyncObjConsumer')
newConsumers.append(c)
consumers = newConsumers

self.__consumers = consumers

self.__selfNodeAddr = selfNodeAddr
Expand Down

0 comments on commit db51e65

Please sign in to comment.