Skip to content

Commit

Permalink
#302 Update lock logic
Browse files Browse the repository at this point in the history
  • Loading branch information
dvoraka committed Jan 13, 2018
1 parent 96f6067 commit 90f1f86
Showing 1 changed file with 69 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,49 +105,84 @@ public void stop() {
responseClient.removeNoResponseMessageListener(this);
}

private void initializeSafe() {
try {
initialize();
} catch (Exception e) {
log.error("Initialization failed!", e);
}
}

/**
* Initializes the lock before start.
*/
private void initialize() {
waitUntil(responseClient::isRunning);
initializeSequence();
}

@Override
public boolean lockForFile(String filename, String owner, int remoteLockCount)
throws InterruptedException {
public boolean lockForFile(String filename, String owner, int remoteLockCount) {

if (isIsolated() && remoteLockCount > 0) {
return false;
}

// lock local file
if (!lockFile(filename, owner)) {
if (!lockLocalFile(filename, owner)) {
return false;
}

// remote locking
log.debug("Locking {} nodes {}...", remoteLockCount, idString);
//TODO: unlocking looks unsafe
lockingLock.lockInterruptibly();
if (isIsolated() && remoteLockCount == 0) {
return true;
}

final int retryCount = 2;
for (int i = 0; i <= retryCount; i++) {
if (remoteLockForFile(filename, owner, remoteLockCount)) {
return true;
}

String id = sendLockRequest(filename, owner);
final long successLocks = getLockResponse(id, remoteLockCount);
try {
unlockLocalFile(filename, owner);
} catch (FileNotLockedException e) {
log.warn(UNLOCKING_FAILED, e);
}

if (successLocks == remoteLockCount) {
incSequence();
log.debug("Remote locking success {}.", idString);
lockingLock.unlock();
return false;
}

return true;
} else if (successLocks > (remoteLockCount / 2)) {
sendForceUnlockRequest(filename, owner);
} else {
break;
}
private boolean remoteLockForFile(String filename, String owner, int remoteLockCount) {
log.debug("Locking {} nodes {}...", remoteLockCount, idString);

try {
lockingLock.lockInterruptibly();
} catch (InterruptedException e) {
log.warn("Remote locking interrupted {}.", idString);
Thread.currentThread().interrupt();

return false;
}

log.warn("Remote locking failed {}.", idString);
lockingLock.unlock();
try {
unlockFile(filename, owner);
} catch (FileNotLockedException e) {
log.warn(UNLOCKING_FAILED, e);
final int retryCount = 2;
for (int i = 0; i <= retryCount; i++) {

String id = sendLockRequest(filename, owner);
final long successLocks = getLockResponse(id, remoteLockCount);

if (successLocks == remoteLockCount) {

incSequence();
log.debug("Remote locking success {}.", idString);

return true;

} else if (successLocks > (remoteLockCount / 2)) {
sendForceUnlockRequest(filename, owner);
} else {
break;
}
}
} finally {
lockingLock.unlock();
}

return false;
Expand Down Expand Up @@ -195,7 +230,7 @@ public boolean unlockForFile(String filename, String owner, int lockCount) {
boolean remoteSuccess = (successUnlocks == lockCount);

try {
unlockFile(filename, owner);
unlockLocalFile(filename, owner);
} catch (FileNotLockedException e) {
log.warn("Local file was not locked " + idString + "!", e);

Expand All @@ -210,22 +245,6 @@ public void networkChanged() {
// updateSequence();
}

private void initializeSafe() {
try {
initialize();
} catch (Exception e) {
log.error("Initialization failed!", e);
}
}

/**
* Initializes the lock before start.
*/
private void initialize() {
waitUntil(responseClient::isRunning);
initializeSequence();
}

/**
* Initializes sequence before lock start.
*/
Expand Down Expand Up @@ -300,7 +319,7 @@ private long getSequence() {
}

private void setSequence(long sequence) {
log.debug("Setting sequence {}: {}", idString, sequence);
log.info("Setting sequence {}: {}", idString, sequence);
this.sequence.set(sequence);
}

Expand All @@ -312,7 +331,7 @@ private boolean isFileLocked(String filename, String owner) {
return lockedFiles.contains(hash(filename, owner));
}

private boolean lockFile(String filename, String owner) {
private boolean lockLocalFile(String filename, String owner) {
log.debug("Locking {}: {}, {}", idString, filename, owner);

synchronized (lockedFiles) {
Expand All @@ -330,7 +349,7 @@ private boolean lockFile(String filename, String owner) {
}
}

private void unlockFile(String filename, String owner) throws FileNotLockedException {
private void unlockLocalFile(String filename, String owner) throws FileNotLockedException {
log.debug("Unlocking {}: {}, {}", idString, filename, owner);

synchronized (lockedFiles) {
Expand Down Expand Up @@ -405,7 +424,7 @@ private void sequence(ReplicationMessage message) {
private void lock(ReplicationMessage message) {
if (getSequence() == message.getSequence() && lockingLock.tryLock()) {

if (lockFile(message.getFilename(), message.getOwner())) {
if (lockLocalFile(message.getFilename(), message.getOwner())) {
incSequence();
lockingLock.unlock();
serviceClient.sendMessage(createLockSuccessReply(message, nodeId));
Expand All @@ -421,7 +440,7 @@ private void lock(ReplicationMessage message) {

private void unlock(ReplicationMessage message) {
try {
unlockFile(message.getFilename(), message.getOwner());
unlockLocalFile(message.getFilename(), message.getOwner());
serviceClient.sendMessage(createUnlockSuccessReply(message, nodeId));
} catch (FileNotLockedException e) {
log.warn("Unlocking failed " + idString + ".", e);
Expand All @@ -434,7 +453,7 @@ private void forceUnlock(ReplicationMessage message) {
log.warn("Force unlock {}: {}, {}",
idString, message.getFilename(), message.getOwner());
try {
unlockFile(message.getFilename(), message.getOwner());
unlockLocalFile(message.getFilename(), message.getOwner());
} catch (FileNotLockedException e) {
log.warn("Force unlock failed " + idString + ".", e);
}
Expand Down

0 comments on commit 90f1f86

Please sign in to comment.