Skip to content
This repository has been archived by the owner on Jun 7, 2021. It is now read-only.

Commit

Permalink
TRAFODION-2940 In HA env, one node lose network, when recover, trafci…
Browse files Browse the repository at this point in the history
… can not use
  • Loading branch information
aven committed Mar 13, 2018
1 parent 360427c commit c43bb2e
Show file tree
Hide file tree
Showing 10 changed files with 300 additions and 58 deletions.
42 changes: 41 additions & 1 deletion dcs/bin/scripts/dcsunbind.sh
Expand Up @@ -51,6 +51,31 @@ function check_node {
done
}

# Unbind the floating (virtual) external IP from this node, if it is bound here.
#
# Walks every interface reported by `ip link show`, looks for the address held
# in $gv_float_external_ip, and removes it when the matching line also carries
# the "$gv_float_interface:$gv_port" label.
#
# Globals read: gv_float_external_ip, gv_float_interface, gv_port, HOSTNAME.
# Exits the script with -1 when `ip addr del` fails; otherwise returns normally.
function check_self_node {
    # An empty pattern would match every line once quoted, so bail out early
    # instead of risking the removal of an unrelated address.
    if [ -z "$gv_float_external_ip" ]; then
        return 0
    fi

    tempinterface="$gv_float_interface:$gv_port"

    for myinterface in $(/sbin/ip link show | awk -F': ' '/^[0-9]+:.*/ {print $2;}'); do
        ip_output=$(/sbin/ip addr show "$myinterface")

        # -F matches the address literally: as a regex, the dots in an IPv4
        # address match any character (10.1.1.5 would also match 10.111.5).
        myifport=$(echo "$ip_output" | grep -wF -- "$gv_float_external_ip")
        status=$?
        if [ $status -eq 0 ]; then
            # check if another interface is bound to this virtual ip address
            if echo "$myifport" | grep -F -- "$tempinterface" > /dev/null; then
                # Second field of the matching line is passed to `ip addr del`;
                # the last field is printed as the interface label.
                unbindip=$(echo "$myifport" | awk '{print $2}')
                unbindlb=$(echo "$myifport" | awk '{print $NF}')
                echo "Virtual ip $gv_float_external_ip is in use on node $HOSTNAME bound to interface $myinterface($unbindlb) - unbinding..."
                sudo /sbin/ip addr del "$unbindip" dev "$myinterface"
                status=$?
                if [ $status -ne 0 ]; then
                    echo "Failed to unbind - status is $status"
                    exit -1
                fi
            fi # endif node+name match
        fi # endif looking for external ip
    done
}

function Check_VirtualIP_InUse_And_Unbind {
echo "check all nodes to see if external virtual ip address is in use and unbind if necessary"
mynode=""
Expand All @@ -64,12 +89,23 @@ function Check_VirtualIP_InUse_And_Unbind {
fi
}

# Entry point for the "self" mode: only the local node is inspected, by
# delegating to check_self_node.
Check_VirtualIP_InUse_And_Unbind_Self() {
    check_self_node
}

#Main program

if [[ $ENABLE_HA == "false" ]]; then
exit 0
fi

# Scan the command-line arguments: the literal argument "self" requests that
# only the local node be checked and unbound.
unbindSelf=false
for i in "$@"; do
    # The spaces around == are required: without them [[ ]] sees one non-empty
    # word and the test is true for every argument.
    if [[ "$i" == "self" ]]; then
        unbindSelf=true
    fi
done

gv_float_internal_ip=`python $DCS_INSTALL_DIR/bin/scripts/parse_dcs_site.py|cut -d$'\n' -f2`
gv_float_external_ip=`python $DCS_INSTALL_DIR/bin/scripts/parse_dcs_site.py|cut -d$'\n' -f2`
gv_float_interface=`python $DCS_INSTALL_DIR/bin/scripts/parse_dcs_site.py|cut -d$'\n' -f1`
Expand All @@ -94,6 +130,10 @@ if [[ $AWS_CLOUD == "true" ]]; then
echo "Detached interface :" $NETWORKINTERFACE
fi
else
Check_VirtualIP_InUse_And_Unbind
# Compare against the literal "true": a bare [ $unbindSelf ] only tests for a
# non-empty string, so it would also succeed when the value is "false".
if [[ "$unbindSelf" == "true" ]]; then
    Check_VirtualIP_InUse_And_Unbind_Self
else
    Check_VirtualIP_InUse_And_Unbind
fi
fi
exit 0
6 changes: 6 additions & 0 deletions dcs/src/main/java/org/trafodion/dcs/Constants.java
Expand Up @@ -559,6 +559,12 @@ public final class Constants {
/** Default value for DcsMaster floating IP command */
public static final String DEFAULT_DCS_MASTER_FLOATING_IP_COMMAND = "cd ${dcs.home.dir};bin/scripts/dcsbind.sh -i -a -p";

/**
 * Configuration key for the shell command that unbinds the DcsMaster floating IP.
 * FloatingIp.unbindScript() looks this key up and falls back to
 * DEFAULT_DCS_MASTER_FLOATING_IP_COMMAND_UNBIND when it is not set.
 */
public static final String DCS_MASTER_FLOATING_IP_COMMAND_UNBIND = "dcs.master.floating.ip.command.unbind";

/**
 * Default unbind command: changes to ${dcs.home.dir} and runs
 * bin/scripts/dcsunbind.sh with the "self" argument.
 */
public static final String DEFAULT_DCS_MASTER_FLOATING_IP_COMMAND_UNBIND = "cd ${dcs.home.dir};bin/scripts/dcsunbind.sh self";

/** DcsMaster Floating IP external interface */
public static final String DCS_MASTER_FLOATING_IP_EXTERNAL_INTERFACE = "dcs.master.floating.ip.external.interface";

Expand Down
151 changes: 114 additions & 37 deletions dcs/src/main/java/org/trafodion/dcs/master/DcsMaster.java
Expand Up @@ -23,21 +23,16 @@ Licensed to the Apache Software Foundation (ASF) under one
package org.trafodion.dcs.master;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.NetworkInterface;
import java.nio.charset.Charset;
import java.util.Enumeration;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.commons.io.IOUtils;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
Expand All @@ -46,23 +41,22 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper.States;

import org.apache.hadoop.util.StringUtils;

import org.trafodion.dcs.Constants;
import org.trafodion.dcs.master.listener.ListenerService;
import org.trafodion.dcs.master.listener.ListenerWorker;
import org.trafodion.dcs.util.DcsConfiguration;
import org.trafodion.dcs.util.DcsNetworkConfiguration;
import org.trafodion.dcs.util.InfoServer;
import org.trafodion.dcs.util.RetryCounter;
import org.trafodion.dcs.util.RetryCounterFactory;
import org.trafodion.dcs.util.VersionInfo;
import org.trafodion.dcs.zookeeper.ZkClient;
import org.trafodion.dcs.zookeeper.ZKConfig;
import org.trafodion.dcs.master.listener.ListenerService;
import org.trafodion.dcs.zookeeper.ZkClient;

public class DcsMaster implements Runnable {
public class DcsMaster implements Callable<Integer> {
private static final Log LOG = LogFactory.getLog(DcsMaster.class);
private Thread thrd;
private ZkClient zkc = null;
Expand Down Expand Up @@ -111,11 +105,50 @@ public DcsMaster(String[] args) {
trafodionHome = System.getProperty(Constants.DCS_TRAFODION_HOME);
jvmShutdownHook = new JVMShutdownHook();
Runtime.getRuntime().addShutdownHook(jvmShutdownHook);
thrd = new Thread(this);
thrd.start();

// 35000 * 15mins ~= 1 years
RetryCounter retryCounter = RetryCounterFactory.create(35000, 15, TimeUnit.MINUTES);
ExecutorService executorService = Executors.newFixedThreadPool(1);
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executorService);

// Supervise call(): run it on the single worker thread, wait for its status,
// then either exit the JVM (status <= 0) or re-submit it (status 1).
while (true) {
    completionService.submit(this);
    try {
        Future<Integer> f = completionService.take();
        if (f != null) {
            Integer status = f.get();
            if (status <= 0) {
                System.exit(status);
            } else if (status == 1) {
                if (retryCounter.shouldRetry()) {
                    retryCounter.sleepUntilNextRetry();
                    retryCounter.useRetry();
                } else {
                    System.exit(-2);
                }
                // reset lock
                isLeader = new CountDownLatch(1);
                // No break here: leaving the loop would skip the re-submit
                // above, so the master would never be restarted after a
                // recoverable error.
            } else {
                //TODO for other unknown status
            }
        }
    } catch (InterruptedException | ExecutionException e) {
        LOG.error(e.getMessage(), e);
    }
}

}

public void run() {
// Return value contract:
// A value less than 0 means an unrecoverable error; the process exits.
// -1 configuration error
// -2 retries exhausted
// A value greater than 0 means the error is recoverable.
// 1 means network error; retry until the network recovers.
// A value of 0 means an unknown exception; exit immediately.
// Replace 0 with a more specific value once the real cause of the exception is confirmed.
public Integer call() {
VersionInfo.logVersion();

Options opt = new Options();
Expand All @@ -129,19 +162,19 @@ public void run() {
instance = "1";
} catch (NullPointerException e) {
LOG.error("No args found: ", e);
System.exit(1);
return -1;
} catch (ParseException e) {
LOG.error("Could not parse: ", e);
System.exit(1);
return -1;
}

try {
zkc = new ZkClient();
zkc.connect();
LOG.info("Connected to ZooKeeper");
} catch (Exception e) {
LOG.error(e);
System.exit(1);
} catch (IOException | InterruptedException e) {
LOG.error(e.getMessage(), e);
return 1;
}

try {
Expand Down Expand Up @@ -202,9 +235,10 @@ public void run() {
}
} catch (KeeperException.NodeExistsException e) {
// do nothing...some other server has created znodes
LOG.warn(e.getMessage(), e);
} catch (Exception e) {
LOG.error(e);
System.exit(0);
LOG.error(e.getMessage(), e);
return 0;
}

metrics = new Metrics();
Expand All @@ -213,10 +247,10 @@ public void run() {
try {
netConf = new DcsNetworkConfiguration(conf);
serverName = netConf.getHostName();
if (serverName == null) {
if (serverName == null) {
LOG.error("DNS Interface [" + conf.get(Constants.DCS_DNS_INTERFACE, Constants.DEFAULT_DCS_DNS_INTERFACE)
+ "] configured in dcs.site.xml is not found!");
System.exit(1);
+ "] configured in dcs.site.xml is not found!");
return -1;
}

// Wait to become the leader of all DcsMasters
Expand All @@ -229,6 +263,11 @@ public void run() {
+ ":" + startTime;
zkc.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
// Register this znode path as the check path for the session-expired case:
// if the session expires, this mark is compared with the existing znode;
// a mismatch means a backup master has taken over the master role.
zkc.setCheckPath(path);

LOG.info("Created znode [" + path + "]");

int requestTimeout = conf.getInt(
Expand Down Expand Up @@ -262,12 +301,50 @@ public void run() {
future.get();// block

} catch (Exception e) {
LOG.error(e);
e.printStackTrace();
if (pool != null)
pool.shutdown();
System.exit(0);
LOG.error(e.getMessage(), e);
try {
FloatingIp floatingIp = FloatingIp.getInstance(this);
floatingIp.unbindScript();
} catch (Exception e1) {
if (LOG.isErrorEnabled()) {
LOG.error("Error creating class FloatingIp [" + e1.getMessage() + "]", e1);
}
}

if (pool != null) {
try {
pool.shutdown();
LOG.info("Interrupt listenerService.");
} catch (Exception e2) {
LOG.error("Error while shutdown ServerManager thread [" + e2.getMessage() + "]", e2);
}
}

if (ls != null) {
try {
ListenerWorker lw = ls.getWorker();
if (lw != null) {
lw.interrupt();
LOG.info("Interrupt listenerWorker.");
}
ls.interrupt();
LOG.info("Interrupt listenerService.");
} catch (Exception e2) {
LOG.error("Error while shutdown ListenerService thread [" + e2.getMessage() + "]", e2);
}
}
if (infoServer != null) {
try {
infoServer.stop();
LOG.info("Stop infoServer.");
} catch (Exception e2) {
LOG.error("Error while shutdown InfoServer thread [" + e2.getMessage(), e2);
}
}
return 1;

}
return 0;
}

public String getServerName() {
Expand Down
37 changes: 37 additions & 0 deletions dcs/src/main/java/org/trafodion/dcs/master/FloatingIp.java
Expand Up @@ -61,6 +61,43 @@ public boolean isEnabled() {
return isEnabled;
}

/**
 * Runs the configured floating-IP unbind command and reports its outcome.
 *
 * When floating IP is disabled nothing is executed and 0 is returned.
 * Otherwise the command is read from the master configuration under
 * Constants.DCS_MASTER_FLOATING_IP_COMMAND_UNBIND (falling back to
 * Constants.DEFAULT_DCS_MASTER_FLOATING_IP_COMMAND_UNBIND), run through
 * ScriptManager, and its exit code, stdout and stderr are logged.
 *
 * @return 0 when floating IP is disabled, otherwise the exit code of the script
 * @throws Exception declared by this method; propagates whatever the calls it makes throw
 */
public synchronized int unbindScript() throws Exception {
    if (!isEnabled) {
        LOG.info("Floating IP is disabled");
        return 0;
    }
    LOG.info("Floating IP is enabled");

    ScriptContext scriptContext = new ScriptContext();
    scriptContext.setScriptName(Constants.SYS_SHELL_SCRIPT_NAME);
    scriptContext.setStripStdOut(false);
    scriptContext.setStripStdErr(false);
    scriptContext.setCommand(master.getConfiguration().get(
            Constants.DCS_MASTER_FLOATING_IP_COMMAND_UNBIND,
            Constants.DEFAULT_DCS_MASTER_FLOATING_IP_COMMAND_UNBIND));

    LOG.info("Unbind Floating IP [" + scriptContext.getCommand() + "]");
    ScriptManager.getInstance().runScript(scriptContext);// Blocking call

    // Snapshot the results once the blocking run has finished.
    int exitCode = scriptContext.getExitCode();
    String stdOut = scriptContext.getStdOut().toString();
    String stdErr = scriptContext.getStdErr().toString();

    StringBuilder summary = new StringBuilder();
    summary.append("exit code [").append(exitCode).append("]");
    if (!stdOut.isEmpty()) {
        summary.append(", stdout [").append(stdOut).append("]");
    }
    if (!stdErr.isEmpty()) {
        summary.append(", stderr [").append(stdErr).append("]");
    }
    if (LOG.isErrorEnabled()) {
        LOG.error(summary.toString());
    }

    if (exitCode == 0) {
        LOG.info("Unbind Floating IP successful, exit code [" + 0 + "]");
    } else {
        LOG.error("Unbind Floating IP failed, exit code [" + exitCode + "]");
    }

    return exitCode;
}

public synchronized int runScript() throws Exception {
if (isEnabled)
LOG.info("Floating IP is enabled");
Expand Down
7 changes: 5 additions & 2 deletions dcs/src/main/java/org/trafodion/dcs/master/ServerManager.java
Expand Up @@ -328,16 +328,19 @@ public Boolean call() throws Exception {
}
}

if (!zkc.isSessionRecoverSuccessful()) {
throw new Exception("error while recover zkclient session. lose zookeeper connection. restart DCS Master.");
}

try {
Thread.sleep(timeoutMillis);
} catch (InterruptedException e) {
}
}

} catch (Exception e) {
e.printStackTrace();
if (LOG.isErrorEnabled())
LOG.error(e);
LOG.error(e.getMessage(), e);
pool.shutdown();
throw e;
}
Expand Down

0 comments on commit c43bb2e

Please sign in to comment.