Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TUBEMQ-94] Substitute the parameterized type for core module #75

Merged
merged 1 commit into from
May 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class ConsumerEvent {
private EventType type;
private EventStatus status;
private List<SubscribeInfo> subscribeInfoList =
new ArrayList<SubscribeInfo>();
new ArrayList<>();


public ConsumerEvent(long rebalanceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public ConsumerInfo(String consumerId,
this.topicSet = topicSet;
if (topicConditions == null) {
this.topicConditions =
new HashMap<String, TreeSet<String>>();
new HashMap<>();
} else {
this.topicConditions = topicConditions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
public class MasterInfo {

private final Map<String/** ip:port */, NodeAddrInfo> addrMap4Failover =
new HashMap<String, NodeAddrInfo>();
new HashMap<>();
private List<String> nodeHostPortList;
private NodeAddrInfo firstNodeAddr = null;
private String masterClusterStr;
Expand Down Expand Up @@ -78,7 +78,7 @@ public MasterInfo(final String masterAddrInfo) {
this.firstNodeAddr = tmpNodeAddrInfo;
}
}
nodeHostPortList = new ArrayList<String>(addrMap4Failover.size());
nodeHostPortList = new ArrayList<>(addrMap4Failover.size());
nodeHostPortList.addAll(addrMap4Failover.keySet());
int count = 0;
Collections.sort(nodeHostPortList);
Expand All @@ -100,7 +100,7 @@ private MasterInfo(Map<String, NodeAddrInfo> addressMap4Failover,
}
this.addrMap4Failover.put(entry.getKey(), entry.getValue());
}
this.nodeHostPortList = new ArrayList<String>(addrMap4Failover.size());
this.nodeHostPortList = new ArrayList<>(addrMap4Failover.size());
this.nodeHostPortList.addAll(addrMap4Failover.keySet());
this.firstNodeAddr = firstNodeAddr;
this.masterClusterStr = masterClusterStr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class FlowCtrlRuleHandler {
System.currentTimeMillis();
// Decoded flow control rules
private Map<Integer, List<FlowCtrlItem>> flowCtrlRuleSet =
new ConcurrentHashMap<Integer, List<FlowCtrlItem>>();
new ConcurrentHashMap<>();

public FlowCtrlRuleHandler(boolean isDefault) {
this.isDefaultHandler = isDefault;
Expand Down Expand Up @@ -397,7 +397,7 @@ public void clear() {
*/
public Map<Integer, List<FlowCtrlItem>> parseFlowCtrlInfo(final String flowCtrlInfo)
throws Exception {
Map<Integer, List<FlowCtrlItem>> flowCtrlMap = new ConcurrentHashMap<Integer, List<FlowCtrlItem>>();
Map<Integer, List<FlowCtrlItem>> flowCtrlMap = new ConcurrentHashMap<>();
if (TStringUtils.isBlank(flowCtrlInfo)) {
throw new Exception("Parsing error, flowCtrlInfo value is blank!");
}
Expand Down Expand Up @@ -470,7 +470,7 @@ private List<FlowCtrlItem> parseDataLimit(int typeVal, JsonObject jsonObject) th
if (ruleArray == null) {
throw new Exception("not found rule list in data limit!");
}
ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<FlowCtrlItem>();
ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<>();
for (int index = 0; index < ruleArray.size(); index++) {
JsonObject ruleObject = ruleArray.get(index).getAsJsonObject();
int startTime = validAndGetTimeValue("start",
Expand Down Expand Up @@ -550,7 +550,7 @@ private List<FlowCtrlItem> parseFreqLimit(int typeVal,
if (ruleArray == null) {
throw new Exception("not found rule list in freq limit!");
}
ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<FlowCtrlItem>();
ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<>();
for (int index = 0; index < ruleArray.size(); index++) {
JsonObject ruleObject = ruleArray.get(index).getAsJsonObject();
if (!ruleObject.has("zeroCnt")) {
Expand Down Expand Up @@ -611,7 +611,7 @@ private List<FlowCtrlItem> parseLowFetchLimit(int typeVal,
if (ruleArray.size() > 1) {
throw new Exception("only allow set one rule in low fetch limit!");
}
ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<FlowCtrlItem>();
ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<>();
for (int index = 0; index < ruleArray.size(); index++) {
JsonObject ruleObject = ruleArray.get(index).getAsJsonObject();
int normfreqInMs = 0;
Expand Down Expand Up @@ -685,7 +685,7 @@ private List<FlowCtrlItem> parseSSDProcLimit(int typeVal,
if (ruleArray == null) {
throw new Exception("not found rule list in SSD limit!");
}
ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<FlowCtrlItem>();
ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<>();
for (int index = 0; index < ruleArray.size(); index++) {
JsonObject ruleObject = ruleArray.get(index).getAsJsonObject();
int startTime = validAndGetTimeValue("start",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ public class ConcurrentHashSet<E> extends MapBackedSet<E> {
private static final long serialVersionUID = 8518578988740277828L;

public ConcurrentHashSet() {
super(new ConcurrentHashMap<E, Boolean>());
super(new ConcurrentHashMap<>());
}

public ConcurrentHashSet(Collection<E> c) {
super(new ConcurrentHashMap<E, Boolean>(), c);
super(new ConcurrentHashMap<>(), c);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class DataConverterUtil {
* @param strSubInfoList return a list of SubscribeInfos
*/
public static List<SubscribeInfo> convertSubInfo(List<String> strSubInfoList) {
List<SubscribeInfo> subInfoList = new ArrayList<SubscribeInfo>();
List<SubscribeInfo> subInfoList = new ArrayList<>();
if (strSubInfoList != null) {
for (String strSubInfo : strSubInfoList) {
if (TStringUtils.isNotBlank(strSubInfo)) {
Expand All @@ -65,7 +65,7 @@ public static List<SubscribeInfo> convertSubInfo(List<String> strSubInfoList) {
* @param subInfoList return a list of String SubscribeInfos
*/
public static List<String> formatSubInfo(List<SubscribeInfo> subInfoList) {
List<String> strSubInfoList = new ArrayList<String>();
List<String> strSubInfoList = new ArrayList<>();
if ((subInfoList != null) && (!subInfoList.isEmpty())) {
for (SubscribeInfo subInfo : subInfoList) {
if (subInfo != null) {
Expand All @@ -83,7 +83,7 @@ public static List<String> formatSubInfo(List<SubscribeInfo> subInfoList) {
* @return return a list of Partition
*/
public static List<Partition> convertPartitionInfo(List<String> strPartInfoList) {
List<Partition> partList = new ArrayList<Partition>();
List<Partition> partList = new ArrayList<>();
if (strPartInfoList != null) {
for (String partInfo : strPartInfoList) {
if (partInfo != null) {
Expand All @@ -102,7 +102,7 @@ public static List<Partition> convertPartitionInfo(List<String> strPartInfoList)
*/
public static List<TopicInfo> convertTopicInfo(Map<Integer, BrokerInfo> brokerInfoMap,
List<String> strTopicInfos) {
List<TopicInfo> topicList = new ArrayList<TopicInfo>();
List<TopicInfo> topicList = new ArrayList<>();
if (strTopicInfos != null) {
for (String info : strTopicInfos) {
if (info != null) {
Expand Down Expand Up @@ -133,7 +133,7 @@ public static List<TopicInfo> convertTopicInfo(Map<Integer, BrokerInfo> brokerIn
*/
public static Map<Integer, BrokerInfo> convertBrokerInfo(List<String> strBrokerInfos) {
Map<Integer, BrokerInfo> brokerInfoMap =
new ConcurrentHashMap<Integer, BrokerInfo>();
new ConcurrentHashMap<>();
if (strBrokerInfos != null) {
for (String info : strBrokerInfos) {
if (info != null) {
Expand All @@ -155,7 +155,7 @@ public static Map<Integer, BrokerInfo> convertBrokerInfo(List<String> strBrokerI
public static Map<String, TreeSet<String>> convertTopicConditions(
final List<String> strTopicConditions) {
Map<String, TreeSet<String>> topicConditions =
new HashMap<String, TreeSet<String>>();
new HashMap<>();
if (strTopicConditions == null || strTopicConditions.isEmpty()) {
return topicConditions;
}
Expand All @@ -172,7 +172,7 @@ public static Map<String, TreeSet<String>> convertTopicConditions(
String[] strCondInfo = strInfo[1].split(TokenConstants.ARRAY_SEP);
TreeSet<String> conditionSet = topicConditions.get(topicName);
if (conditionSet == null) {
conditionSet = new TreeSet<String>();
conditionSet = new TreeSet<>();
topicConditions.put(topicName, conditionSet);
}
for (String cond : strCondInfo) {
Expand All @@ -194,7 +194,7 @@ public static Map<String, TreeSet<String>> convertTopicConditions(
*/
public static List<Message> convertMessage(final String topicName,
List<ClientBroker.TransferedMessage> transferedMessageList) {
List<Message> messageList = new ArrayList<Message>();
List<Message> messageList = new ArrayList<>();
if (transferedMessageList == null || transferedMessageList.isEmpty()) {
return messageList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

public class RpcConfig {

private final Map<String, Object> params = new HashMap<String, Object>();
private final Map<String, Object> params = new HashMap<>();

public RpcConfig() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,21 @@ public class RpcServiceFactory {
private static AtomicInteger threadIdGen = new AtomicInteger(0);
private final ClientFactory clientFactory;
private final ConcurrentHashMap<Integer, ServiceRpcServer> servers =
new ConcurrentHashMap<Integer, ServiceRpcServer>();
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, ServiceHolder> servicesCache =
new ConcurrentHashMap<String, ServiceHolder>();
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String/* addr */, RemoteConErrStats> remoteAddrMap =
new ConcurrentHashMap<String, RemoteConErrStats>();
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String/* addr */, Long> forbiddenAddrMap =
new ConcurrentHashMap<String, Long>();
new ConcurrentHashMap<>();
private final ConnectionManager connectionManager;
private final ConcurrentHashMap<String, ConnectionNode> brokerQueue =
new ConcurrentHashMap<String, ConnectionNode>();
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Long> updateTime =
new ConcurrentHashMap<String, Long>();
new ConcurrentHashMap<>();
// Temporary invalid broker map
private final ConcurrentHashMap<Integer, Long> brokerUnavailableMap =
new ConcurrentHashMap<Integer, Long>();
new ConcurrentHashMap<>();
private long unAvailableFbdDurationMs =
RpcConstants.CFG_UNAVAILABLE_FORBIDDEN_DURATION_MS;
private AtomicLong lastLogPrintTime = new AtomicLong(0);
Expand Down Expand Up @@ -197,7 +197,7 @@ public void addRmtAddrErrCount(String remoteAddr) {
if (beforeTime == null) {
int totalCount = 0;
Long curTime = System.currentTimeMillis();
Set<String> expiredAddrs = new HashSet<String>();
Set<String> expiredAddrs = new HashSet<>();
for (Map.Entry<String, Long> entry : forbiddenAddrMap.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
Expand Down Expand Up @@ -244,7 +244,7 @@ public void addRmtAddrErrCount(String remoteAddr) {

public void rmvAllExpiredRecords() {
long curTime = System.currentTimeMillis();
Set<String> expiredAddrs = new HashSet<String>();
Set<String> expiredAddrs = new HashSet<>();
for (Map.Entry<String, RemoteConErrStats> entry : remoteAddrMap.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
Expand Down Expand Up @@ -294,7 +294,7 @@ public void addUnavailableBroker(int brokerId) {

public void rmvExpiredUnavailableBrokers() {
long curTime = System.currentTimeMillis();
Set<Integer> expiredBrokers = new HashSet<Integer>();
Set<Integer> expiredBrokers = new HashSet<>();
for (Map.Entry<Integer, Long> entry : brokerUnavailableMap.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
Expand Down Expand Up @@ -566,7 +566,7 @@ public void run() {
}
long cur = System.currentTimeMillis();
if (cur - lastCheckTime.get() >= 30000) {
ArrayList<String> tmpKeyList = new ArrayList<String>();
ArrayList<String> tmpKeyList = new ArrayList<>();
for (Map.Entry<String, Long> entry : updateTime.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
public class PbEnDecoder {
// The set of methods supported by RPC, only the methods in the map are accepted
private static final Map<String, Integer> rpcMethodMap =
new HashMap<String, Integer>();
new HashMap<>();
// The set of services supported by RPC, only the services in the map are processed.
private static final Map<String, Integer> rpcServiceMap =
new HashMap<String, Integer>();
new HashMap<>();

static {
// The MAP corresponding to the writing of these strings and constants when the system starts up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void append(List<ByteBuffer> lists) {
}

public void reset() {
buffers = new LinkedList<ByteBuffer>();
buffers = new LinkedList<>();
buffers.add(ByteBuffer.allocate(RpcConstants.RPC_MAX_BUFFER_SIZE));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ public class NettyClient implements Client {
private static final AtomicLong init = new AtomicLong(0);
private static Timer timer;
private final ConcurrentHashMap<Integer, Callback<ResponseWrapper>> requests =
new ConcurrentHashMap<Integer, Callback<ResponseWrapper>>();
new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, Timeout> timeouts =
new ConcurrentHashMap<Integer, Timeout>();
new ConcurrentHashMap<>();
private final AtomicInteger serialNoGenerator =
new AtomicInteger(0);
private AtomicBoolean released = new AtomicBoolean(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class NettyClientFactory implements ClientFactory {
private static final Logger logger =
LoggerFactory.getLogger(NettyClientFactory.class);
protected final ConcurrentHashMap<String, Client> clients =
new ConcurrentHashMap<String, Client>();
new ConcurrentHashMap<>();
protected AtomicBoolean shutdown = new AtomicBoolean(true);
private Timer timer = new HashedWheelTimer();
private volatile AtomicBoolean init = new AtomicBoolean(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ public class NettyProtocolDecoder extends FrameDecoder {
private static final Logger logger =
LoggerFactory.getLogger(NettyProtocolDecoder.class);
private static final ConcurrentHashMap<String, AtomicLong> errProtolAddrMap =
new ConcurrentHashMap<String, AtomicLong>();
new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, AtomicLong> errSizeAddrMap =
new ConcurrentHashMap<String, AtomicLong>();
new ConcurrentHashMap<>();
private static AtomicLong lastProtolTime = new AtomicLong(0);
private static AtomicLong lastSizeTime = new AtomicLong(0);
private boolean packHeaderRead = false;
Expand All @@ -61,7 +61,7 @@ protected Object decode(ChannelHandlerContext ctx, Channel channel,
filterIllegalPackageSize(true, tmpListSize,
RpcConstants.MAX_FRAME_MAX_LIST_SIZE, channel);
this.listSize = tmpListSize;
this.dataPack = new RpcDataPack(serialNo, new ArrayList<ByteBuffer>(this.listSize));
this.dataPack = new RpcDataPack(serialNo, new ArrayList<>(this.listSize));
this.packHeaderRead = true;
}
// get PackBody
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ protected Object encode(ChannelHandlerContext ctx,
Channel channel, Object msg) throws Exception {
RpcDataPack dataPack = (RpcDataPack) msg;
List<ByteBuffer> origs = dataPack.getDataLst();
List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(origs.size() * 2 + 1);
List<ByteBuffer> bbs = new ArrayList<>(origs.size() * 2 + 1);
bbs.add(getPackHeader(dataPack));
for (ByteBuffer b : origs) {
bbs.add(getLengthHeader(b));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ public class NettyRpcServer implements ServiceRpcServer {
private static final Logger logger =
LoggerFactory.getLogger(NettyRpcServer.class);
private static final ConcurrentHashMap<String, AtomicLong> errParseAddrMap =
new ConcurrentHashMap<String, AtomicLong>();
new ConcurrentHashMap<>();
private static AtomicLong lastParseTime = new AtomicLong(0);
private final ConcurrentHashMap<Integer, Protocol> protocols =
new ConcurrentHashMap<Integer, Protocol>();
new ConcurrentHashMap<>();
private ServerBootstrap bootstrap;
private NioServerSocketChannelFactory channelFactory = null;
private AtomicBoolean started = new AtomicBoolean(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
public class ProtocolFactory {

private static final Map<Integer, Class<? extends Protocol>> protocols =
new HashMap<Integer, Class<? extends Protocol>>();
new HashMap<>();

static {
registerProtocol(RpcProtocol.RPC_PROTOCOL_TCP, RpcProtocol.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ public class RpcProtocol implements Protocol {
private static final Logger logger =
LoggerFactory.getLogger(RpcProtocol.class);
private final Map<Integer, Object> processors =
new HashMap<Integer, Object>();
new HashMap<>();
private final Map<Integer, Method> cacheMethods =
new HashMap<Integer, Method>();
new HashMap<>();
private final Map<Integer, ExecutorService> threadPools =
new HashMap<Integer, ExecutorService>();
new HashMap<>();
private boolean isOverTLS = false;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private static boolean subscribeInfoEqual(SubscribeInfo o1, SubscribeInfo o2) {
public void testDataConvert() {
// broker convert
BrokerInfo broker = new BrokerInfo(0, "localhost", 1200);
List<String> strInfoList = new ArrayList<String>();
List<String> strInfoList = new ArrayList<>();
strInfoList.add("0:localhost:1200");
Map<Integer, BrokerInfo> brokerMap = DataConverterUtil.convertBrokerInfo(strInfoList);
assertEquals("broker should be equal", broker, brokerMap.get(broker.getBrokerId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void encode() {
RpcDataPack obj = new RpcDataPack();
// set serial number
obj.setSerialNo(123);
List<ByteBuffer> dataList = new LinkedList<ByteBuffer>();
List<ByteBuffer> dataList = new LinkedList<>();
dataList.add(ByteBuffer.wrap("abc".getBytes()));
dataList.add(ByteBuffer.wrap("def".getBytes()));
// append data list.
Expand Down