Skip to content

Commit

Permalink
Merge branch 'STORM-3148' of https://github.com/revans2/incubator-storm
Browse files Browse the repository at this point in the history
… into STORM-3148

STORM-3148: Avoid threading issues with kryo

This closes #2762
  • Loading branch information
Robert Evans committed Jul 13, 2018
2 parents e21110d + bfb267e commit c9e9a7c
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 24 deletions.
Expand Up @@ -81,11 +81,14 @@ public BackPressureStatus getCurrStatus() {
ArrayList<Integer> nonBpTasks = new ArrayList<>(tasks.size());

for (Entry<Integer, BackpressureState> entry : tasks.entrySet()) {
boolean backpressure = entry.getValue().backpressure.get();
if (backpressure) {
bpTasks.add(entry.getKey());
} else {
nonBpTasks.add(entry.getKey());
//System bolt is not a part of backpressure.
if (entry.getKey() >= 0) {
boolean backpressure = entry.getValue().backpressure.get();
if (backpressure) {
bpTasks.add(entry.getKey());
} else {
nonBpTasks.add(entry.getKey());
}
}
}
return new BackPressureStatus(workerId, bpTasks, nonBpTasks);
Expand Down
Expand Up @@ -118,8 +118,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
private final MessageBuffer batcher;
// wait strategy when the netty channel is not writable
private final IWaitStrategy waitStrategy;
KryoValuesSerializer ser;
KryoValuesDeserializer deser;
private volatile Map<Integer, Double> serverLoad = null;
/**
* This flag is set to true if and only if a client instance is being closed.
Expand Down Expand Up @@ -169,8 +167,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
waitStrategy = ReflectionUtils.newInstance(clazz);
}
waitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
ser = new KryoValuesSerializer(topoConf);
deser = new KryoValuesDeserializer(topoConf);
}

/**
Expand Down
Expand Up @@ -62,7 +62,6 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
private final int port;
private final ChannelGroup allChannels = new DefaultChannelGroup("storm-server", GlobalEventExecutor.INSTANCE);
private final KryoValuesSerializer ser;
private final KryoValuesDeserializer deser;
private volatile boolean closing = false;
private IConnectionCallback cb = null;
private Supplier<Object> newConnectionResponse;
Expand All @@ -71,7 +70,6 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
this.topoConf = topoConf;
this.port = port;
ser = new KryoValuesSerializer(topoConf);
deser = new KryoValuesDeserializer(topoConf);

// Configure the server.
int buffer_size = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
Expand All @@ -97,7 +95,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
.childOption(ChannelOption.SO_RCVBUF, buffer_size)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new StormServerPipelineFactory(ser, deser, topoConf, this));
.childHandler(new StormServerPipelineFactory(topoConf, this));

// Bind and start to accept incoming connections.
try {
Expand Down Expand Up @@ -171,7 +169,9 @@ public void close() {
@Override
public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
MessageBatch mb = new MessageBatch(1);
mb.add(new TaskMessage(LOAD_METRICS_TASK_ID, ser.serialize(Collections.singletonList((Object) taskToLoad))));
synchronized (ser) {
mb.add(new TaskMessage(LOAD_METRICS_TASK_ID, ser.serialize(Collections.singletonList((Object) taskToLoad))));
}
allChannels.writeAndFlush(mb);
}

Expand Down
Expand Up @@ -47,12 +47,24 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except
BackPressureStatus status = (BackPressureStatus) message;
if (status.bpTasks != null) {
for (Integer bpTask : status.bpTasks) {
remoteBpStatus[bpTask].set(true);
try {
remoteBpStatus[bpTask].set(true);
} catch (ArrayIndexOutOfBoundsException e) {
//Just in case we get something we are confused about
// we can continue processing the rest of the tasks
LOG.error("BP index out of bounds {}", e);
}
}
}
if (status.nonBpTasks != null) {
for (Integer bpTask : status.nonBpTasks) {
remoteBpStatus[bpTask].set(false);
try {
remoteBpStatus[bpTask].set(false);
} catch (ArrayIndexOutOfBoundsException e) {
//Just in case we get something we are confused about
// we can continue processing the rest of the tasks
LOG.error("BP index out of bounds {}", e);
}
}
}
LOG.debug("Received BackPressure status update : {}", status);
Expand Down
Expand Up @@ -15,6 +15,7 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.storm.Config;
import org.apache.storm.serialization.KryoValuesDeserializer;
import org.apache.storm.shade.io.netty.channel.Channel;
import org.apache.storm.shade.io.netty.channel.ChannelInitializer;
import org.apache.storm.shade.io.netty.channel.ChannelPipeline;
Expand All @@ -36,7 +37,7 @@ protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

// Decoder
pipeline.addLast("decoder", new MessageDecoder(client.deser));
pipeline.addLast("decoder", new MessageDecoder(new KryoValuesDeserializer(conf)));
// Encoder
pipeline.addLast("encoder", NettySerializableMessageEncoder.INSTANCE);

Expand Down
Expand Up @@ -23,15 +23,10 @@

class StormServerPipelineFactory extends ChannelInitializer<Channel> {

private final KryoValuesSerializer ser;
private final KryoValuesDeserializer deser;
private final Map<String, Object> topoConf;
private final Server server;

StormServerPipelineFactory(KryoValuesSerializer ser, KryoValuesDeserializer deser,
Map<String, Object> topoConf, Server server) {
this.ser = ser;
this.deser = deser;
StormServerPipelineFactory(Map<String, Object> topoConf, Server server) {
this.topoConf = topoConf;
this.server = server;
}
Expand All @@ -42,10 +37,10 @@ protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

// Decoder
pipeline.addLast("decoder", new MessageDecoder(deser));
pipeline.addLast("decoder", new MessageDecoder(new KryoValuesDeserializer(topoConf)));
// Encoders
pipeline.addLast("netty-serializable-encoder", NettySerializableMessageEncoder.INSTANCE);
pipeline.addLast("backpressure-encoder", new BackPressureStatusEncoder(ser));
pipeline.addLast("backpressure-encoder", new BackPressureStatusEncoder(new KryoValuesSerializer(topoConf)));

boolean isNettyAuth = (Boolean) topoConf
.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
Expand Down

0 comments on commit c9e9a7c

Please sign in to comment.