Skip to content

Commit

Permalink
[ISSUE #8201] Optimization of Splitting TimeRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
CLFutureX committed May 25, 2024
1 parent 97b59a0 commit 0f74efa
Showing 1 changed file with 8 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,6 @@ public int dequeue() throws Exception {
splitAndCheckLatchForDequeue(deleteMsgStack);
//read the normal msg
splitAndCheckLatchForDequeue(normalMsgStack);

// if master -> slave -> master, then the read time move forward, and messages will be lossed
if (dequeueStatusChangeFlag) {
return -1;
Expand All @@ -987,7 +986,7 @@ public int dequeue() throws Exception {
}

private void splitAndCheckLatchForDequeue(List<TimerRequest> origin) throws Exception {
if(origin.size() == 0){
if (origin.size() == 0) {
return;
}
CountDownLatch latch = new CountDownLatch(origin.size());
Expand All @@ -997,26 +996,25 @@ private void splitAndCheckLatchForDequeue(List<TimerRequest> origin) throws Exce
checkDequeueLatch(latch, currReadTimeMs);
}

private void splitIntoQueue(List<TimerRequest> origin,BlockingQueue<List<TimerRequest>> queue, CountDownLatch latch) {
if(origin.size() == 0){
return;
}
private void splitIntoQueue(List<TimerRequest> origin, BlockingQueue<List<TimerRequest>> queue,
CountDownLatch latch) {

List<TimerRequest> currList = new LinkedList<>();
int fileIndexPy = -1;
int msgIndex = 0;
for (TimerRequest tr : origin) {
tr.setLatch(latch);
if(origin.size() > 100){
if(fileIndexPy != tr.getOffsetPy() / commitLogFileSize){
if(currList.size() > 0){
if (origin.size() > 100) {
if (fileIndexPy != tr.getOffsetPy() / commitLogFileSize) {
if (currList.size() > 0) {
queue.add(currList);
currList = new LinkedList<>();
msgIndex = 0;
}
fileIndexPy = (int) (tr.getOffsetPy() / commitLogFileSize);
}
currList.add(tr);
if (++msgIndex % 2000 == 0 ) {
if (++msgIndex % 2000 == 0) {
queue.add(currList);
currList = new ArrayList<>();
}
Expand Down

0 comments on commit 0f74efa

Please sign in to comment.