Skip to content
This repository has been archived by the owner on Jun 7, 2021. It is now read-only.

Commit

Permalink
Merge branch 'APEXCORE-608' of https://github.com/vrozov/apex-core
Browse files Browse the repository at this point in the history
  • Loading branch information
tweise committed Feb 19, 2017
2 parents d80501b + c687bb5 commit 32f229f
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 127 deletions.
147 changes: 80 additions & 67 deletions engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.nio.charset.Charset;
import java.util.List;

import javax.net.SocketFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,8 +42,6 @@
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.client.utils.URLEncodedUtils;

import com.google.common.base.Throwables;

import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;

import static java.lang.Thread.sleep;
Expand All @@ -68,98 +68,111 @@ public class RecoverableRpcProxy implements java.lang.reflect.InvocationHandler,
private static final int RPC_TIMEOUT_DEFAULT = 5000;

private final Configuration conf;
private final String appPath;
private StreamingContainerUmbilicalProtocol umbilical;
private String lastConnectURI;
private long lastCompletedCallTms;
private long retryTimeoutMillis = Long.getLong(RETRY_TIMEOUT, RETRY_TIMEOUT_DEFAULT);
private long retryDelayMillis = Long.getLong(RETRY_DELAY, RETRY_DELAY_DEFAULT);
private int rpcTimeout = Integer.getInteger(RPC_TIMEOUT, RPC_TIMEOUT_DEFAULT);

public RecoverableRpcProxy(String appPath, Configuration conf) throws IOException
private long retryTimeoutMillis;
private long retryDelayMillis;
private int rpcTimeout;
private final UserGroupInformation currentUser;
private final SocketFactory defaultSocketFactory;
private final FSRecoveryHandler fsRecoveryHandler;

public RecoverableRpcProxy(String appPath, Configuration conf)
{
this.conf = conf;
this.appPath = appPath;
connect();
try {
currentUser = UserGroupInformation.getCurrentUser();
defaultSocketFactory = NetUtils.getDefaultSocketFactory(conf);
fsRecoveryHandler = new FSRecoveryHandler(appPath, conf);
connect(0);
} catch (IOException e) {
LOG.error("Fail to create RecoverableRpcProxy", e);
throw new RuntimeException(e);
}
}

private void connect() throws IOException
private long connect(long timeMillis) throws IOException
{
FSRecoveryHandler fsrh = new FSRecoveryHandler(appPath, conf);
String uriStr = fsrh.readConnectUri();
String uriStr = fsRecoveryHandler.readConnectUri();
if (!uriStr.equals(lastConnectURI)) {
// reset timeout
LOG.debug("Got new RPC connect address {}", uriStr);
lastCompletedCallTms = System.currentTimeMillis();
lastConnectURI = uriStr;
}
URI heartbeatUri = URI.create(uriStr);
if (umbilical != null) {
RPC.stopProxy(umbilical);
}

String queryStr = heartbeatUri.getQuery();
List<NameValuePair> queryList = null;
if (queryStr != null) {
queryList = URLEncodedUtils.parse(queryStr, Charset.defaultCharset());
}
if (queryList != null) {
for (NameValuePair pair : queryList) {
String value = pair.getValue();
String key = pair.getName();
if (QP_rpcTimeout.equals(key)) {
this.rpcTimeout = Integer.parseInt(value);
} else if (QP_retryTimeoutMillis.equals(key)) {
this.retryTimeoutMillis = Long.parseLong(value);
} else if (QP_retryDelayMillis.equals(key)) {
this.retryDelayMillis = Long.parseLong(value);
retryTimeoutMillis = Long.getLong(RETRY_TIMEOUT, RETRY_TIMEOUT_DEFAULT);
retryDelayMillis = Long.getLong(RETRY_DELAY, RETRY_DELAY_DEFAULT);
rpcTimeout = Integer.getInteger(RPC_TIMEOUT, RPC_TIMEOUT_DEFAULT);

URI heartbeatUri = URI.create(uriStr);

String queryStr = heartbeatUri.getQuery();
if (queryStr != null) {
List<NameValuePair> queryList = URLEncodedUtils.parse(queryStr, Charset.defaultCharset());
if (queryList != null) {
for (NameValuePair pair : queryList) {
String value = pair.getValue();
String key = pair.getName();
if (QP_rpcTimeout.equals(key)) {
this.rpcTimeout = Integer.parseInt(value);
} else if (QP_retryTimeoutMillis.equals(key)) {
this.retryTimeoutMillis = Long.parseLong(value);
} else if (QP_retryDelayMillis.equals(key)) {
this.retryDelayMillis = Long.parseLong(value);
}
}
}
}
InetSocketAddress address = NetUtils.createSocketAddrForHost(heartbeatUri.getHost(), heartbeatUri.getPort());
umbilical = RPC.getProxy(StreamingContainerUmbilicalProtocol.class, StreamingContainerUmbilicalProtocol.versionID, address, currentUser, conf, defaultSocketFactory, rpcTimeout);
// reset timeout
return System.currentTimeMillis() + retryTimeoutMillis;
}
InetSocketAddress address = NetUtils.createSocketAddrForHost(heartbeatUri.getHost(), heartbeatUri.getPort());
umbilical = RPC.getProxy(StreamingContainerUmbilicalProtocol.class, StreamingContainerUmbilicalProtocol.versionID, address, UserGroupInformation.getCurrentUser(), conf, NetUtils.getDefaultSocketFactory(conf), rpcTimeout);
return timeMillis;
}

public StreamingContainerUmbilicalProtocol getProxy()
public StreamingContainerUmbilicalProtocol getProxy() throws IOException
{
StreamingContainerUmbilicalProtocol recoverableProxy = (StreamingContainerUmbilicalProtocol)java.lang.reflect.Proxy.newProxyInstance(umbilical.getClass().getClassLoader(), umbilical.getClass().getInterfaces(), this);
if (umbilical == null) {
throw new IOException("RecoverableRpcProxy is closed.");
}
StreamingContainerUmbilicalProtocol recoverableProxy = (StreamingContainerUmbilicalProtocol)Proxy.newProxyInstance(umbilical.getClass().getClassLoader(), umbilical.getClass().getInterfaces(), this);
return recoverableProxy;
}

@Override
@SuppressWarnings("SleepWhileInLoop")
public Object invoke(Object proxy, Method method, Object[] args) throws ConnectException, SocketTimeoutException, InterruptedException, IllegalAccessException
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
{
Object result;
long endTimeMillis = System.currentTimeMillis() + retryTimeoutMillis;
if (umbilical == null) {
endTimeMillis = connect(endTimeMillis);
}
while (true) {
if (umbilical == null) {
throw new IOException("RecoverableRpcProxy is closed.");
}
try {
if (umbilical == null) {
connect();
}
//long start = System.nanoTime();
result = method.invoke(umbilical, args);
lastCompletedCallTms = System.currentTimeMillis();
//long end = System.nanoTime();
//LOG.info(String.format("%s took %d ns", method.getName(), (end - start)));
return result;
} catch (InvocationTargetException e) {
return method.invoke(umbilical, args);
} catch (Throwable t) {
// handle RPC failure
Throwable targetException = e.getTargetException();
long connectMillis = System.currentTimeMillis() - lastCompletedCallTms;
if (connectMillis < retryTimeoutMillis) {
LOG.warn("RPC failure, attempting reconnect after {} ms (remaining {} ms)", retryDelayMillis, retryTimeoutMillis - connectMillis, targetException);
close();
while (t instanceof InvocationTargetException || t instanceof UndeclaredThrowableException) {
Throwable cause = t.getCause();
if (cause != null) {
t = cause;
}
}
final long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis < endTimeMillis) {
LOG.warn("RPC failure, will retry after {} ms (remaining {} ms)", retryDelayMillis, endTimeMillis - currentTimeMillis, t);
sleep(retryDelayMillis);
endTimeMillis = connect(endTimeMillis);
} else {
LOG.error("Giving up RPC connection recovery after {} ms", connectMillis, targetException);
if (targetException instanceof java.net.ConnectException) {
throw (java.net.ConnectException)targetException;
} else if (targetException instanceof java.net.SocketTimeoutException) {
throw (java.net.SocketTimeoutException)targetException;
} else {
throw Throwables.propagate(targetException);
}
LOG.error("Giving up RPC connection recovery after {} ms", currentTimeMillis - endTimeMillis + retryTimeoutMillis, t);
close();
throw t;
}
} catch (IOException ex) {
close();
throw new RuntimeException(ex);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,9 @@ public ProtocolSignature getProtocolSignature(String protocol, long clientVersio
}

@Override
public void reportError(String containerId, int[] operators, String msg)
public void reportError(String containerId, int[] operators, String msg) throws IOException
{
try {
log(containerId, msg);
} catch (IOException ex) {
// ignore
}
log(containerId, msg);
}

@Override
Expand All @@ -131,7 +127,7 @@ public StreamingContainerContext getInitContext(String containerId)
}

@Override
public ContainerHeartbeatResponse processHeartbeat(ContainerHeartbeat msg)
public ContainerHeartbeatResponse processHeartbeat(ContainerHeartbeat msg) throws IOException
{
if (injectShutdown.containsKey(msg.getContainerId())) {
ContainerHeartbeatResponse r = new ContainerHeartbeatResponse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,16 @@ public long getProtocolVersion(String arg0, long arg1) throws IOException
@Override
public void log(String containerId, String msg) throws IOException
{
LOG.info("child msg: {} context: {}", msg, dagManager.getContainerAgent(containerId).container);
final StreamingContainerAgent sca = dagManager.getContainerAgent(containerId);
if (sca != null) {
LOG.info("child msg: {} context: {}", msg, sca.container);
} else {
LOG.info("unknown container {} msg: {}", containerId, msg);
}
}

@Override
public void reportError(String containerId, int[] operators, String msg)
public void reportError(String containerId, int[] operators, String msg) throws IOException
{
if (operators == null || operators.length == 0) {
dagManager.recordEventAsync(new ContainerErrorEvent(containerId, msg));
Expand All @@ -179,24 +184,23 @@ public void reportError(String containerId, int[] operators, String msg)
}
}
}
try {
log(containerId, msg);
} catch (IOException ex) {
// ignore
}
log(containerId, msg);
}

@Override
public StreamingContainerContext getInitContext(String containerId)
throws IOException
{
StreamingContainerContext scc = null;
StreamingContainerAgent sca = dagManager.getContainerAgent(containerId);

return sca.getInitContext();
if (sca != null) {
scc = sca.getInitContext();
}
return scc;
}

@Override
public ContainerHeartbeatResponse processHeartbeat(ContainerHeartbeat msg)
public ContainerHeartbeatResponse processHeartbeat(final ContainerHeartbeat msg) throws IOException
{
// -- TODO
// Change to use some sort of a annotation that developers can use to specify secure code
Expand All @@ -208,20 +212,14 @@ public ContainerHeartbeatResponse processHeartbeat(ContainerHeartbeat msg)
//LOG.debug("RPC latency from child container {} is {} ms (according to system clocks)", msg.getContainerId(),
// now - msg.sentTms);
dagManager.updateRPCLatency(msg.getContainerId(), now - msg.sentTms);
try {
final ContainerHeartbeat fmsg = msg;
return SecureExecutor.execute(new SecureExecutor.WorkLoad<ContainerHeartbeatResponse>()
return SecureExecutor.execute(new SecureExecutor.WorkLoad<ContainerHeartbeatResponse>()
{
@Override
public ContainerHeartbeatResponse run()
{
@Override
public ContainerHeartbeatResponse run()
{
return dagManager.processHeartbeat(fmsg);
}
});
} catch (IOException ex) {
LOG.error("Error processing heartbeat", ex);
return null;
}
return dagManager.processHeartbeat(msg);
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -431,14 +431,14 @@ class ContainerHeartbeatResponse extends AbstractWritableAdapter
* @param operators
* @param msg
*/
void reportError(String containerId, int[] operators, String msg);
void reportError(String containerId, int[] operators, String msg) throws IOException;

/**
* To be called periodically by child for heartbeat protocol.
*
* @param msg
* @return
*/
ContainerHeartbeatResponse processHeartbeat(ContainerHeartbeat msg);
ContainerHeartbeatResponse processHeartbeat(ContainerHeartbeat msg) throws IOException;

}
Loading

0 comments on commit 32f229f

Please sign in to comment.