Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TUBEMQ-91] replace explicit type with <> #71

Merged
merged 1 commit into from
May 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ public class BaseMessageConsumer implements MessageConsumer {
private final ScheduledExecutorService heartService2Master;
private final Thread rebalanceThread;
private final BlockingQueue<ConsumerEvent> rebalanceEvents =
new ArrayBlockingQueue<ConsumerEvent>(REBALANCE_QUEUE_SIZE);
new ArrayBlockingQueue<>(REBALANCE_QUEUE_SIZE);
private final BlockingQueue<ConsumerEvent> rebalanceResults =
new ArrayBlockingQueue<ConsumerEvent>(REBALANCE_QUEUE_SIZE);
new ArrayBlockingQueue<>(REBALANCE_QUEUE_SIZE);
// flowctrl
private boolean isCurGroupCtrl = false;
private AtomicLong lastCheckTime = new AtomicLong(0);
Expand All @@ -102,7 +102,7 @@ public class BaseMessageConsumer implements MessageConsumer {
private final RpcConfig rpcConfig = new RpcConfig();
private AtomicLong visitToken = new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
private AtomicReference<String> authAuthorizedTokenRef =
new AtomicReference<String>("");
new AtomicReference<>("");
private ClientAuthenticateHandler authenticateHandler =
new SimpleClientAuthenticateHandler();
private Thread heartBeatThread2Broker;
Expand Down Expand Up @@ -615,9 +615,9 @@ private void startMasterAndBrokerThreads() throws TubeClientException {
}

private void disconnectFromBroker(ConsumerEvent event) throws InterruptedException {
List<String> partKeys = new ArrayList<String>();
List<String> partKeys = new ArrayList<>();
HashMap<BrokerInfo, List<Partition>> unRegisterInfoMap =
new HashMap<BrokerInfo, List<Partition>>();
new HashMap<>();
List<SubscribeInfo> subscribeInfoList = event.getSubscribeInfoList();
for (SubscribeInfo info : subscribeInfoList) {
BrokerInfo broker =
Expand All @@ -627,7 +627,7 @@ private void disconnectFromBroker(ConsumerEvent event) throws InterruptedExcepti
List<Partition> unRegisterPartitionList =
unRegisterInfoMap.get(broker);
if (unRegisterPartitionList == null) {
unRegisterPartitionList = new ArrayList<Partition>();
unRegisterPartitionList = new ArrayList<>();
unRegisterInfoMap.put(broker, unRegisterPartitionList);
}
if (!unRegisterPartitionList.contains(partition)) {
Expand All @@ -639,7 +639,7 @@ private void disconnectFromBroker(ConsumerEvent event) throws InterruptedExcepti
return;
}
Map<BrokerInfo, List<PartitionSelectResult>> unNewRegisterInfoMap =
new HashMap<BrokerInfo, List<PartitionSelectResult>>();
new HashMap<>();
try {
if (this.isPullConsume) {
unNewRegisterInfoMap =
Expand All @@ -661,14 +661,14 @@ private void disconnectFromBroker(ConsumerEvent event) throws InterruptedExcepti

private void connect2Broker(ConsumerEvent event) throws InterruptedException {
Map<BrokerInfo, List<Partition>> registerInfoMap =
new HashMap<BrokerInfo, List<Partition>>();
new HashMap<>();
List<SubscribeInfo> subscribeInfoList = event.getSubscribeInfoList();
for (SubscribeInfo info : subscribeInfoList) {
BrokerInfo broker = new BrokerInfo(info.getBrokerId(), info.getHost(), info.getPort());
Partition partition = new Partition(broker, info.getTopic(), info.getPartitionId());
List<Partition> curPartList = registerInfoMap.get(broker);
if (curPartList == null) {
curPartList = new ArrayList<Partition>();
curPartList = new ArrayList<>();
registerInfoMap.put(broker, curPartList);
}
if (!curPartList.contains(partition)) {
Expand All @@ -678,7 +678,7 @@ private void connect2Broker(ConsumerEvent event) throws InterruptedException {
if ((isRebalanceStopped()) || (isShutdown())) {
return;
}
List<Partition> unfinishedPartitions = new ArrayList<Partition>();
List<Partition> unfinishedPartitions = new ArrayList<>();
rmtDataCache.filterCachedPartitionInfo(registerInfoMap, unfinishedPartitions);
registerPartitions(registerInfoMap, unfinishedPartitions);
if (this.isFirst.get()) {
Expand Down Expand Up @@ -913,7 +913,7 @@ private ClientMaster.RegisterRequestC2M createMasterRegisterRequest() throws Exc
private List<String> formatTopicCondInfo(
final ConcurrentHashMap<String, TopicProcessor> topicCondMap) {
final StringBuilder strBuffer = new StringBuilder(512);
List<String> strTopicCondList = new ArrayList<String>();
List<String> strTopicCondList = new ArrayList<>();
if ((topicCondMap != null) && (!topicCondMap.isEmpty())) {
for (Map.Entry<String, TopicProcessor> entry : topicCondMap.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
Expand Down Expand Up @@ -1253,7 +1253,7 @@ protected FetchContext fetchMessage(PartitionSelectResult partSelectResult,
needFilter = true;
}
}
List<Message> messageList = new ArrayList<Message>();
List<Message> messageList = new ArrayList<>();
for (Message message : tmpMessageList) {
if (message == null) {
continue;
Expand Down Expand Up @@ -1584,7 +1584,7 @@ public void run() {
}
// Send heartbeat request to the broker connect by the client
for (BrokerInfo brokerInfo : rmtDataCache.getAllRegisterBrokers()) {
List<String> partStrSet = new ArrayList<String>();
List<String> partStrSet = new ArrayList<>();
try {
// Handle the heartbeat response for partitions belong to the same broker.
List<Partition> partitions =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

public class ClientSubInfo {
private final ConcurrentHashMap<String/* topic */, TopicProcessor> topicCondRegistry =
new ConcurrentHashMap<String, TopicProcessor>();
new ConcurrentHashMap<>();
private boolean requireBound = false;
private AtomicBoolean isNotAllocated =
new AtomicBoolean(true);
Expand All @@ -37,9 +37,9 @@ public class ClientSubInfo {
private long subscribedTime;
private boolean isSelectBig = true;
private String requiredPartition = "";
private Set<String> subscribedTopics = new HashSet<String>();
private Map<String, Long> assignedPartMap = new HashMap<String, Long>();
private Map<String, Boolean> topicFilterMap = new HashMap<String, Boolean>();
private Set<String> subscribedTopics = new HashSet<>();
private Map<String, Long> assignedPartMap = new HashMap<>();
private Map<String, Boolean> topicFilterMap = new HashMap<>();

public ClientSubInfo() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class ConsumerResult {
private String topicName = "";
private PeerInfo peerInfo = new PeerInfo();
private String confirmContext = "";
private List<Message> messageList = new ArrayList<Message>();
private List<Message> messageList = new ArrayList<>();


public ConsumerResult(int errCode, String errMsg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class FetchContext {
private String errMsg = "";
private long currOffset = TBaseConstants.META_VALUE_UNDEFINED;
private String confirmContext = "";
private List<Message> messageList = new ArrayList<Message>();
private List<Message> messageList = new ArrayList<>();

public FetchContext(PartitionSelectResult selectResult) {
this.partition = selectResult.getPartition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class MessageFetchManager {
private static final Logger logger =
LoggerFactory.getLogger(MessageFetchManager.class);
private final ConcurrentHashMap<Long, Integer> fetchWorkerStatusMap =
new ConcurrentHashMap<Long, Integer>();
new ConcurrentHashMap<>();
private final ConsumerConfig consumerConfig;
private final SimplePushMessageConsumer pushConsumer;
// Manager status:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,21 @@ public class RmtDataCache implements Closeable {
private final FlowCtrlRuleHandler defFlowCtrlRuleHandler;
private final AtomicInteger waitCont = new AtomicInteger(0);
private final ConcurrentHashMap<String, Timeout> timeouts =
new ConcurrentHashMap<String, Timeout>();
new ConcurrentHashMap<>();
private final BlockingQueue<String> indexPartition =
new LinkedBlockingQueue<String>();
new LinkedBlockingQueue<>();
private final ConcurrentHashMap<String /* index */, PartitionExt> partitionMap =
new ConcurrentHashMap<String, PartitionExt>();
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String /* index */, Long> partitionUsedMap =
new ConcurrentHashMap<String, Long>();
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String /* index */, Long> partitionOffsetMap =
new ConcurrentHashMap<String, Long>();
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String /* topic */, ConcurrentLinkedQueue<Partition>> topicPartitionConMap =
new ConcurrentHashMap<String, ConcurrentLinkedQueue<Partition>>();
new ConcurrentHashMap<>();
private final ConcurrentHashMap<BrokerInfo/* broker */, ConcurrentLinkedQueue<Partition>> brokerPartitionConMap =
new ConcurrentHashMap<BrokerInfo, ConcurrentLinkedQueue<Partition>>();
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String/* partitionKey */, Integer> partRegisterBookMap =
new ConcurrentHashMap<String/* partitionKey */, Integer>();
new ConcurrentHashMap<>();
private AtomicBoolean isClosed = new AtomicBoolean(false);
private CountDownLatch dataProcessSync = new CountDownLatch(0);

Expand All @@ -90,7 +90,7 @@ public RmtDataCache(final FlowCtrlRuleHandler defFlowCtrlRuleHandler,
}
this.defFlowCtrlRuleHandler = defFlowCtrlRuleHandler;
this.groupFlowCtrlRuleHandler = groupFlowCtrlRuleHandler;
Map<Partition, Long> tmpPartOffsetMap = new HashMap<Partition, Long>();
Map<Partition, Long> tmpPartOffsetMap = new HashMap<>();
if (partitionList != null) {
for (Partition partition : partitionList) {
tmpPartOffsetMap.put(partition, -1L);
Expand Down Expand Up @@ -294,7 +294,7 @@ public void addPartition(Partition partition, long currOffset) {
if (partition == null) {
return;
}
Map<Partition, Long> tmpPartOffsetMap = new HashMap<Partition, Long>();
Map<Partition, Long> tmpPartOffsetMap = new HashMap<>();
tmpPartOffsetMap.put(partition, currOffset);
addPartitionsInfo(tmpPartOffsetMap);
}
Expand Down Expand Up @@ -429,7 +429,7 @@ public void close() {
* @return subscribe information list
*/
public List<SubscribeInfo> getSubscribeInfoList(String consumerId, String consumeGroup) {
List<SubscribeInfo> subscribeInfoList = new ArrayList<SubscribeInfo>();
List<SubscribeInfo> subscribeInfoList = new ArrayList<>();
for (Partition partition : partitionMap.values()) {
if (partition != null) {
subscribeInfoList.add(new SubscribeInfo(consumerId, consumeGroup, partition));
Expand All @@ -444,7 +444,7 @@ public Map<BrokerInfo, List<PartitionSelectResult>> removeAndGetPartition(
boolean isWaitTimeoutRollBack) {
StringBuilder sBuilder = new StringBuilder(512);
HashMap<BrokerInfo, List<PartitionSelectResult>> unNewRegisterInfoMap =
new HashMap<BrokerInfo, List<PartitionSelectResult>>();
new HashMap<>();
pauseProcess();
try {
waitPartitions(partitionKeys, inUseWaitPeriodMs);
Expand Down Expand Up @@ -491,7 +491,7 @@ public Map<BrokerInfo, List<PartitionSelectResult>> removeAndGetPartition(
List<PartitionSelectResult> targetPartitonList =
unNewRegisterInfoMap.get(entry.getKey());
if (targetPartitonList == null) {
targetPartitonList = new ArrayList<PartitionSelectResult>();
targetPartitonList = new ArrayList<>();
unNewRegisterInfoMap.put(entry.getKey(), targetPartitonList);
}
targetPartitonList.add(partitionRet);
Expand Down Expand Up @@ -540,7 +540,7 @@ public void removePartition(Partition partition) {
*/
public Map<String, ConsumeOffsetInfo> getCurPartitionInfoMap() {
Map<String, ConsumeOffsetInfo> tmpPartitionMap =
new ConcurrentHashMap<String, ConsumeOffsetInfo>();
new ConcurrentHashMap<>();
for (Map.Entry<String, PartitionExt> entry : partitionMap.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
Expand All @@ -553,12 +553,12 @@ public Map<String, ConsumeOffsetInfo> getCurPartitionInfoMap() {

public Map<BrokerInfo, List<PartitionSelectResult>> getAllPartitionListWithStatus() {
Map<BrokerInfo, List<PartitionSelectResult>> registeredInfoMap =
new HashMap<BrokerInfo, List<PartitionSelectResult>>();
new HashMap<>();
for (PartitionExt partitionExt : partitionMap.values()) {
List<PartitionSelectResult> registerPartitionList =
registeredInfoMap.get(partitionExt.getBroker());
if (registerPartitionList == null) {
registerPartitionList = new ArrayList<PartitionSelectResult>();
registerPartitionList = new ArrayList<>();
registeredInfoMap.put(partitionExt.getBroker(), registerPartitionList);
}
registerPartitionList.add(new PartitionSelectResult(true,
Expand All @@ -584,7 +584,7 @@ public Set<BrokerInfo> getAllRegisterBrokers() {
* @return partition list
*/
public List<Partition> getBrokerPartitionList(BrokerInfo brokerInfo) {
List<Partition> retPartition = new ArrayList<Partition>();
List<Partition> retPartition = new ArrayList<>();
ConcurrentLinkedQueue<Partition> partitionList =
brokerPartitionConMap.get(brokerInfo);
if (partitionList != null) {
Expand All @@ -595,7 +595,7 @@ public List<Partition> getBrokerPartitionList(BrokerInfo brokerInfo) {

public void filterCachedPartitionInfo(Map<BrokerInfo, List<Partition>> registerInfoMap,
List<Partition> unRegPartitionList) {
List<BrokerInfo> brokerInfoList = new ArrayList<BrokerInfo>();
List<BrokerInfo> brokerInfoList = new ArrayList<>();
for (Map.Entry<BrokerInfo, List<Partition>> entry : registerInfoMap.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
Expand Down Expand Up @@ -628,7 +628,7 @@ public ConcurrentLinkedQueue<Partition> getPartitionByBroker(BrokerInfo brokerIn

public void resumeTimeoutConsumePartitions(long allowedPeriodTimes) {
if (!partitionUsedMap.isEmpty()) {
List<String> partKeys = new ArrayList<String>();
List<String> partKeys = new ArrayList<>();
partKeys.addAll(partitionUsedMap.keySet());
for (String keyId : partKeys) {
Long oldTime = partitionUsedMap.get(keyId);
Expand Down Expand Up @@ -689,7 +689,7 @@ private void addPartitionsInfo(Map<Partition, Long> partOffsetMap) {
ConcurrentLinkedQueue<Partition> topicPartitionQue =
topicPartitionConMap.get(partition.getTopic());
if (topicPartitionQue == null) {
topicPartitionQue = new ConcurrentLinkedQueue<Partition>();
topicPartitionQue = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<Partition> tmpTopicPartitionQue =
topicPartitionConMap.putIfAbsent(partition.getTopic(), topicPartitionQue);
if (tmpTopicPartitionQue != null) {
Expand All @@ -702,7 +702,7 @@ private void addPartitionsInfo(Map<Partition, Long> partOffsetMap) {
ConcurrentLinkedQueue<Partition> brokerPartitionQue =
brokerPartitionConMap.get(partition.getBroker());
if (brokerPartitionQue == null) {
brokerPartitionQue = new ConcurrentLinkedQueue<Partition>();
brokerPartitionQue = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<Partition> tmpBrokerPartQues =
brokerPartitionConMap.putIfAbsent(partition.getBroker(), brokerPartitionQue);
if (tmpBrokerPartQues != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public class TopicProcessor {
private MessageListener messageListener;
private TreeSet<String> filterCondStrs;
private List<Integer> filterCondCodes = new ArrayList<Integer>();
private List<Integer> filterCondCodes = new ArrayList<>();

public TopicProcessor(final MessageListener messageListener,
final TreeSet<String> filterConds) {
Expand Down