Skip to content

Commit

Permalink
fix(notickvd): max concurrent chunk loads
Browse files Browse the repository at this point in the history
  • Loading branch information
ishland committed Jun 14, 2023
1 parent 31c44bc commit 8787314
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.ishland.c2me.base.common.structs;

import it.unimi.dsi.fastutil.objects.Reference2IntMap;
import it.unimi.dsi.fastutil.objects.Reference2IntOpenHashMap;
import it.unimi.dsi.fastutil.objects.ReferenceLinkedOpenHashSet;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import it.unimi.dsi.fastutil.objects.ObjectLinkedOpenHashSet;

/**
* A priority queue with fixed number of priorities and allows changing priorities of elements.
Expand All @@ -12,16 +12,16 @@
*/
public class DynamicPriorityQueue<E> {

private final ReferenceLinkedOpenHashSet<E>[] priorities;
private final Reference2IntMap<E> priorityMap = new Reference2IntOpenHashMap<>();
private final ObjectLinkedOpenHashSet<E>[] priorities;
private final Object2IntMap<E> priorityMap = new Object2IntOpenHashMap<>();

private int currentMinPriority = 0;

public DynamicPriorityQueue(int priorityCount) {
//noinspection unchecked
this.priorities = new ReferenceLinkedOpenHashSet[priorityCount];
this.priorities = new ObjectLinkedOpenHashSet[priorityCount];
for (int i = 0; i < priorityCount; i++) {
this.priorities[i] = new ReferenceLinkedOpenHashSet<>();
this.priorities[i] = new ObjectLinkedOpenHashSet<>();
}
}

Expand Down Expand Up @@ -54,7 +54,7 @@ public void changePriority(E element, int priority) {

public E dequeue() {
while (currentMinPriority < priorities.length) {
ReferenceLinkedOpenHashSet<E> priority = this.priorities[currentMinPriority];
ObjectLinkedOpenHashSet<E> priority = this.priorities[currentMinPriority];
if (priority.isEmpty()) {
currentMinPriority++;
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ public class NoTickSystem {
private final NormalTicketDistanceMap normalTicketDistanceMap;
private final ChunkTicketManager chunkTicketManager;

private final ConcurrentLinkedQueue<Runnable> pendingActions = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<Runnable> pendingActionsOnScheduler = new ConcurrentLinkedQueue<>();
final ConcurrentLinkedQueue<Runnable> mainBeforeTicketTicks = new ConcurrentLinkedQueue<>();
final ConcurrentLinkedQueue<Runnable> mainAfterTicketTicks = new ConcurrentLinkedQueue<>();

final NoThreadScheduler noThreadScheduler = new NoThreadScheduler();

Expand All @@ -38,29 +40,37 @@ public NoTickSystem(ChunkTicketManager chunkTicketManager) {
}

public void onTicketAdded(long position, ChunkTicket<?> ticket) {
this.pendingActions.add(() -> this.normalTicketDistanceMap.addTicket(position, ticket));
this.pendingActionsOnScheduler.add(() -> this.normalTicketDistanceMap.addTicket(position, ticket));
}

public void onTicketRemoved(long position, ChunkTicket<?> ticket) {
this.pendingActions.add(() -> this.normalTicketDistanceMap.removeTicket(position, ticket));
this.pendingActionsOnScheduler.add(() -> this.normalTicketDistanceMap.removeTicket(position, ticket));
}

public void addPlayerSource(ChunkPos chunkPos) {
this.pendingActions.add(() -> this.playerNoTickDistanceMap.addSource(chunkPos));
this.pendingActionsOnScheduler.add(() -> this.playerNoTickDistanceMap.addSource(chunkPos));
}

public void removePlayerSource(ChunkPos chunkPos) {
this.pendingActions.add(() -> this.playerNoTickDistanceMap.removeSource(chunkPos));
this.pendingActionsOnScheduler.add(() -> this.playerNoTickDistanceMap.removeSource(chunkPos));
}

public void setNoTickViewDistance(int viewDistance) {
this.pendingActions.add(() -> this.playerNoTickDistanceMap.setViewDistance(viewDistance));
this.pendingActionsOnScheduler.add(() -> this.playerNoTickDistanceMap.setViewDistance(viewDistance));
}

public void tickScheduler() {
this.noThreadScheduler.tick(Throwable::printStackTrace);
}

public void beforeTicketTicks() {
drainQueue(this.mainBeforeTicketTicks);
}

public void afterTicketTicks() {
drainQueue(this.mainAfterTicketTicks);
}

public void tick(ThreadedAnvilChunkStorage tacs) {
tickScheduler();
scheduleTick(tacs);
Expand All @@ -69,14 +79,7 @@ public void tick(ThreadedAnvilChunkStorage tacs) {
private void scheduleTick(ThreadedAnvilChunkStorage tacs) {
if (this.isTicking.compareAndSet(false, true))
executor.execute(() -> {
Runnable runnable;
while ((runnable = this.pendingActions.poll()) != null) {
try {
runnable.run();
} catch (Throwable t) {
t.printStackTrace();
}
}
drainQueue(this.pendingActionsOnScheduler);

boolean hasNoTickTicketUpdates;
if (pendingPurge) {
Expand Down Expand Up @@ -105,6 +108,17 @@ private void scheduleTick(ThreadedAnvilChunkStorage tacs) {
});
}

private void drainQueue(ConcurrentLinkedQueue<Runnable> queue) {
Runnable runnable;
while ((runnable = queue.poll()) != null) {
try {
runnable.run();
} catch (Throwable t) {
t.printStackTrace();
}
}
}

public void runPurge(long age) {
this.age = age;
this.pendingPurge = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.util.Comparator;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class PlayerNoTickDistanceMap extends ChunkPosDistanceLevelPropagator {

Expand Down Expand Up @@ -124,8 +123,8 @@ private boolean runPendingTicketUpdatesInternal(ThreadedAnvilChunkStorage tacs)
final ChunkPos pos = this.pendingTicketAdds.dequeue();
if (pos == null) break;
if (this.managedChunkTickets.add(pos.toLong())) {
this.addTicket0(pos);
this.chunkLoadFutures.add(getChunkLoadFuture(tacs, pos));
final CompletableFuture<Void> ticketFuture = this.addTicket0(pos);
this.chunkLoadFutures.add(getChunkLoadFuture(tacs, pos, ticketFuture));
hasUpdates = true;
}
}
Expand All @@ -135,23 +134,23 @@ private boolean runPendingTicketUpdatesInternal(ThreadedAnvilChunkStorage tacs)
}

private void removeTicket0(ChunkPos pos) {
this.noTickSystem.noThreadScheduler.execute(() -> this.chunkTicketManager.removeTicketWithLevel(TICKET_TYPE, pos, 33, pos));
this.noTickSystem.mainBeforeTicketTicks.add(() -> this.chunkTicketManager.removeTicketWithLevel(TICKET_TYPE, pos, 33, pos));
}

private void addTicket0(ChunkPos pos) {
this.noTickSystem.noThreadScheduler.execute(() -> this.chunkTicketManager.addTicketWithLevel(TICKET_TYPE, pos, 33, pos));
private CompletableFuture<Void> addTicket0(ChunkPos pos) {
return CompletableFuture.runAsync(() -> this.chunkTicketManager.addTicketWithLevel(TICKET_TYPE, pos, 33, pos), this.noTickSystem.mainBeforeTicketTicks::add);
}

private CompletableFuture<Void> getChunkLoadFuture(ThreadedAnvilChunkStorage tacs, ChunkPos pos) {
final CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
private CompletableFuture<Void> getChunkLoadFuture(ThreadedAnvilChunkStorage tacs, ChunkPos pos, CompletableFuture<Void> ticketFuture) {
final CompletableFuture<Void> future = ticketFuture.thenComposeAsync(unused -> {
final ChunkHolder holder = ((IThreadedAnvilChunkStorage) tacs).getCurrentChunkHolders().get(pos.toLong());
if (holder == null) {
return CompletableFuture.completedFuture((Void) null);
return CompletableFuture.completedFuture(null);
} else {
return holder.getAccessibleFuture().exceptionally(unused -> null).thenAccept(unused -> {
return holder.getAccessibleFuture().exceptionally(unused1 -> null).thenAccept(unused1 -> {
});
}
}, this.noTickSystem.noThreadScheduler).thenCompose(Function.identity());
}, this.noTickSystem.mainAfterTicketTicks::add);
future.thenRunAsync(() -> {
this.chunkLoadFutures.remove(future);
final boolean hasUpdates = this.runPendingTicketUpdatesInternal(tacs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,18 @@ private void onPurge(CallbackInfo ci) {
this.noTickSystem.runPurge(this.age);
}

@Inject(method = "tick", at = @At("HEAD"))
private void beforeTick(ThreadedAnvilChunkStorage chunkStorage, CallbackInfoReturnable<Boolean> cir) {
this.noTickSystem.beforeTicketTicks();
}

@Inject(method = "tick", at = @At("RETURN"))
private void onTick(ThreadedAnvilChunkStorage chunkStorage, CallbackInfoReturnable<Boolean> cir) {
if (this.simulationDistanceTracker instanceof NoOPTickingMap map) {
map.setTACS(chunkStorage);
}
this.noTickSystem.tickScheduler();
this.noTickSystem.afterTicketTicks();
if (this.lastNoTickSystemTick != this.age) {
this.noTickSystem.tick(chunkStorage);
this.lastNoTickSystemTick = this.age;
Expand Down

0 comments on commit 8787314

Please sign in to comment.