@@ -912,10 +912,10 @@ index 0000000000000000000000000000000000000000..a630a84b60b4517e3bc330d4983b914b
912912+ }
913913diff --git a/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java
914914new file mode 100644
915- index 0000000000000000000000000000000000000000..97f2e433c483f1ebd7500ae142269e144ef5fda4
915+ index 0000000000000000000000000000000000000000..24fe40c14cc50f8357a9c7a7493140fdea016a3d
916916--- /dev/null
917917+++ b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java
918- @@ -0,0 +1,277 @@
918+ @@ -0,0 +1,298 @@
919919+ package com.destroystokyo.paper.io;
920920+
921921+ import java.util.concurrent.ConcurrentLinkedQueue;
@@ -1022,6 +1022,27 @@ index 0000000000000000000000000000000000000000..97f2e433c483f1ebd7500ae142269e14
10221022+ }
10231023+
10241024+ /**
1025+ + * Polls the highest priority task currently available. {@code null} if none.
1026+ + */
1027+ + public T poll(final int lowestPriority) {
1028+ + T task;
1029+ + final int max = Math.min(LOWEST_PRIORITY, lowestPriority);
1030+ + for (int i = 0; i <= max; ++i) {
1031+ + final ConcurrentLinkedQueue<T> queue = this.queues[i];
1032+ +
1033+ + while ((task = queue.poll()) != null) {
1034+ + final int prevPriority = task.tryComplete(i);
1035+ + if (prevPriority != COMPLETING_PRIORITY && prevPriority <= i) {
1036+ + // if the prev priority was greater-than or equal to our current priority
1037+ + return task;
1038+ + }
1039+ + }
1040+ + }
1041+ +
1042+ + return null;
1043+ + }
1044+ +
1045+ + /**
10251046+ * Returns whether this queue may have tasks queued.
10261047+ * <p>
10271048+ * This operation is not atomic, but is MT-Safe.
@@ -1195,10 +1216,10 @@ index 0000000000000000000000000000000000000000..97f2e433c483f1ebd7500ae142269e14
11951216+ }
11961217diff --git a/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java
11971218new file mode 100644
1198- index 0000000000000000000000000000000000000000..ee906b594b306906c170180a29a8b61997d05168
1219+ index 0000000000000000000000000000000000000000..64b772dc1ed857ccd6999591f89dd89aface0649
11991220--- /dev/null
12001221+++ b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java
1201- @@ -0,0 +1,241 @@
1222+ @@ -0,0 +1,254 @@
12021223+ package com.destroystokyo.paper.io;
12031224+
12041225+ import net.minecraft.server.MinecraftServer;
@@ -1222,6 +1243,19 @@ index 0000000000000000000000000000000000000000..ee906b594b306906c170180a29a8b619
12221243+ protected volatile ConcurrentLinkedQueue<Thread> flushQueue = new ConcurrentLinkedQueue<>();
12231244+ protected volatile long flushCycles;
12241245+
1246+ + protected int lowestPriorityToPoll = PrioritizedTaskQueue.LOWEST_PRIORITY;
1247+ +
1248+ + public int getLowestPriorityToPoll() {
1249+ + return this.lowestPriorityToPoll;
1250+ + }
1251+ +
1252+ + public void setLowestPriorityToPoll(final int lowestPriorityToPoll) {
1253+ + if (this.isAlive()) {
1254+ + throw new IllegalStateException("Cannot set after starting");
1255+ + }
1256+ + this.lowestPriorityToPoll = lowestPriorityToPoll;
1257+ + }
1258+ +
12251259+ public QueueExecutorThread(final PrioritizedTaskQueue<T> queue) {
12261260+ this(queue, (int)(1.e6)); // 1.0ms
12271261+ }
@@ -1300,7 +1334,7 @@ index 0000000000000000000000000000000000000000..ee906b594b306906c170180a29a8b619
13001334+ Runnable task;
13011335+ boolean ret = false;
13021336+
1303- + while ((task = this.queue.poll()) != null) {
1337+ + while ((task = this.queue.poll(this.lowestPriorityToPoll )) != null) {
13041338+ ret = true;
13051339+ try {
13061340+ task.run();
@@ -1749,10 +1783,10 @@ index 0000000000000000000000000000000000000000..058fb5a41565e6ce2acbd1f4d071a1b8
17491783+ }
17501784diff --git a/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java
17511785new file mode 100644
1752- index 0000000000000000000000000000000000000000..18ae2e2b339d357fbe0f6f2b18bc14c0dfe4c222
1786+ index 0000000000000000000000000000000000000000..db6f8ac2ae068d5bc2ec2d587ab625c4956f6947
17531787--- /dev/null
17541788+++ b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java
1755- @@ -0,0 +1,513 @@
1789+ @@ -0,0 +1,505 @@
17561790+ package com.destroystokyo.paper.io.chunk;
17571791+
17581792+ import com.destroystokyo.paper.io.PaperFileIOThread;
@@ -1795,9 +1829,7 @@ index 0000000000000000000000000000000000000000..18ae2e2b339d357fbe0f6f2b18bc14c0
17951829+ private final PrioritizedTaskQueue<ChunkTask> chunkTasks = new PrioritizedTaskQueue<>(); // used if async chunks are disabled in config
17961830+
17971831+ protected static QueueExecutorThread<ChunkTask>[] globalWorkers;
1798- + protected static QueueExecutorThread<ChunkTask> globalUrgentWorker;
17991832+ protected static PrioritizedTaskQueue<ChunkTask> globalQueue;
1800- + protected static PrioritizedTaskQueue<ChunkTask> globalUrgentQueue;
18011833+
18021834+ protected static final ConcurrentLinkedQueue<Runnable> CHUNK_WAIT_QUEUE = new ConcurrentLinkedQueue<>();
18031835+
@@ -1891,12 +1923,12 @@ index 0000000000000000000000000000000000000000..18ae2e2b339d357fbe0f6f2b18bc14c0
18911923+ if (threads <= 0 || globalWorkers != null) {
18921924+ return;
18931925+ }
1926+ + ++threads; // add one for urgent executor
18941927+
18951928+ globalWorkers = new QueueExecutorThread[threads];
18961929+ globalQueue = new PrioritizedTaskQueue<>();
1897- + globalUrgentQueue = new PrioritizedTaskQueue<>();
18981930+
1899- + for (int i = 0; i < threads; ++i) {
1931+ + for (int i = 0; i < ( threads - 1) ; ++i) {
19001932+ globalWorkers[i] = new QueueExecutorThread<>(globalQueue, (long)0.10e6); //0.1ms
19011933+ globalWorkers[i].setName("Paper Async Chunk Task Thread #" + i);
19021934+ globalWorkers[i].setPriority(Thread.NORM_PRIORITY - 1);
@@ -1907,14 +1939,14 @@ index 0000000000000000000000000000000000000000..18ae2e2b339d357fbe0f6f2b18bc14c0
19071939+ globalWorkers[i].start();
19081940+ }
19091941+
1910- + globalUrgentWorker = new QueueExecutorThread<>(globalUrgentQueue , (long)0.10e6); //0.1ms
1911- + globalUrgentWorker .setName("Paper Async Chunk Urgent Task Thread");
1912- + globalUrgentWorker .setPriority(Thread.NORM_PRIORITY+1);
1913- + globalUrgentWorker .setUncaughtExceptionHandler((final Thread thread, final Throwable throwable) -> {
1942+ + globalWorkers[threads - 1] = new QueueExecutorThread<>(globalQueue , (long)0.10e6); //0.1ms
1943+ + globalWorkers[threads - 1] .setName("Paper Async Chunk Urgent Task Thread");
1944+ + globalWorkers[threads - 1] .setPriority(Thread.NORM_PRIORITY+1);
1945+ + globalWorkers[threads - 1] .setUncaughtExceptionHandler((final Thread thread, final Throwable throwable) -> {
19141946+ PaperFileIOThread.LOGGER.fatal("Thread '" + thread.getName() + "' threw an uncaught exception!", throwable);
19151947+ });
1916- +
1917- + globalUrgentWorker .start();
1948+ + globalWorkers[threads - 1].setLowestPriorityToPoll(PrioritizedTaskQueue.HIGHEST_PRIORITY);
1949+ + globalWorkers[threads - 1] .start();
19181950+ }
19191951+
19201952+ /**
@@ -2165,7 +2197,6 @@ index 0000000000000000000000000000000000000000..18ae2e2b339d357fbe0f6f2b18bc14c0
21652197+ worker.flush();
21662198+ }
21672199+ }
2168- + if (globalUrgentWorker != null) globalUrgentWorker.flush();
21692200+
21702201+ // flush again since tasks we execute async saves
21712202+ drainChunkWaitQueue();
@@ -2215,15 +2246,10 @@ index 0000000000000000000000000000000000000000..18ae2e2b339d357fbe0f6f2b18bc14c0
22152246+ if (task.isScheduled() && raised && this.workers != null) {
22162247+ // only notify if we're in queue to be executed
22172248+ if (priority == PrioritizedTaskQueue.HIGHEST_PRIORITY) {
2218- + // was in another queue but became urgent later, add to urgent queue and the previous
2219- + // queue will just have to ignore this task if it has already been started.
2220- + // Ultimately, we now have 2 potential queues that can pull it out whoever gets it first
2221- + // but the urgent queue has dedicated thread(s) so it's likely to win....
2222- + globalUrgentQueue.add(task);
2249+ + // notify urgent worker as well
22232250+ this.internalScheduleNotifyUrgent();
2224- + } else {
2225- + this.internalScheduleNotify();
22262251+ }
2252+ + this.internalScheduleNotify();
22272253+ }
22282254+ }
22292255+
@@ -2235,12 +2261,11 @@ index 0000000000000000000000000000000000000000..18ae2e2b339d357fbe0f6f2b18bc14c0
22352261+
22362262+ // It's important we order the task to be executed before notifying. Avoid a race condition where the worker thread
22372263+ // wakes up and goes to sleep before we actually schedule (or it's just about to sleep)
2264+ + this.queue.add(task);
2265+ + this.internalScheduleNotify();
22382266+ if (task.getPriority() == PrioritizedTaskQueue.HIGHEST_PRIORITY) {
2239- + globalUrgentQueue.add(task);
2267+ + // notify urgent too
22402268+ this.internalScheduleNotifyUrgent();
2241- + } else {
2242- + this.queue.add(task);
2243- + this.internalScheduleNotify();
22442269+ }
22452270+
22462271+ }
@@ -2249,7 +2274,8 @@ index 0000000000000000000000000000000000000000..18ae2e2b339d357fbe0f6f2b18bc14c0
22492274+ if (this.workers == null) {
22502275+ return;
22512276+ }
2252- + for (final QueueExecutorThread<ChunkTask> worker : this.workers) {
2277+ + for (int i = 0, len = this.workers.length - 1; i < len; ++i) {
2278+ + final QueueExecutorThread<ChunkTask> worker = this.workers[i];
22532279+ if (worker.notifyTasks()) {
22542280+ // break here since we only want to wake up one worker for scheduling one task
22552281+ break;
@@ -2259,10 +2285,10 @@ index 0000000000000000000000000000000000000000..18ae2e2b339d357fbe0f6f2b18bc14c0
22592285+
22602286+
22612287+ protected void internalScheduleNotifyUrgent() {
2262- + if (globalUrgentWorker == null) {
2288+ + if (this.workers == null) {
22632289+ return;
22642290+ }
2265- + globalUrgentWorker .notifyTasks();
2291+ + this.workers[this.workers.length - 1] .notifyTasks();
22662292+ }
22672293+
22682294+ }
@@ -2318,7 +2344,7 @@ index 45de5e508540b4ba622985d530f1aadaa7eb4535..5b8b9dabc6673b6f0a335a42d2ec71a5
23182344 ChunkHolder.FullChunkStatus playerchunk_state1 = ChunkHolder.getFullChunkStatus(this.ticketLevel);
23192345 // CraftBukkit start
23202346diff --git a/src/main/java/net/minecraft/server/level/ChunkMap.java b/src/main/java/net/minecraft/server/level/ChunkMap.java
2321- index a8c47535ec8c0cf992c40ec74a7a3a1f78da4865..87f055f8338d4ce2f9ff76bdc6c0b7ffc266ce78 100644
2347+ index bf80f2e299108d3de70354bf45fcd0efeff42ec7..8ccdaddcef1e5e83660e58075b039b124f36fce3 100644
23222348--- a/src/main/java/net/minecraft/server/level/ChunkMap.java
23232349+++ b/src/main/java/net/minecraft/server/level/ChunkMap.java
23242350@@ -466,6 +466,7 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider
0 commit comments