Skip to content

Commit

Permalink
process event skip issue
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenghao committed Jul 6, 2019
1 parent a1f7933 commit 93ea15c
Show file tree
Hide file tree
Showing 5 changed files with 520 additions and 514 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>org.menina</groupId>
<artifactId>raft</artifactId>
<version>1.2.3</version>
<version>1.2.4</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/menina/raft/common/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,6 @@ public enum StorageType {

public static final String DEFAULT_SCHEDULE_BACKGROUND_THREAD = "schedule-background-thread";

public static final String DEFAULT_TICK_EVENT_LOOP_THREAD = "tick-event-loop-thread";

}
5 changes: 3 additions & 2 deletions src/main/java/org/menina/raft/core/RaftNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.menina.rail.config.ServerOptions;
import org.menina.rail.server.ExporterServer;
import lombok.extern.slf4j.Slf4j;
import org.menina.raft.api.State;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -50,7 +51,7 @@ private void backend() {
this.snapshotter.recover();
RaftProto.Snapshot snapshot = this.snapshotter.loadNewest();
this.nextOffsetMetaData = this.wal.recoverWal();
this.nodeInfo().setReplayState(ReplayState.REPLAYING);
this.nodeInfo().setReplayState(State.ReplayState.REPLAYING);
if (snapshot != null) {
this.raftLog.updateSnapshotMetadata(snapshot.getMeta());
this.raftLog.appliedTo(snapshot.getMeta().getIndex());
Expand All @@ -72,7 +73,7 @@ private void backend() {
this.recover(snapshot, latest);
this.storage.recover();
if (latest == null || (snapshot != null && latest.getIndex() <= snapshot.getMeta().getIndex())) {
nodeInfo().setReplayState(ReplayState.REPLAYED);
nodeInfo().setReplayState(State.ReplayState.REPLAYED);
log.info("state machine replay success, replay state {}", nodeInfo().getReplayState());
}
} catch (IOException e) {
Expand Down
63 changes: 33 additions & 30 deletions src/main/java/org/menina/raft/election/LogicalClock.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package org.menina.raft.election;

import com.google.common.base.Preconditions;
import org.menina.raft.common.Constants;
import org.menina.raft.common.RaftThread;
import org.menina.raft.core.loop.EventLoop;
import org.menina.rail.common.NamedThreadFactory;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
Expand All @@ -32,14 +32,6 @@ public class LogicalClock implements Tick {
private ConcurrentMap<String, TickListener> tickListeners = new ConcurrentHashMap<>();
private LinkedBlockingQueue<TickEvent> eventsQueue = new LinkedBlockingQueue<TickEvent>(1);
private ScheduledExecutorService ticker = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("tick-schedule-thread"));
private ExecutorService executor = new ThreadPoolExecutor(
1,
2,
30L,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new NamedThreadFactory("tick-event-listener-thread"));


public LogicalClock(long accuracy) {
this.accuracy = accuracy;
Expand All @@ -55,25 +47,7 @@ public void start() {
if (running.compareAndSet(false, true)) {
log.info("Global clock running");
ticker.scheduleAtFixedRate(this, accuracy, accuracy, TimeUnit.MILLISECONDS);
executor.execute(new Runnable() {
@Override
public void run() {
while (running.get()) {
tickListeners.values().iterator().forEachRemaining(new Consumer<TickListener>() {
@Override
public void accept(TickListener listener) {
try {
listener.onTick(eventsQueue.take());
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
});
}

log.warn("tick-event-listener-thread shut down");
}
});
RaftThread.daemon(new TickEventLoop(), Constants.DEFAULT_TICK_EVENT_LOOP_THREAD).start();
} else {
log.warn("Tick has started");
}
Expand All @@ -97,4 +71,33 @@ public void run() {
log.warn("Miss trigger tick event, make sure non-blocking");
}
}

private class TickEventLoop implements EventLoop {

private boolean running = true;

@Override
public void close() {
this.running = false;
}

@Override
public void run() {
while (running) {
try {
TickEvent event = eventsQueue.take();
tickListeners.values().iterator().forEachRemaining(new Consumer<TickListener>() {
@Override
public void accept(TickListener listener) {
listener.onTick(event);
}
});
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}

log.warn("{} shut down", Constants.DEFAULT_TICK_EVENT_LOOP_THREAD);
}
}
}
Loading

0 comments on commit 93ea15c

Please sign in to comment.