Skip to content

Commit

Permalink
add instance node
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Mar 29, 2017
1 parent 1e28e1d commit 9a5bc72
Show file tree
Hide file tree
Showing 25 changed files with 356 additions and 449 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@
public final class JobShardingUnit {

/**
* 作业服务器IP地址.
* 作业实例主键.
*/
private final String serverIp;
private final String jobInstanceId;

/**
* 作业实例主键.
* 获取作业服务器IP地址.
*
* @return 作业服务器IP地址
*/
private final String jobInstanceId;
public String getIp() {
return jobInstanceId.substring(0, jobInstanceId.indexOf("@-@"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import com.dangdang.ddframe.job.lite.internal.listener.AbstractJobListener;
import com.dangdang.ddframe.job.lite.internal.listener.AbstractListenerManager;
import com.dangdang.ddframe.job.lite.internal.server.InstanceNode;
import com.dangdang.ddframe.job.lite.internal.server.ServerNode;
import com.dangdang.ddframe.job.lite.internal.server.ServerOperationNode;
import com.dangdang.ddframe.job.lite.internal.server.ServerService;
import com.dangdang.ddframe.job.lite.internal.server.ServerStatus;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
Expand All @@ -44,17 +44,17 @@ public class ElectionListenerManager extends AbstractListenerManager {

private final ElectionNode electionNode;

private final ServerNode serverNode;
private final InstanceNode instanceNode;

private final ServerOperationNode serverOperationNode;
private final ServerNode serverNode;

public ElectionListenerManager(final CoordinatorRegistryCenter regCenter, final String jobName) {
super(regCenter, jobName);
leaderElectionService = new LeaderElectionService(regCenter, jobName);
serverService = new ServerService(regCenter, jobName);
electionNode = new ElectionNode(jobName);
instanceNode = new InstanceNode(jobName);
serverNode = new ServerNode(jobName);
serverOperationNode = new ServerOperationNode(jobName);
}

@Override
Expand Down Expand Up @@ -94,19 +94,19 @@ private boolean isLeaderCrashed() {
}

private boolean isServerEnabled() {
return serverOperationNode.isLocalServerPath(path) && !ServerStatus.DISABLED.name().equals(new String(event.getData().getData()));
return serverNode.isLocalServerPath(path) && !ServerStatus.DISABLED.name().equals(new String(event.getData().getData()));
}

boolean isServerOff() {
return isServerDisabled() || isServerShutdown();
}

private boolean isServerDisabled() {
return serverOperationNode.isLocalServerPath(path) && ServerStatus.DISABLED.name().equals(new String(event.getData().getData()));
return serverNode.isLocalServerPath(path) && ServerStatus.DISABLED.name().equals(new String(event.getData().getData()));
}

private boolean isServerShutdown() {
return serverNode.isLocalInstancePath(path) && Type.NODE_REMOVED == event.getType();
return instanceNode.isLocalInstancePath(path) && Type.NODE_REMOVED == event.getType();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ public final class ExecutionNode {

public static final String IP = ROOT + "/%s/ip";

public static final String INSTANCE = ROOT + "/%s/instance";
static final String INSTANCE_APPENDIX = "instance";

public static final String INSTANCE = ROOT + "/%s/" + INSTANCE_APPENDIX;

static final String RUNNING_APPENDIX = "running";

Expand Down Expand Up @@ -64,6 +66,10 @@ public static String getInstanceNode(final int item) {
return String.format(INSTANCE, item);
}

public boolean isInstanceNode(final String path) {
return path.startsWith(jobNodePath.getFullPath(ROOT)) && path.endsWith(INSTANCE_APPENDIX);
}

/**
* 获取作业运行状态节点路径.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.job.lite.internal.server;

import com.dangdang.ddframe.job.lite.internal.schedule.JobRegistry;
import com.dangdang.ddframe.job.lite.internal.storage.JobNodePath;

/**
* Elastic Job运行实例节点名称的常量类.
*
* @author zhangliang
*/
public class InstanceNode {

/**
* 运行实例信息根节点.
*/
public static final String ROOT = "instances";

private static final String INSTANCES = ROOT + "/%s";

private final String jobName;

private final JobNodePath jobNodePath;

public InstanceNode(final String jobName) {
this.jobName = jobName;
jobNodePath = new JobNodePath(jobName);
}

/**
* 获取本地作业运行实例路径.
*
* @return 本地作业运行实例路径
*/
public String getLocalInstanceNode() {
return String.format(INSTANCES, JobRegistry.getInstance().getJobInstanceId(jobName));
}

static String getInstanceNode(final String jobInstanceId) {
return String.format(INSTANCES, jobInstanceId);
}

/**
* 判断给定路径是否为本地作业运行实例路径.
*
* @param path 待判断的路径
* @return 是否为本地作业运行实例路径
*/
public boolean isLocalInstancePath(final String path) {
return path.equals(jobNodePath.getFullPath(String.format(INSTANCES, JobRegistry.getInstance().getJobInstanceId(jobName))));
}

/**
* 判断给定路径是否为作业运行实例路径.
*
* @param path 待判断的路径
* @return 是否为作业运行实例路径
*/
public boolean isInstancePath(final String path) {
return path.startsWith(jobNodePath.getFullPath(InstanceNode.ROOT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.dangdang.ddframe.job.lite.internal.schedule.JobScheduleController;
import com.dangdang.ddframe.job.lite.internal.sharding.ShardingService;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.util.env.LocalHostService;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.state.ConnectionState;
Expand All @@ -38,21 +39,24 @@ public class JobOperationListenerManager extends AbstractListenerManager {

private final String jobName;

private final ServerNode serverNode;
private final InstanceNode instanceNode;

private final ServerService serverService;

private final ShardingService shardingService;

private final ExecutionService executionService;

private final LocalHostService localHostService;

public JobOperationListenerManager(final CoordinatorRegistryCenter regCenter, final String jobName) {
super(regCenter, jobName);
this.jobName = jobName;
serverNode = new ServerNode(jobName);
instanceNode = new InstanceNode(jobName);
serverService = new ServerService(regCenter, jobName);
shardingService = new ShardingService(regCenter, jobName);
executionService = new ExecutionService(regCenter, jobName);
localHostService = new LocalHostService();
}

@Override
Expand All @@ -70,7 +74,7 @@ public void stateChanged(final CuratorFramework client, final ConnectionState ne
if (ConnectionState.LOST == newState) {
jobScheduleController.pauseJob();
} else if (ConnectionState.RECONNECTED == newState) {
serverService.persistServerOnline(serverService.isLocalhostServerEnabled());
serverService.persistServerOnline(serverService.isServerEnabled(localHostService.getIp()));
executionService.clearRunningInfo(shardingService.getLocalHostShardingItems());
jobScheduleController.resumeJob();
}
Expand Down Expand Up @@ -100,7 +104,7 @@ class JobShutdownStatusJobListener extends AbstractJobListener {

@Override
protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) {
if (serverNode.isLocalInstancePath(path) && TreeCacheEvent.Type.NODE_REMOVED == event.getType() && null != JobRegistry.getInstance().getJobScheduleController(jobName)) {
if (instanceNode.isLocalInstancePath(path) && TreeCacheEvent.Type.NODE_REMOVED == event.getType() && null != JobRegistry.getInstance().getJobScheduleController(jobName)) {
JobRegistry.getInstance().getJobScheduleController(jobName).shutdown();
serverService.removeInstanceStatus();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,71 +17,59 @@

package com.dangdang.ddframe.job.lite.internal.server;

import com.dangdang.ddframe.job.lite.internal.schedule.JobRegistry;
import com.dangdang.ddframe.job.lite.internal.storage.JobNodePath;
import com.dangdang.ddframe.job.util.env.LocalHostService;

import java.util.regex.Pattern;

/**
* Elastic Job服务器节点名称的常量类.
* Elastic Job服务器节点常量类.
*
* @author zhangliang
*/
public class ServerNode {

/**
* 作业服务器信息根节点.
* 服务器信息根节点.
*/
public static final String ROOT = "servers";

/**
* 作业实例信息根节点.
*/
public static final String INSTANCES_ROOT = "instances";

static final String INSTANCES = ROOT + "/%s/" + INSTANCES_ROOT + "/%s";

private final String jobName;
private static final String SERVERS = ROOT + "/%s";

private final String ip;

private final JobNodePath jobNodePath;

public ServerNode(final String jobName) {
this.jobName = jobName;
ip = new LocalHostService().getIp();
jobNodePath = new JobNodePath(jobName);
}

/**
* 获取本地作业运行实例路径.
*
* @return 本地作业运行实例路径
*/
public String getLocalInstanceNode() {
return String.format(INSTANCES, ip, JobRegistry.getInstance().getJobInstanceId(jobName));
String getServerNode() {
return getServerNode(ip);
}

static String getInstanceNode(final String ip, final String jobInstanceId) {
return String.format(INSTANCES, ip, jobInstanceId);
String getServerNode(final String ip) {
return String.format(SERVERS, ip);
}

/**
* 判断给定路径是否为本地作业运行实例路径.
* 判断给定路径是否为作业服务器路径.
*
* @param path 待判断的路径
* @return 是否为本地作业运行实例路径
* @return 是否为作业服务器路径
*/
public boolean isLocalInstancePath(final String path) {
return path.equals(jobNodePath.getFullPath(String.format(INSTANCES, ip, JobRegistry.getInstance().getJobInstanceId(jobName))));
public boolean isLocalServerPath(final String path) {
return path.equals(jobNodePath.getFullPath(String.format(SERVERS, ip)));
}

/**
* 判断给定路径是否为作业运行实例路径.
* 判断给定路径是否为作业服务器路径.
*
* @param path 待判断的路径
* @return 是否为作业运行实例路径
* @return 是否为作业服务器路径
*/
public boolean isInstancePath(final String path) {
return path.startsWith(jobNodePath.getFullPath(ServerNode.ROOT)) && path.contains(INSTANCES_ROOT);
public boolean isServerPath(final String path) {
return Pattern.compile(jobNodePath.getFullPath(ServerNode.ROOT) + "/" + LocalHostService.IP_REGEX).matcher(path).matches();
}
}

This file was deleted.

Loading

0 comments on commit 9a5bc72

Please sign in to comment.