Skip to content

Commit

Permalink
Merge 3f3172e into e4d3d12
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhengshuaiPENG committed Jul 23, 2019
2 parents e4d3d12 + 3f3172e commit 12acefc
Show file tree
Hide file tree
Showing 10 changed files with 730 additions and 33 deletions.
6 changes: 6 additions & 0 deletions core-job/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,19 @@
import org.apache.kylin.common.lock.DistributedLockFactory;
import org.apache.kylin.common.util.ZKUtil;
import org.apache.kylin.job.lock.JobLock;
import org.apache.kylin.job.lock.zookeeper.exception.ZkAcquireLockException;
import org.apache.kylin.job.lock.zookeeper.exception.ZkPeekLockException;
import org.apache.kylin.job.lock.zookeeper.exception.ZkPeekLockInterruptException;
import org.apache.kylin.job.lock.zookeeper.exception.ZkReleaseLockException;
import org.apache.kylin.job.lock.zookeeper.exception.ZkReleaseLockInterruptException;
import org.apache.kylin.job.util.ThrowableUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;

/**
* A distributed lock based on zookeeper. Every instance is owned by a client, on whose behalf locks are acquired and/or released.
*
Expand Down Expand Up @@ -97,21 +105,41 @@ public boolean lock(String lockPath) {
curator = ZKUtil.getZookeeperClient(KylinConfig.getInstanceFromEnv());
}

lockInternal(lockPath);

String lockOwner;
try {
lockOwner = peekLock(lockPath);
if (client.equals(lockOwner)) {
logger.info("{} acquired lock at {}", client, lockPath);
return true;
} else {
logger.debug("{} failed to acquire lock at {}, which is held by {}", client, lockPath, lockOwner);
return false;
}
} catch (ZkPeekLockInterruptException zpie) {
logger.error("{} peek owner of lock interrupt while acquire lock at {}, check to release lock", client,
lockPath);
lockOwner = peekLock(lockPath);

try {
unlockInternal(lockOwner, lockPath);
} catch (Exception anyEx) {
// it's safe to swallow any exception here because here already been interrupted
logger.warn("Exception caught to release lock when lock operation has been interrupted.", anyEx);
}
throw zpie;
}
}

private void lockInternal(String lockPath) {
try {
curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(lockPath, clientBytes);
} catch (KeeperException.NodeExistsException ex) {
logger.debug("{} see {} is already locked", client, lockPath);
} catch (Exception ex) {
throw new IllegalStateException("Error while " + client + " trying to lock " + lockPath, ex);
}

String lockOwner = peekLock(lockPath);
if (client.equals(lockOwner)) {
logger.info("{} acquired lock at {}", client, lockPath);
return true;
} else {
logger.debug("{} failed to acquire lock at {}, which is held by {}", client, lockPath, lockOwner);
return false;
// don't need to catch interrupt exception when locking, it's safe to throw the exception directly
throw new ZkAcquireLockException("Error occurs while " + client + " trying to lock " + lockPath, ex);
}
}

Expand All @@ -136,7 +164,8 @@ public boolean lock(String lockPath, long timeout) {
}

if (lock(lockPath)) {
logger.debug("{} waited {} ms for lock path {}", client, (System.currentTimeMillis() - waitStart), lockPath);
logger.debug("{} waited {} ms for lock path {}", client, (System.currentTimeMillis() - waitStart),
lockPath);
return true;
}
}
Expand All @@ -147,13 +176,23 @@ public boolean lock(String lockPath, long timeout) {

/**
 * Looks up the current owner of the lock at {@code lockPath} by delegating to
 * {@code peekLockInternal} and translating any failure into an unchecked exception.
 *
 * @param lockPath path of the lock to inspect
 * @return the value returned by {@code peekLockInternal} for this path
 * @throws ZkPeekLockInterruptException if {@code ThrowableUtils.isInterruptedException} classifies
 *         the failure as an interruption
 * @throws ZkPeekLockException for every other failure
 */
@Override
public String peekLock(String lockPath) {
    try {
        return peekLockInternal(lockPath);
    } catch (Exception ex) {
        if (ThrowableUtils.isInterruptedException(ex)) {
            // keep a space before the path so the message does not read "...interrupted at/some/path"
            throw new ZkPeekLockInterruptException("Peeking owner of lock was interrupted at " + lockPath, ex);
        }
        throw new ZkPeekLockException("Error while peeking at " + lockPath, ex);
    }
}

/**
 * Reads the data stored at {@code lockPath} through the curator client and decodes it as UTF-8.
 *
 * NOTE(review): this listing appears to come from a rendered diff, so the generic catch below may be
 * a removed line displayed next to added ones - confirm against the actual file.
 *
 * @param lockPath path whose data is read
 * @return the decoded data, or {@code null} when no node exists at the path
 * @throws Exception declared by the signature
 */
private String peekLockInternal(String lockPath) throws Exception {
try {
byte[] bytes = curator.getData().forPath(lockPath);
return new String(bytes, StandardCharsets.UTF_8);
} catch (KeeperException.NoNodeException ex) {
// a missing node is reported as null rather than as an error
return null;
} catch (Exception ex) {
throw new IllegalStateException("Error while peeking at " + lockPath, ex);
}
}

Expand All @@ -171,31 +210,83 @@ public boolean isLockedByMe(String lockPath) {
public void unlock(String lockPath) {
logger.debug("{} trying to unlock {}", client, lockPath);

String owner = peekLock(lockPath);
if (owner == null)
throw new IllegalStateException(client + " cannot unlock path " + lockPath + " which is not locked currently");
if (client.equals(owner) == false)
throw new IllegalStateException(client + " cannot unlock path " + lockPath + " which is locked by " + owner);

// peek owner first
String owner;
ZkPeekLockInterruptException peekLockInterruptException = null;
try {
curator.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath);

logger.info("{} released lock at {}", client, lockPath);
owner = peekLock(lockPath);
} catch (ZkPeekLockInterruptException zie) {
// re-peek owner of lock when interrupted
owner = peekLock(lockPath);
peekLockInterruptException = zie;
} catch (ZkPeekLockException ze) {
// this exception should be thrown to diagnose even it may cause unlock failed
logger.error("{} failed to peekLock when unlock at {}", client, lockPath, ze);
throw ze;
}

// then unlock
ZkReleaseLockInterruptException unlockInterruptException = null;
try {
unlockInternal(owner, lockPath);
} catch (ZkReleaseLockInterruptException zlie) {
// re-unlock once when interrupted
unlockInternal(owner, lockPath);
unlockInterruptException = zlie;
} catch (Exception ex) {
throw new IllegalStateException("Error while " + client + " trying to unlock " + lockPath, ex);
throw new ZkReleaseLockException("Error while " + client + " trying to unlock " + lockPath, ex);
}

// need re-throw interrupt exception to avoid swallowing it
if (peekLockInterruptException != null) {
throw peekLockInterruptException;
}
if (unlockInterruptException != null) {
throw unlockInterruptException;
}
}

/**
 * Removes the lock node at {@code lockPath}, but only when {@code owner} is this client.
 *
 * @param owner    the owner previously read for the lock; {@code null} means no owner was found
 * @param lockPath path of the lock to release
 * @throws IllegalStateException if {@code owner} is {@code null} or is not this client
 */
private void unlockInternal(String owner, String lockPath) {
    // work out why the release must be refused, if at all
    final String refusal;
    if (owner == null) {
        refusal = " which is not locked currently";
    } else if (!client.equals(owner)) {
        refusal = " which is locked by " + owner;
    } else {
        refusal = null;
    }

    if (refusal != null) {
        throw new IllegalStateException(client + " cannot unlock path " + lockPath + refusal);
    }

    // the lock is ours: delete the node, then record the release
    purgeLockInternal(lockPath);
    logger.info("{} released lock at {}", client, lockPath);
}

@Override
public void purgeLocks(String lockPathRoot) {
try {
curator.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPathRoot);

purgeLockInternal(lockPathRoot);
logger.info("{} purged all locks under {}", client, lockPathRoot);
} catch (ZkReleaseLockException zpe) {
throw zpe;
} catch (ZkReleaseLockInterruptException zpie) {
// re-purge lock once when interrupted
purgeLockInternal(lockPathRoot);
throw zpie;
}
}

} catch (Exception ex) {
throw new IllegalStateException("Error while " + client + " trying to purge " + lockPathRoot, ex);
/**
 * Deletes the node at {@code lockPath}, including its children, through the curator client.
 * A missing node is logged as a warning and otherwise ignored.
 *
 * @param lockPath path of the node to delete
 * @throws ZkReleaseLockInterruptException if {@code ThrowableUtils.isInterruptedException} classifies
 *         the failure as an interruption
 * @throws ZkReleaseLockException for every other failure except a missing node
 */
@VisibleForTesting
void purgeLockInternal(String lockPath) {
    try {
        curator.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath);
    } catch (KeeperException.NoNodeException missingNode) {
        // nothing to delete, so there is nothing left to release
        logger.warn("No node found when purge lock in Lock path: {}", lockPath, missingNode);
    } catch (Exception failure) {
        final boolean interrupted = ThrowableUtils.isInterruptedException(failure);
        if (interrupted) {
            throw new ZkReleaseLockInterruptException("Purge lock was interrupted at " + lockPath, failure);
        }
        throw new ZkReleaseLockException("Error while " + client + " trying to purge " + lockPath, failure);
    }
}

Expand All @@ -209,10 +300,12 @@ public Closeable watchLocks(String lockPathRoot, Executor executor, final Watche
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
watcher.onLock(event.getData().getPath(), new String(event.getData().getData(), StandardCharsets.UTF_8));
watcher.onLock(event.getData().getPath(),
new String(event.getData().getData(), StandardCharsets.UTF_8));
break;
case CHILD_REMOVED:
watcher.onUnlock(event.getData().getPath(), new String(event.getData().getData(), StandardCharsets.UTF_8));
watcher.onUnlock(event.getData().getPath(),
new String(event.getData().getData(), StandardCharsets.UTF_8));
break;
default:
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kylin.job.lock.zookeeper.exception;

/**
 * Unchecked exception signalling that acquiring a ZooKeeper based lock failed.
 *
 * <p>The constructors mirror those of {@link RuntimeException}.
 */
public class ZkAcquireLockException extends RuntimeException {

    /** Explicit serial version so the serialized form does not depend on compiler-generated details. */
    private static final long serialVersionUID = 1L;

    /** Creates an exception without a detail message or cause. */
    public ZkAcquireLockException() {
    }

    /**
     * @param message detail message describing the failure
     */
    public ZkAcquireLockException(String message) {
        super(message);
    }

    /**
     * @param message detail message describing the failure
     * @param cause   underlying failure, kept for diagnosis
     */
    public ZkAcquireLockException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * @param cause underlying failure, kept for diagnosis
     */
    public ZkAcquireLockException(Throwable cause) {
        super(cause);
    }

    /**
     * @param message            detail message describing the failure
     * @param cause              underlying failure, kept for diagnosis
     * @param enableSuppression  whether suppression is enabled
     * @param writableStackTrace whether the stack trace should be writable
     */
    public ZkAcquireLockException(String message, Throwable cause, boolean enableSuppression,
            boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.job.lock.zookeeper.exception;

/**
 * Unchecked exception signalling that reading the owner of a ZooKeeper based lock failed.
 *
 * <p>The constructors mirror those of {@link RuntimeException}.
 */
public class ZkPeekLockException extends RuntimeException {

    /** Explicit serial version so the serialized form does not depend on compiler-generated details. */
    private static final long serialVersionUID = 1L;

    /** Creates an exception without a detail message or cause. */
    public ZkPeekLockException() {
    }

    /**
     * @param message detail message describing the failure
     */
    public ZkPeekLockException(String message) {
        super(message);
    }

    /**
     * @param message detail message describing the failure
     * @param cause   underlying failure, kept for diagnosis
     */
    public ZkPeekLockException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * @param cause underlying failure, kept for diagnosis
     */
    public ZkPeekLockException(Throwable cause) {
        super(cause);
    }

    /**
     * @param message            detail message describing the failure
     * @param cause              underlying failure, kept for diagnosis
     * @param enableSuppression  whether suppression is enabled
     * @param writableStackTrace whether the stack trace should be writable
     */
    public ZkPeekLockException(String message, Throwable cause, boolean enableSuppression,
            boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kylin.job.lock.zookeeper.exception;

/**
 * Unchecked exception signalling that reading the owner of a ZooKeeper based lock was interrupted.
 *
 * <p>The constructors mirror those of {@link RuntimeException}.
 */
public class ZkPeekLockInterruptException extends RuntimeException {

    /** Explicit serial version so the serialized form does not depend on compiler-generated details. */
    private static final long serialVersionUID = 1L;

    /** Creates an exception without a detail message or cause. */
    public ZkPeekLockInterruptException() {
    }

    /**
     * @param message detail message describing the interruption
     */
    public ZkPeekLockInterruptException(String message) {
        super(message);
    }

    /**
     * @param message detail message describing the interruption
     * @param cause   underlying failure, kept for diagnosis
     */
    public ZkPeekLockInterruptException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * @param cause underlying failure, kept for diagnosis
     */
    public ZkPeekLockInterruptException(Throwable cause) {
        super(cause);
    }

    /**
     * @param message            detail message describing the interruption
     * @param cause              underlying failure, kept for diagnosis
     * @param enableSuppression  whether suppression is enabled
     * @param writableStackTrace whether the stack trace should be writable
     */
    public ZkPeekLockInterruptException(String message, Throwable cause, boolean enableSuppression,
            boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.job.lock.zookeeper.exception;

/**
 * Unchecked exception signalling that releasing or purging a ZooKeeper based lock failed.
 *
 * <p>The constructors mirror those of {@link RuntimeException}.
 */
public class ZkReleaseLockException extends RuntimeException {

    /** Explicit serial version so the serialized form does not depend on compiler-generated details. */
    private static final long serialVersionUID = 1L;

    /** Creates an exception without a detail message or cause. */
    public ZkReleaseLockException() {
    }

    /**
     * @param message detail message describing the failure
     */
    public ZkReleaseLockException(String message) {
        super(message);
    }

    /**
     * @param message detail message describing the failure
     * @param cause   underlying failure, kept for diagnosis
     */
    public ZkReleaseLockException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * @param cause underlying failure, kept for diagnosis
     */
    public ZkReleaseLockException(Throwable cause) {
        super(cause);
    }

    /**
     * @param message            detail message describing the failure
     * @param cause              underlying failure, kept for diagnosis
     * @param enableSuppression  whether suppression is enabled
     * @param writableStackTrace whether the stack trace should be writable
     */
    public ZkReleaseLockException(String message, Throwable cause, boolean enableSuppression,
            boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
    }
}

0 comments on commit 12acefc

Please sign in to comment.