Skip to content

Commit

Permalink
feat(tsl): add WAIT action
Browse files Browse the repository at this point in the history
- Reimplemented the event queue
- Added WAIT action
  • Loading branch information
iGoodie committed Sep 13, 2020
1 parent ccb0520 commit 08fddf8
Show file tree
Hide file tree
Showing 10 changed files with 351 additions and 161 deletions.
Expand Up @@ -27,7 +27,6 @@
import net.programmer.igoodie.twitchspawn.tslanguage.parser.TSLParser;
import net.programmer.igoodie.twitchspawn.tslanguage.parser.TSLSyntaxError;
import net.programmer.igoodie.twitchspawn.tslanguage.parser.TSLTokenizer;
import net.programmer.igoodie.twitchspawn.util.EventQueue;

import java.util.Collection;
import java.util.Iterator;
Expand Down
@@ -0,0 +1,190 @@
package net.programmer.igoodie.twitchspawn.eventqueue;

import net.minecraft.entity.player.ServerPlayerEntity;
import net.minecraftforge.fml.network.NetworkDirection;
import net.programmer.igoodie.twitchspawn.TwitchSpawn;
import net.programmer.igoodie.twitchspawn.network.NetworkManager;
import net.programmer.igoodie.twitchspawn.network.packet.GlobalChatCooldownPacket;
import net.programmer.igoodie.twitchspawn.tslanguage.EventArguments;
import net.programmer.igoodie.twitchspawn.tslanguage.event.TSLEvent;
import net.programmer.igoodie.twitchspawn.util.CooldownBucket;

import java.util.Deque;
import java.util.LinkedList;
import java.util.List;

public class EventQueue {

private final Thread innerThread;
private volatile EventQueueState state;
private volatile Deque<EventQueueTask> tasks;
private volatile boolean waitingForServer;
private long cooldown; // milliseconds
private int succeededEvents;
private int discardedEvents;

public EventQueue(long cooldownDuration) {
this.innerThread = new Thread(() -> {
while (true) stepThread();
}, "TwitchSpawn Event Queue");
this.state = EventQueueState.PAUSED;
this.tasks = new LinkedList<>();
this.cooldown = cooldownDuration;

this.innerThread.start();
}

private void stepThread() {
try {
if (hasUnhandledEvent()) {
if (waitingForServer) {
// TODO: Sleep?
return;
}

EventQueueTask task = tasks.remove();

if (task.getType() == EventQueueTask.Type.SLEEP)
state = EventQueueState.COOLDOWN;

task.run();

if (task.getType() == EventQueueTask.Type.SLEEP)
state = EventQueueState.WORKING;

} else {
pause();
}

} catch (Throwable e) {
discardedEvents++;
e.printStackTrace(); // TODO:
}
}

private void unpause() {
synchronized (innerThread) {
state = EventQueueState.WORKING;
innerThread.notifyAll();
}
}

private void pause() {
synchronized (innerThread) {
try {
state = EventQueueState.PAUSED;
innerThread.wait();

} catch (InterruptedException e) {
e.printStackTrace(); // TODO
}
}
}

public void updateThread() {
if (state == EventQueueState.PAUSED) unpause();
}

public void cancelUpcomingSleep() {
assert this.tasks instanceof LinkedList;
List<EventQueueTask> tasks = (LinkedList<EventQueueTask>) this.tasks;
for (int i = 0; i < tasks.size(); i++) {
EventQueueTask task = tasks.get(i);
if (task.getType() == EventQueueTask.Type.SLEEP) {
tasks.remove(i);
return;
}
}
}

public void queueSleepFirst() {
queueSleepFirst(cooldown);
}

public void queueSleepFirst(long millis) {
tasks.addFirst(new EventQueueTask("Sleep", millis));
}

public void queueSleep() {
queueSleep(cooldown);
}

public void queueSleep(long millis) {
tasks.add(new EventQueueTask("Sleep", millis));
}

public void queueFirst(String name, Runnable task) {
tasks.addFirst(new EventQueueTask(name, task));

}

public void queue(Runnable task) {
queue("Runnable task", task);
}

public void queue(String name, Runnable task) {
tasks.add(new EventQueueTask(name, task));
}

public void queue(TSLEvent eventNode, EventArguments args, CooldownBucket cooldownBucket) {
if (eventNode.willPerform(args)) {
if (cooldownBucket != null) {
cooldownBucket.consume(args.actorNickname);

ServerPlayerEntity playerEntity = TwitchSpawn.SERVER
.getPlayerList()
.getPlayerByUsername(args.streamerNickname);

if (playerEntity != null) {
NetworkManager.CHANNEL.sendTo(
new GlobalChatCooldownPacket(cooldownBucket.getGlobalCooldownTimestamp()),
playerEntity.connection.netManager,
NetworkDirection.PLAY_TO_CLIENT
);
}
}
}

tasks.add(new EventQueueTask("TSL Event task", () -> {
waitingForServer = true;
TwitchSpawn.SERVER.execute(() -> {
try {
boolean performed = eventNode.process(args);

if (performed) succeededEvents++;
else discardedEvents++;

} catch (Throwable e) {
discardedEvents++;
// TODO: "Event failed HUD" maybe?

} finally {
waitingForServer = false;
}
});
}));
}

public synchronized int succeededEventCount() {
return succeededEvents;
}

public synchronized int discardedEventCount() {
return discardedEvents;
}

public synchronized int unhandledEventCount() {
return tasks.size();
}

public synchronized boolean hasUnhandledEvent() {
return !tasks.isEmpty();
}

public void reset() {
tasks.clear();
succeededEvents = 0;
discardedEvents = 0;
}

}
@@ -0,0 +1,9 @@
package net.programmer.igoodie.twitchspawn.eventqueue;

public enum EventQueueState {

WORKING,
COOLDOWN,
PAUSED

}
@@ -0,0 +1,77 @@
package net.programmer.igoodie.twitchspawn.eventqueue;

import java.util.Timer;
import java.util.TimerTask;

public class EventQueueTask {

final String name;
Runnable routine;
long cooldown;

public EventQueueTask(Runnable routine) {
this("Runnable task", routine);
}

public EventQueueTask(String name, Runnable routine) {
this.name = name;
this.routine = routine;
}

public EventQueueTask(long cooldown) {
this("Cooldown task", cooldown);
}

public EventQueueTask(String name, long cooldown) {
this.name = name;
this.cooldown = cooldown;
}

private void sleep(long millis) throws InterruptedException {
final Thread currentThread = Thread.currentThread();
synchronized (currentThread) {
new Timer().schedule(new TimerTask() {
@Override
public void run() {
synchronized (currentThread) {
currentThread.notifyAll();
}
}
}, cooldown);

currentThread.wait();
}
}

public void run() throws InterruptedException {
switch (getType()) {
case SLEEP:
sleep(cooldown);
return;

case ROUTINE:
routine.run();
return;

default:
System.out.println("Wut?");
}
}

public Type getType() {
if (routine != null) return Type.ROUTINE;
return Type.SLEEP;
}

@Override
public String toString() {
return name + (cooldown == 0 ? "" : String.format("(%d)", cooldown));
}

/* --------------------------------- */

public enum Type {
SLEEP, ROUTINE
}

}
Expand Up @@ -2,11 +2,11 @@

import net.programmer.igoodie.twitchspawn.TwitchSpawn;
import net.programmer.igoodie.twitchspawn.configuration.ConfigManager;
import net.programmer.igoodie.twitchspawn.eventqueue.EventQueue;
import net.programmer.igoodie.twitchspawn.tslanguage.event.TSLEvent;
import net.programmer.igoodie.twitchspawn.tslanguage.event.TSLEventPair;
import net.programmer.igoodie.twitchspawn.tslanguage.keyword.TSLEventKeyword;
import net.programmer.igoodie.twitchspawn.util.CooldownBucket;
import net.programmer.igoodie.twitchspawn.util.EventQueue;

import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -70,6 +70,8 @@ public boolean handleEvent(EventArguments args, CooldownBucket cooldownBucket) {
// Queue incoming event arguments
EventQueue eventQueue = getQueue(args.streamerNickname);
eventQueue.queue(eventNode, args, cooldownBucket);
eventQueue.queueSleep();
eventQueue.updateThread();
TwitchSpawn.LOGGER.info("Queued handler for {} event.", eventKeyword);
return true;
}
Expand Down
Expand Up @@ -3,6 +3,7 @@
import com.google.gson.JsonArray;
import net.minecraft.entity.player.ServerPlayerEntity;
import net.programmer.igoodie.twitchspawn.configuration.ConfigManager;
import net.programmer.igoodie.twitchspawn.eventqueue.EventQueue;
import net.programmer.igoodie.twitchspawn.tslanguage.EventArguments;
import net.programmer.igoodie.twitchspawn.tslanguage.keyword.TSLActionKeyword;
import net.programmer.igoodie.twitchspawn.tslanguage.parser.TSLParser;
Expand Down Expand Up @@ -94,16 +95,25 @@ private void parseSingleAction(String actionName, List<String> actionArgs) throw
protected void performAction(ServerPlayerEntity player, EventArguments args) {
this.actions.forEach(action -> action.reflectedUser = this.reflectedUser);

EventQueue eventQueue = ConfigManager.RULESET_COLLECTION.getQueue(args.streamerNickname);

if (instant) { // Perform them all instantly
this.actions.forEach(action -> action.performAction(player, args));
for (int i = this.actions.size() - 1; i >= 0; i--) {
TSLAction action = this.actions.get(i);
eventQueue.queueFirst(action.getClass().getSimpleName(),
() -> action.performAction(player, args));
}

} else {
this.actions.get(0).process(args); // Perform first one immediately
this.actions.subList(1, this.actions.size()) // Queue rest of it
.forEach(action -> ConfigManager.RULESET_COLLECTION
.getQueue(args.streamerNickname)
.queue(() -> action.process(args)));
for (int i = this.actions.size() - 1; i >= 0; i--) {
TSLAction action = this.actions.get(i);
eventQueue.queueFirst(action.getClass().getSimpleName(),
() -> action.process(args));
if (i != 0) eventQueue.queueSleepFirst(ConfigManager.PREFERENCES.notificationDelay);
}
}

eventQueue.updateThread();
}

}

0 comments on commit 08fddf8

Please sign in to comment.