Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@ private void initBuiltinFunctions(Collection<FunctionDesc> functions)
public void start() {
String serverAddr = conf.getVar(ConfVars.CATALOG_ADDRESS);
InetSocketAddress initIsa = NetUtils.createSocketAddr(serverAddr);

if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
initIsa = NetUtils.createLocalSocketAddr(initIsa);
}

int workerNum = conf.getIntVar(ConfVars.CATALOG_RPC_SERVER_WORKER_THREAD_NUM);
try {
this.rpcServer = new BlockingRpcServer(CatalogProtocol.class, handler, initIsa, workerNum);
Expand All @@ -186,6 +191,7 @@ public void start() {
throw new CatalogException(e);
}


LOG.info("Catalog Server startup (" + bindAddressStr + ")");
super.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,17 @@ public static TajoClient getTajoClient(TajoConf conf, TajoClient client,
TajoCliContext context) throws IOException, ServiceException {

if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
if (!HAServiceUtil.isMasterAlive(conf.getVar(
TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS), conf)) {
TajoClient tajoClient = null;
String baseDatabase = client.getBaseDatabase();
conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS,
HAServiceUtil.getMasterClientName(conf));
client.close();
tajoClient = new TajoClientImpl(conf, baseDatabase);
TajoClient tajoClient = null;
String baseDatabase = client.getBaseDatabase();
conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS,
HAServiceUtil.getMasterClientName(conf));
client.close();
tajoClient = new TajoClientImpl(conf, baseDatabase);

if (context != null && context.getCurrentDatabase() != null) {
tajoClient.selectDatabase(context.getCurrentDatabase());
}
return tajoClient;
} else {
return client;
if (context != null && context.getCurrentDatabase() != null) {
tajoClient.selectDatabase(context.getCurrentDatabase());
}
return tajoClient;
} else {
return client;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@
import java.util.List;

public class HAServiceUtil {

private final static int MASTER_UMBILICAL_RPC_ADDRESS = 1;
private final static int MASTER_CLIENT_RPC_ADDRESS = 2;
private final static int RESOURCE_TRACKER_RPC_ADDRESS = 3;
private final static int CATALOG_ADDRESS = 4;
private final static int MASTER_INFO_ADDRESS = 5;
public final static int MASTER_UMBILICAL_RPC_ADDRESS = 1;
public final static int MASTER_CLIENT_RPC_ADDRESS = 2;
public final static int RESOURCE_TRACKER_RPC_ADDRESS = 3;
public final static int CATALOG_ADDRESS = 4;
public final static int MASTER_INFO_ADDRESS = 5;

public static InetSocketAddress getMasterUmbilicalAddress(TajoConf conf) {
return getMasterAddress(conf, MASTER_UMBILICAL_RPC_ADDRESS);
Expand Down Expand Up @@ -91,6 +90,7 @@ public static InetSocketAddress getMasterAddress(TajoConf conf, int type) {
if (files.length == 1) {
Path file = files[0].getPath();
String hostAddress = file.getName().replaceAll("_", ":");

FSDataInputStream stream = fs.open(file);
String data = stream.readUTF();
stream.close();
Expand Down
15 changes: 15 additions & 0 deletions tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@

package org.apache.tajo.util;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.net.*;

public class NetUtils {
private static Log LOG = LogFactory.getLog(NetUtils.class);

public static String normalizeInetSocketAddress(InetSocketAddress addr) {
return addr.getAddress().getHostAddress() + ":" + addr.getPort();
}
Expand All @@ -30,6 +35,16 @@ public static InetSocketAddress createSocketAddr(String addr) {
return new InetSocketAddress(splitted[0], Integer.parseInt(splitted[1]));
}

public static InetSocketAddress createLocalSocketAddr(InetSocketAddress addr) {
try {
String hostAddress = InetAddress.getLocalHost().getHostAddress();
int port = addr.getPort();
return NetUtils.createSocketAddr(hostAddress + ":" + port);
} catch (UnknownHostException e) {
}
return addr;
}

/**
* Util method to build socket addr from either:
* <host>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ public void start() {
// start the rpc server
String confClientServiceAddr = conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS);
InetSocketAddress initIsa = NetUtils.createSocketAddr(confClientServiceAddr);

if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
initIsa = NetUtils.createLocalSocketAddr(initIsa);
}

int workerNum = conf.getIntVar(ConfVars.MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM);
try {
server = new BlockingRpcServer(TajoMasterClientProtocol.class, clientHandler, initIsa, workerNum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public TajoMasterService(TajoMaster.MasterContext context) {
public void start() {
String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);

if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
initIsa = NetUtils.createLocalSocketAddr(initIsa);
}

int workerNum = conf.getIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM);
try {
server = new AsyncRpcServer(TajoMasterProtocol.class, masterHandler, initIsa, workerNum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.tajo.util.TUtil;

import java.io.IOException;
import java.net.InetAddress;
import java.util.List;

/**
Expand Down Expand Up @@ -65,7 +66,11 @@ public HAServiceHDFSImpl(MasterContext context) throws IOException {
this.context = context;
this.conf = context.getConf();
initSystemDirectory();
this.masterName = conf.get(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS.varname);

String hostAddress = InetAddress.getLocalHost().getHostAddress();
int port = context.getTajoMasterService().getBindAddress().getPort();

this.masterName = hostAddress + ":" + port;
monitorInterval = conf.getIntVar(TajoConf.ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL);
}

Expand Down Expand Up @@ -133,7 +138,8 @@ public void register() throws IOException {
}

private void createMasterFile(boolean isActive) throws IOException {
String hostName = masterName.split(":")[0];
String hostAddress = InetAddress.getLocalHost().getHostAddress();

String fileName = masterName.replaceAll(":", "_");
Path path = null;

Expand All @@ -144,19 +150,17 @@ private void createMasterFile(boolean isActive) throws IOException {
}

StringBuilder sb = new StringBuilder();
sb.append(context.getConf().get(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS.varname,
hostName + ":26002"));
sb.append(getHostName(hostAddress, HAServiceUtil.MASTER_CLIENT_RPC_ADDRESS));
sb.append("_");
sb.append(context.getConf().get(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS.varname,
hostName + ":26003"));
sb.append(getHostName(hostAddress, HAServiceUtil.RESOURCE_TRACKER_RPC_ADDRESS));
sb.append("_");
sb.append(context.getConf().get(TajoConf.ConfVars.CATALOG_ADDRESS.varname, hostName + ":26005"));
sb.append(getHostName(hostAddress, HAServiceUtil.CATALOG_ADDRESS));
sb.append("_");
sb.append(context.getConf().get(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS.varname,
hostName + ":26080"));
sb.append(getHostName(hostAddress, HAServiceUtil.MASTER_INFO_ADDRESS));

FSDataOutputStream out = fs.create(path);
out.writeUTF(sb.toString());
out.hflush();
out.close();

if (isActive) {
Expand All @@ -169,6 +173,48 @@ private void createMasterFile(boolean isActive) throws IOException {
}


private String getHostName(String hostAddress, int type) {
String hostName = null;
int port = 0;

switch (type) {
case 1:
hostName = context.getConf().get(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS
.varname);
port = 26001;
break;
case 2:
hostName = context.getConf().get(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS.varname);
port = 26002;
break;
case 3:
hostName = context.getConf().get(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS.varname);
port = 26003;
break;
case 4:
hostName = context.getConf().get(TajoConf.ConfVars.CATALOG_ADDRESS.varname);
port = 26005;
break;
case 5:
hostName = context.getConf().get(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS.varname);
port = 26080;
break;
default:
break;
}

StringBuilder sb = new StringBuilder();
sb.append(hostAddress);
sb.append(":");

if (hostName == null) {
sb.append(port);
} else {
sb.append(hostName.split(":")[1]);
}
return sb.toString();
}

@Override
public void delete() throws IOException {
String fileName = masterName.replaceAll(":", "_");
Expand Down Expand Up @@ -196,9 +242,6 @@ public boolean isActiveStatus() {
@Override
public List<TajoMasterInfo> getMasters() throws IOException {
List<TajoMasterInfo> list = TUtil.newList();
boolean isAlive = false;
TajoMasterInfo info = null;
String hostAddress = null;
Path path = null;

FileStatus[] files = fs.listStatus(activePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ public void init(Configuration conf) {
throw new IllegalArgumentException("Failed resolve of " + initIsa);
}

if (tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
initIsa = NetUtils.createLocalSocketAddr(initIsa);
}

int workerNum = tajoConf.getIntVar(TajoConf.ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM);
this.rpcServer = new AsyncRpcServer(QueryMasterProtocol.class, this, initIsa, workerNum);
this.rpcServer.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ public void serviceInit(Configuration conf) {
String confMasterServiceAddr = systemConf.getVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS);
InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);

if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
initIsa = NetUtils.createLocalSocketAddr(initIsa);
}

try {
server = new AsyncRpcServer(TajoResourceTrackerProtocol.class, this, initIsa, 3);
} catch (Exception e) {
Expand Down