Skip to content

Commit

Permalink
fix(window) fix queuelist null pointer exception (#89)
Browse files Browse the repository at this point in the history
  • Loading branch information
duhenglucky committed Dec 1, 2021
1 parent 27c8b26 commit f288ad4
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public void doProcessAfterRefreshConfigurable(IConfigurableService configurableS
this.windowFireSource.init();
this.windowFireSource.start(getFireReceiver());
this.shuffleChannel = new ShuffleChannel(this);
this.shuffleChannel.init();
windowCache.setBatchSize(5000);
windowCache.setShuffleChannel(shuffleChannel);
}
Expand All @@ -74,14 +75,15 @@ public int fireWindowInstance(WindowInstance windowInstance, Map<String, String>
* @param messages
* @param instance
*/
public abstract void shuffleCalculate(List<IMessage> messages, WindowInstance instance, String queueId);
public abstract void shuffleCalculate(List<IMessage> messages, WindowInstance instance, String queueId);

/**
/**
* 触发window
*
* @param instance
*/
protected abstract int fireWindowInstance(WindowInstance instance, String queueId, Map<String, String> queueId2Offset);
protected abstract int fireWindowInstance(WindowInstance instance, String queueId,
Map<String, String> queueId2Offset);

public abstract void clearCache(String queueId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;


import java.util.concurrent.atomic.AtomicLong;

import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleSink;
import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage;
Expand All @@ -34,33 +38,28 @@
import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointState;
import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.context.MessageOffset;
import org.apache.rocketmq.streams.common.interfaces.ISystemMessage;
import org.apache.rocketmq.streams.common.topology.ChainPipeline;
import org.apache.rocketmq.streams.common.topology.model.Pipeline;
import org.apache.rocketmq.streams.common.utils.CollectionUtil;
import org.apache.rocketmq.streams.common.utils.DateUtil;
import org.apache.rocketmq.streams.common.utils.FileUtil;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.common.utils.TraceUtil;
import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
import org.apache.rocketmq.streams.window.debug.DebugWriter;
import org.apache.rocketmq.streams.window.model.WindowCache;
import org.apache.rocketmq.streams.window.model.WindowInstance;
import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow;
import org.apache.rocketmq.streams.window.operator.AbstractWindow;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.window.model.WindowInstance;
import org.apache.rocketmq.streams.window.model.WindowCache;
import org.apache.rocketmq.streams.window.operator.impl.WindowOperator.WindowRowOperator;
import org.apache.rocketmq.streams.window.sqlcache.impl.SQLElement;
import org.apache.rocketmq.streams.window.storage.ShufflePartitionManager;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
* 负责处理分片
*/
Expand All @@ -73,12 +72,10 @@ public class ShuffleChannel extends AbstractSystemChannel {
protected static final String SHUFFLE_MESSAGES = "SHUFFLE_MESSAGES";
protected String MSG_OWNER = "MSG_OWNER";//消息所属的window


private static final String SHUFFLE_TRACE_ID = "SHUFFLE_TRACE_ID";

protected ShuffleCache shuffleCache;


protected Map<String, ISplit> queueMap = new ConcurrentHashMap<>();
protected List<ISplit> queueList;//所有的分片

Expand All @@ -97,18 +94,16 @@ public ShuffleChannel(AbstractShuffleWindow window) {
channelConfig.put(CHANNEL_PROPERTY_KEY_PREFIX, ConfigureFileKey.WINDOW_SHUFFLE_CHANNEL_PROPERTY_PREFIX);
channelConfig.put(CHANNEL_TYPE, ConfigureFileKey.WINDOW_SHUFFLE_CHANNEL_TYPE);


this.shuffleCache = new ShuffleCache(window);
this.shuffleCache.init();
this.shuffleCache.openAutoFlush();


}

protected transient AtomicBoolean hasStart=new AtomicBoolean(false);
protected transient AtomicBoolean hasStart = new AtomicBoolean(false);

@Override public void startChannel() {
if(hasStart.compareAndSet(false,true)){
init();
if (hasStart.compareAndSet(false, true)) {
super.startChannel();
}

Expand All @@ -117,7 +112,7 @@ public ShuffleChannel(AbstractShuffleWindow window) {
/**
* init shuffle channel
*/
public void init(){
public void init() {
this.consumer = createSource(window.getNameSpace(), window.getConfigureName());

this.producer = createSink(window.getNameSpace(), window.getConfigureName());
Expand Down Expand Up @@ -205,7 +200,6 @@ public Object doMessage(IMessage oriMessage, AbstractContext context) {
DebugWriter.getDebugWriter(window.getConfigureName()).writeShuffleReceiveBeforeCache(window, msgs, queueId);
}


beforeBatchAdd(oriMessage, message);

for (WindowInstance windowInstance : windowInstances) {
Expand Down Expand Up @@ -234,7 +228,6 @@ public void addNewSplit(IMessage message, AbstractContext context, NewSplitMessa
window.initWindowInstanceMaxSplitNum(windowInstance);
}


} else {
for (String queueId : newSplitMessage.getSplitIds()) {
ShufflePartitionManager.getInstance().setSplitFinished(queueId);
Expand Down Expand Up @@ -304,7 +297,6 @@ protected void doSystemMessage(IMessage oriMessage, AbstractContext context) {
afterFlushCallback(oriMessage, context);
}


/**
* if the message offset is old filter the repeate message
*
Expand Down Expand Up @@ -379,7 +371,6 @@ protected String createShuffleChannelNameSpace(ChainPipeline pipeline) {
return pipeline.getSource().getNameSpace();
}


@Override
public String getConfigureName() {
return window.getConfigureName() + "_shuffle";
Expand All @@ -395,7 +386,6 @@ public String getType() {
return Pipeline.TYPE;
}


public ISplit getSplit(Integer index) {
return queueList.get(index);
}
Expand Down

0 comments on commit f288ad4

Please sign in to comment.