Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[STORM-3054] Add Topology level configuration socket timeout for DRPC Invocation Client #2651

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false
storm.thrift.transport: "org.apache.storm.security.auth.SimpleTransportPlugin"
storm.thrift.socket.timeout.ms: 600000
topology.drpc.invocations.thrift.socket.timeout.ms: 60000
storm.principal.tolocal: "org.apache.storm.security.auth.DefaultPrincipalToLocal"
storm.group.mapping.service: "org.apache.storm.security.auth.ShellBasedGroupsMapping"
storm.group.mapping.service.params: null
Expand Down
6 changes: 6 additions & 0 deletions storm-client/src/jvm/org/apache/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,12 @@ public class Config extends HashMap<String, Object> {
@isInteger
@isPositiveNumber
public static final String DRPC_INVOCATIONS_THREADS = "drpc.invocations.threads";
/**
* For the topology-level DRPC Invocation Client, how long a Thrift client socket may hang before it times out and the socket is
* restarted. Defaults to 60000 milliseconds.
*/
@isInteger
public static final String TOPOLOGY_DRPC_INVOCATIONS_THRIFT_SOCKET_TIMEOUT_MS = "topology.drpc.invocations.thrift.socket.timeout.ms";
/**
* Initialization parameters for the group mapping service plugin. Provides a way for a
* {@link #STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.storm.generated.DistributedRPCInvocations;
import org.apache.storm.security.auth.ThriftClient;
import org.apache.storm.security.auth.ThriftConnectionType;
import org.apache.storm.utils.ObjectReader;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
Expand All @@ -32,7 +33,8 @@ public class DRPCInvocationsClient extends ThriftClient implements DistributedRP
private int port;

public DRPCInvocationsClient(Map<String, Object> conf, String host, int port) throws TTransportException {
super(conf, ThriftConnectionType.DRPC_INVOCATIONS, host, port, null);
super(conf, ThriftConnectionType.DRPC_INVOCATIONS, host, port, ObjectReader
.getInt(conf.getOrDefault(org.apache.storm.Config.TOPOLOGY_DRPC_INVOCATIONS_THRIFT_SOCKET_TIMEOUT_MS, 60000), 60000));
this.host = host;
this.port = port;
client.set(new DistributedRPCInvocations.Client(_protocol));
Expand Down
64 changes: 33 additions & 31 deletions storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class ReturnResults extends BaseRichBolt {
OutputCollector _collector;
boolean local;
Map<String, Object> _conf;
Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>();
Map<List<String>, DRPCInvocationsClient> _clients = new HashMap<>();

@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
Expand All @@ -54,6 +54,7 @@ public void prepare(Map<String, Object> topoConf, TopologyContext context, Outpu
public void execute(Tuple input) {
String result = (String) input.getValue(0);
String returnInfo = (String) input.getValue(1);
LOG.debug("Request Info: {}, Result: {}", returnInfo, result);
if (returnInfo != null) {
Map<String, Object> retMap;
try {
Expand All @@ -66,34 +67,22 @@ public void execute(Tuple input) {
final String host = (String) retMap.get("host");
final int port = ObjectReader.getInt(retMap.get("port"));
String id = (String) retMap.get("id");
DistributedRPCInvocations.Iface client;
if (local) {
client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host);
} else {
List server = new ArrayList() {{
add(host);
add(port);
}};

if (!_clients.containsKey(server)) {
try {
_clients.put(server, new DRPCInvocationsClient(_conf, host, port));
} catch (TTransportException ex) {
throw new RuntimeException(ex);
}
}
client = _clients.get(server);
}

LOG.debug("Request Id: {}, Result: {}", id, result);
DistributedRPCInvocations.Iface client = getDRPCClient(host, port);

int retryCnt = 0;
int maxRetries = 3;
while (retryCnt < maxRetries) {
retryCnt++;
try {
client.result(id, result);
_collector.ack(input);
break;
if (client != null) {
LOG.debug("Trying to publish Request Id: {}, Result: {}, to DRPC {}", id, result, host);
client.result(id, result);
_collector.ack(input);
break;
} else {
client = getDRPCClient(host, port);
}
} catch (AuthorizationException aze) {
LOG.error("Not authorized to return results to DRPC server", aze);
_collector.fail(input);
Expand All @@ -103,21 +92,34 @@ public void execute(Tuple input) {
LOG.error("Failed to return results to DRPC server", tex);
_collector.fail(input);
}
reconnectClient((DRPCInvocationsClient) client);
client = getDRPCClient(host, port);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now it always reuses the existing client, even when a TException is raised from the client. I guess we need to handle that case.

Copy link
Contributor Author

@kishorvpatil kishorvpatil May 14, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same as before when we could not make a connection, or when the reconnect failed in the previous case. The whole reason to create a new client from scratch in getDRPCClient is to avoid using reconnect with security enabled. The stale connection fails to respond to the security challenge.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kishorvpatil
I meant that we don't invalidate the broken client in _clients, so it will always pick the same client instead of building a new one.

}
}
}
}

private void reconnectClient(DRPCInvocationsClient client) {
if (client instanceof DRPCInvocationsClient) {
try {
LOG.info("reconnecting... ");
client.reconnectClient(); //Blocking call
} catch (TException e2) {
LOG.error("Failed to connect to DRPC server", e2);
private DistributedRPCInvocations.Iface getDRPCClient(String host, int port) {
DistributedRPCInvocations.Iface client;
if (local) {
client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host);
} else {
List server = new ArrayList() {
{
add(host);
add(port);
}
};
if (!_clients.containsKey(server)) {
try {
DRPCInvocationsClient oldClient = _clients.put(server, new DRPCInvocationsClient(_conf, host, port));
Copy link
Contributor

@HeartSaVioR HeartSaVioR May 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now it loses the cache functionality. Instead of this approach, can we invalidate the cache entry when we find that the client is broken?

oldClient.close();
} catch (TTransportException ex) {
throw new RuntimeException(ex);
}
}
client = _clients.get(server);
}
return client;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ public enum ThriftConnectionType {
Config.SUPERVISOR_THRIFT_SOCKET_TIMEOUT_MS, WorkerTokenServiceType.SUPERVISOR, false),
//A DRPC token only works for the invocations transport, not for the basic thrift transport.
DRPC(Config.DRPC_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_PORT, Config.DRPC_QUEUE_SIZE,
Config.DRPC_WORKER_THREADS, Config.DRPC_MAX_BUFFER_SIZE, null, null, false),
DRPC_INVOCATIONS(Config.DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_INVOCATIONS_PORT, null,
Config.DRPC_INVOCATIONS_THREADS, Config.DRPC_MAX_BUFFER_SIZE, null, WorkerTokenServiceType.DRPC, false),
Config.DRPC_WORKER_THREADS, Config.DRPC_MAX_BUFFER_SIZE, null, null, false), DRPC_INVOCATIONS(
Config.DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_INVOCATIONS_PORT, null, Config.DRPC_INVOCATIONS_THREADS,
Config.DRPC_MAX_BUFFER_SIZE, Config.TOPOLOGY_DRPC_INVOCATIONS_THRIFT_SOCKET_TIMEOUT_MS, WorkerTokenServiceType.DRPC,
false),
LOCAL_FAKE;

private final String transConf;
Expand Down