Skip to content

Commit

Permalink
refactor(util): way better EventQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
iGoodie committed Jun 19, 2020
1 parent 97ca396 commit f4cf795
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 85 deletions.
Expand Up @@ -27,7 +27,7 @@
import net.programmer.igoodie.twitchspawn.tslanguage.parser.TSLParser;
import net.programmer.igoodie.twitchspawn.tslanguage.parser.TSLSyntaxError;
import net.programmer.igoodie.twitchspawn.tslanguage.parser.TSLTokenizer;
import net.programmer.igoodie.twitchspawn.util.TimeTaskQueue;
import net.programmer.igoodie.twitchspawn.util.EventQueue;

import java.util.Collection;
import java.util.Iterator;
Expand Down Expand Up @@ -313,7 +313,7 @@ public static int testModule(CommandContext<CommandSource> context, String strea

ServerPlayerEntity streamerPlayer = context.getSource().asPlayer();
TSLRuleset ruleset = ConfigManager.RULESET_COLLECTION.getRuleset(streamerNick);
TimeTaskQueue queue = ConfigManager.RULESET_COLLECTION.getQueue(streamerNick);
EventQueue queue = ConfigManager.RULESET_COLLECTION.getQueue(streamerNick);

Collection<TSLEvent> events = ruleset.getEvents();
Iterator<TSLEvent> eventIterator = events.iterator();
Expand Down
Expand Up @@ -6,7 +6,7 @@
import net.programmer.igoodie.twitchspawn.tslanguage.event.TSLEventPair;
import net.programmer.igoodie.twitchspawn.tslanguage.keyword.TSLEventKeyword;
import net.programmer.igoodie.twitchspawn.util.CooldownBucket;
import net.programmer.igoodie.twitchspawn.util.TimeTaskQueue;
import net.programmer.igoodie.twitchspawn.util.EventQueue;

import java.util.HashMap;
import java.util.List;
Expand All @@ -17,7 +17,7 @@ public class TSLRulesetCollection {

private TSLRuleset defaultRuleset;
private Map<String, TSLRuleset> streamerRulesets; // Maps lowercase nicks to TSLTree
private Map<String, TimeTaskQueue> eventQueues; // Maps lowercase nicks to TimeTaskQueue
private Map<String, EventQueue> eventQueues; // Maps lowercase nicks to TimeTaskQueue

public TSLRulesetCollection(TSLRuleset defaultTree, List<TSLRuleset> streamerTrees) {
if (defaultTree == null)
Expand Down Expand Up @@ -68,13 +68,8 @@ public boolean handleEvent(EventArguments args, CooldownBucket cooldownBucket) {
}

// Queue incoming event arguments
TimeTaskQueue streamerQueue = getQueue(args.streamerNickname);
streamerQueue.queue(() -> TwitchSpawn.SERVER.execute(() -> {
boolean performed = eventNode.process(args);
if (performed && cooldownBucket != null) {
cooldownBucket.consume(args.actorNickname);
}
}));
EventQueue eventQueue = getQueue(args.streamerNickname);
eventQueue.queue(eventNode, args, cooldownBucket);
TwitchSpawn.LOGGER.info("Queued handler for {} event.", eventKeyword);
return true;
}
Expand All @@ -97,19 +92,19 @@ public TSLRuleset getRuleset(String streamerNick) {
return streamerRulesets.get(streamerNick.toLowerCase());
}

public TimeTaskQueue getQueue(String streamerNick) {
TimeTaskQueue queue = eventQueues.get(streamerNick.toLowerCase());
public EventQueue getQueue(String streamerNick) {
EventQueue queue = eventQueues.get(streamerNick.toLowerCase());

if (queue == null) { // Lazy init
queue = new TimeTaskQueue(ConfigManager.PREFERENCES.notificationDelay);
queue = new EventQueue(ConfigManager.PREFERENCES.notificationDelay);
eventQueues.put(streamerNick.toLowerCase(), queue);
}

return queue;
}

public void clearQueue() {
eventQueues.values().forEach(TimeTaskQueue::clearAll);
eventQueues.values().forEach(EventQueue::reset);
}

}
121 changes: 121 additions & 0 deletions src/main/java/net/programmer/igoodie/twitchspawn/util/EventQueue.java
@@ -0,0 +1,121 @@
package net.programmer.igoodie.twitchspawn.util;

import net.programmer.igoodie.twitchspawn.TwitchSpawn;
import net.programmer.igoodie.twitchspawn.tslanguage.EventArguments;
import net.programmer.igoodie.twitchspawn.tslanguage.event.TSLEvent;

import java.util.LinkedList;
import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;

public class EventQueue {

private Timer timer;
private volatile boolean timerTicking;
private volatile Queue<TimerTask> tasks;
private long cooldown; // milliseconds
private long frozenUntil; // ms timestamp
private int succeededEvents;
private int discardedEvents;

public EventQueue(long cooldownDuration) {
this.timer = new Timer();
this.tasks = new LinkedList<>();
this.cooldown = cooldownDuration;
this.frozenUntil = -1;
}

public void queue(Runnable task) {
tasks.add(new TimerTask() {
@Override
public void run() {
long now = System.currentTimeMillis();

task.run();

frozenUntil = now + cooldown;
}
});
updateTimer();
}

public void queue(TSLEvent eventNode, EventArguments args, CooldownBucket cooldownBucket) {
tasks.add(new TimerTask() {
@Override
public void run() {
TwitchSpawn.SERVER.execute(() -> {
long now = System.currentTimeMillis();
boolean performed = eventNode.process(args);

if (!performed) {
discardedEvents++;
return;
}

if (cooldownBucket != null)
cooldownBucket.consume(args.actorNickname);

frozenUntil = now + cooldown;
succeededEvents++;
});
}
});
updateTimer();
}

private synchronized void updateTimer() {
if (!timerTicking) {
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
if (hasUnhandledEvent()) {
proceed();

} else {
this.cancel(); // XXX: Is reliable?
timerTicking = false;
}
}
}, 10, 10); // per 10ms
timerTicking = true;
}
}

public synchronized int succeededEventCount() {
return succeededEvents;
}

public synchronized int discardedEventCount() {
return discardedEvents;
}

public synchronized int unhandledEventCount() {
return tasks.size();
}

public synchronized boolean hasUnhandledEvent() {
return !tasks.isEmpty();
}

public boolean proceed() {
long now = System.currentTimeMillis();
if (now < frozenUntil) return false;
if (tasks.isEmpty()) return false;

TimerTask task = tasks.remove();
task.run();
return true;
}

public void reset() {
timer.cancel();
timer.purge();
timerTicking = false;
tasks.clear();
this.frozenUntil = -1;
succeededEvents = 0;
discardedEvents = 0;
}

}

This file was deleted.

@@ -1,6 +1,6 @@
package net.programmer.igoodie.twitchspawn;

import net.programmer.igoodie.twitchspawn.util.TimeTaskQueue;
import net.programmer.igoodie.twitchspawn.util.EventQueue;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
Expand All @@ -13,7 +13,7 @@ public class TimingTests {
@DisplayName("should preserve queue and approximate timing")
public void timerQueueTest() throws InterruptedException {
long duration = 1 * 1000;
TimeTaskQueue queue = new TimeTaskQueue(duration);
EventQueue queue = new EventQueue(duration);
List<Integer> list = new LinkedList<>();

// Fill queue
Expand All @@ -26,7 +26,7 @@ public void timerQueueTest() throws InterruptedException {
// Wait for timer to consume task
for (int i = 5; i >= 0; i--) {
System.out.println("Waiting for #" + (5 - i + 1));
Assertions.assertEquals(i, queue.size());
Assertions.assertEquals(i, queue.unhandledEventCount());
if (i != 0) Thread.sleep(duration);
}

Expand Down

0 comments on commit f4cf795

Please sign in to comment.