Skip to content

Commit

Permalink
release: version 0.0.1-GA published
Browse files Browse the repository at this point in the history
  • Loading branch information
Alpha2J committed Aug 6, 2022
1 parent f0dc100 commit 26004f6
Show file tree
Hide file tree
Showing 15 changed files with 184 additions and 87 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

3. implemented:


4. test schedule:

5. what problems have I meet
- how to test private method: [sloved](https://stackoverflow.com/questions/34571/how-do-i-test-a-class-that-has-private-methods-fields-or-inner-classes)
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>site.hellooo</groupId>
<artifactId>hellooo-distributedlock</artifactId>
<version>1.0-SNAPSHOT</version>
<version>0.0.1-GA</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
Expand Down
3 changes: 0 additions & 3 deletions src/main/java/site/hellooo/distributedlock/LockCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ public interface LockCallback {
// execute immediately after the lock granted to current thread
void afterLocked(LockContext lockContext);

// execute immediately after the current thread queued, or say: after added to the 'tail.next'
void afterQueued(LockContext lockContext);

// execute before parking the current thread
void beforeParking(LockContext lockContext);

Expand Down
2 changes: 2 additions & 0 deletions src/main/java/site/hellooo/distributedlock/LockContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ public interface LockContext {
LockHandler lockHandler();

LockCallback lockCallback();

DistributedLock currentLock();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ public static void check(boolean predicate, String message, Object... values) {
}
}

public static <T> void checkNotNull(T ref, String message) {
if (ref == null) {
throw new NullPointerException(stringValue(message));
}
}

private static String stringValue(String message) {
return message == null ? StringUtils.empty() : message;
}
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/site/hellooo/distributedlock/common/ClassUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package site.hellooo.distributedlock.common;

public class ClassUtils {
public static <O> String getObjClassName(O object) {
if (object == null) {
return "null";
} else {
Class<?> clazz = object.getClass();
return clazz.getName();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ public static class LockOptionsBuilder {
private static final String DEFAULT_IDENTIFIER_PREFIX = "";
private static final String DEFAULT_IDENTIFIER_SUFFIX = "";

// 默认20毫秒重试一次
// 默认20毫秒重试一次
private static final long DEFAULT_RETRY_INTERVAL_MILLISECONDS = 10L;
// 默认1秒续期一次
// 默认1秒续期一次
private static final long DEFAULT_LEASE_INTERVAL_MILLISECONDS = 1000L;
// 默认续期一次增加5秒 (todo 记得处理:过期事件小于1秒的时候才续期)
// 默认每次续期都将过期时间设置为5秒
private static final long DEFAULT_LEASE_MILLISECONDS = 5000L;

private static final Coordinator DEFAULT_COORDINATOR = Coordinator.REDIS_SINGLETON;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,12 @@ public LockStateNotRemovedException() {
public LockStateNotRemovedException(String message) {
super(message);
}

public LockStateNotRemovedException(Throwable cause) {
super(cause);
}

public LockStateNotRemovedException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package site.hellooo.distributedlock.impl.redis;
package site.hellooo.distributedlock.impl;

import site.hellooo.distributedlock.LockCallback;
import site.hellooo.distributedlock.LockContext;
import site.hellooo.distributedlock.enums.Coordinator;
import site.hellooo.distributedlock.impl.redis.RedisLockCallback;

public class LockCallbackFactory {
public static LockCallback of(Coordinator coordinator, LockContext lockContext) {
Expand All @@ -11,6 +12,6 @@ public static LockCallback of(Coordinator coordinator, LockContext lockContext)
return new RedisLockCallback(lockContext);
}

throw new UnsupportedOperationException("Fatal: coordinator with type '" + coordinator.getName() + "' is not implemented yet!");
throw new UnsupportedOperationException("Fatal: LockCallback with coordinator type [" + coordinator.getName() + "] is not implemented yet!!!");
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
package site.hellooo.distributedlock.impl;

import site.hellooo.distributedlock.LockCallback;
import site.hellooo.distributedlock.LockContext;
import site.hellooo.distributedlock.LockHandler;
import site.hellooo.distributedlock.LockState;
import site.hellooo.distributedlock.*;
import site.hellooo.distributedlock.config.LockOptions;
import site.hellooo.distributedlock.enums.Coordinator;
import site.hellooo.distributedlock.enums.LockType;
import site.hellooo.distributedlock.exception.GenericRuntimeLockException;
import site.hellooo.distributedlock.exception.LockStateNotRemovedException;
import site.hellooo.distributedlock.exception.LockStateNotSetException;
import site.hellooo.distributedlock.impl.redis.LockCallbackFactory;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -34,8 +30,8 @@ public ReentrantDistributedLock(LockOptions lockOptions, String lockTarget, Lock
this.lockHandler = lockHandler;

lockContext = new LockContext() {
private AtomicReference<Thread> holdingThread = new AtomicReference<>();
private AtomicReference<LockState<?>> holdingLockState = new AtomicReference<>();
private final AtomicReference<Thread> holdingThread = new AtomicReference<>();
private final AtomicReference<LockState<?>> holdingLockState = new AtomicReference<>();

@Override
public String lockTarget() {
Expand Down Expand Up @@ -64,20 +60,22 @@ public LockHandler lockHandler() {

@Override
public LockCallback lockCallback() {
// if (lockCallback == null) {
// lockCallback = LockCallbackFactory.of(lockOptions.getCoordinator(), this);
// }
return lockCallback;
}

@Override
public DistributedLock currentLock() {
return ReentrantDistributedLock.this;
}
};
lockCallback = LockCallbackFactory.of(lockOptions.getCoordinator(), lockContext);
}

private Node addWaiter() {

Node currentNode = new Node(Thread.currentThread());
final Node currentNode = new Node(Thread.currentThread());

Node prev = tail.get();
final Node prev = tail.get();
if (prev != null) {
if (tail.compareAndSet(prev, currentNode)) {
prev.next.set(currentNode);
Expand All @@ -90,11 +88,11 @@ private Node addWaiter() {
return currentNode;
}

// enqueue node and return the prev node
// enqueue node and return the prev node
private Node enqueue(final Node node) {

while (true) {
Node prev = tail.get();
final Node prev = tail.get();
if (prev == null) {
Node newHead = new Node(null);
newHead.next.set(node);
Expand All @@ -117,7 +115,7 @@ private void acquireQueued(final Node node) {

while (true) {
final Node prev = node.prev.get();
// only if prev node is head, ant then we try to get lock
// only if prev node is head, and then we try to get the lock
if (prev == head.get() && tryLock()) {
// if tryLock success, it means that the prev node has release the lock,
// so we need to remove it from the queue
Expand All @@ -133,7 +131,7 @@ private void acquireQueued(final Node node) {
}
}

private void unparkQueueHead() {
public void unparkQueueHead() {
Node headNode = head.get();
if (headNode != null && headNode.next.get() != null) {
Thread thread = headNode.next.get().thread;
Expand Down Expand Up @@ -191,20 +189,25 @@ public void unlock() {

AtomicReference<Thread> holdingThreadReference = lockContext.holdingThread();
Thread holdingThread = holdingThreadReference.get();
if (holdingThread != null && Thread.currentThread() != holdingThread) {
throw new GenericRuntimeLockException("Fatal: different thread between lock and unlock, PLEASE CHECK!!!");
if (holdingThread == null || holdingThread != Thread.currentThread()) {
String causeMessage = holdingThread == null ?
"Fatal: holdingThread is null, when does lock released? PLEASE CHECK!!!"
:
"Fatal: different thread between lock and unlock, PLEASE CHECK!!!";
throw new GenericRuntimeLockException(causeMessage);
}

if (this.holdingCount.decrementAndGet() > 0) {
return;
}

AtomicReference<LockState<?>> lockStateReference = lockContext.holdingLockState();
if (lockStateReference.get() == null) {
throw new GenericRuntimeLockException("Fatal: context not holding a lockState, PLEASE CHECK!!!");
AtomicReference<LockState<?>> holdingLockStateReference = lockContext.holdingLockState();
LockState<?> holdingLockState = holdingLockStateReference.get();
if (holdingLockState == null) {
throw new GenericRuntimeLockException("Fatal: holdingLockState is null, when does it removed? PLEASE CHECK!!!");
}
try {
lockHandler.removeState(lockStateReference.get(), lockContext);
lockHandler.removeState(holdingLockState, lockContext);
lockCallback.afterUnlocked(lockContext);
} catch (LockStateNotRemovedException ignored) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

import site.hellooo.distributedlock.LockContext;

public abstract class AbstractRemotingThread extends Thread {
// this class is only design for redis business processing
abstract class AbstractRemotingThread extends Thread {

final Object synchronizer = new Object();
private final Object synchronizer = new Object();
private boolean shutdown = false;

protected final LockContext lockContext;

public AbstractRemotingThread(LockContext lockContext) {
protected AbstractRemotingThread(LockContext lockContext) {
setDaemon(true);
this.lockContext = lockContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,16 @@

public class RedisLockCallback implements LockCallback {

private AtomicReference<Thread> leaseThreadReference = new AtomicReference<>();
private AtomicReference<Thread> retryLockThreadReference = new AtomicReference<>();
/**
* this thread is responsible for lease the redis key expire time
*/
private final AtomicReference<Thread> leaseThreadReference = new AtomicReference<>();

/**
* this thread is responsible for communicating with the coordinator to
* tryLock endlessly when no thread of current process hold the lock
*/
private final AtomicReference<Thread> retryLockThreadReference = new AtomicReference<>();

private LockContext lockContext;

Expand All @@ -25,10 +33,10 @@ private void startRetryLockThread() {
while (retryLockThread == null || retryLockThread.getState() == Thread.State.TERMINATED) {
if (retryLockThreadReference.compareAndSet(retryLockThread, new RemotingRetryLockThread(lockContext))) {
retryLockThread = retryLockThreadReference.get();
// one process have only one running retryLockThread
retryLockThread.start();
}
}

retryLockThread.start();
}

private void shutdownRetryLockThread() {
Expand Down Expand Up @@ -63,18 +71,13 @@ public void afterLocked(LockContext lockContext) {
}

@Override
public void afterQueued(LockContext lockContext) {
public void beforeParking(LockContext lockContext) {
// if current process not holding the lock, then we should start the retry lock thread
if (lockContext.holdingThread().get() == null) {
startRetryLockThread();
}
}

@Override
public void beforeParking(LockContext lockContext) {
startRetryLockThread();
}

@Override
public void afterUnlocked(LockContext lockContext) {
shutdownLeaseThread();
Expand Down
Loading

0 comments on commit 26004f6

Please sign in to comment.