diff --git a/common/src/main/java/org/mvndaemon/mvnd/common/DaemonRegistry.java b/common/src/main/java/org/mvndaemon/mvnd/common/DaemonRegistry.java index 0ffd01ac6..84c460239 100644 --- a/common/src/main/java/org/mvndaemon/mvnd/common/DaemonRegistry.java +++ b/common/src/main/java/org/mvndaemon/mvnd/common/DaemonRegistry.java @@ -19,9 +19,9 @@ import java.io.File; import java.io.IOException; import java.lang.management.ManagementFactory; -import java.lang.reflect.Field; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -32,14 +32,10 @@ import java.util.List; import java.util.Map; import java.util.Random; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import sun.misc.Unsafe; -import sun.nio.ch.DirectBuffer; import static org.mvndaemon.mvnd.common.DaemonState.Canceled; import static org.mvndaemon.mvnd.common.DaemonState.Idle; @@ -56,25 +52,27 @@ public class DaemonRegistry implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(DaemonRegistry.class); private static final int MAX_LENGTH = 32768; + private static final long LOCK_TIMEOUT_MS = 1000 * 20; private final Path registryFile; - - private final Lock lock = new ReentrantLock(); + private static final Map locks = new ConcurrentHashMap<>(); + private final Object lck; private final FileChannel channel; private final MappedByteBuffer buffer; - private long seq; private final Map infosMap = new HashMap<>(); private final List stopEvents = new ArrayList<>(); public DaemonRegistry(Path registryFile) { - this.registryFile = registryFile; + final Path absPath = registryFile.toAbsolutePath().normalize(); + this.lck = locks.computeIfAbsent(absPath, p -> new Object()); + this.registryFile = absPath; try { - if (!Files.isRegularFile(registryFile)) { - if (!Files.isDirectory(registryFile.getParent())) { - Files.createDirectories(registryFile.getParent()); + if (!Files.isRegularFile(absPath)) { + if (!Files.isDirectory(absPath.getParent())) { + Files.createDirectories(absPath.getParent()); } } - channel = FileChannel.open(registryFile, + channel = FileChannel.open(absPath, StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE); buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, MAX_LENGTH); } catch (IOException e) { @@ -95,125 +93,67 @@ public Path getRegistryFile() { } public DaemonInfo get(String uid) { - lock.lock(); - try { - read(); - return infosMap.get(uid); - } finally { - lock.unlock(); - } + read(); + return infosMap.get(uid); } public List getAll() { - lock.lock(); - try { - read(); - return new ArrayList<>(infosMap.values()); - } finally { - lock.unlock(); - } + read(); + return new ArrayList<>(infosMap.values()); } public List getIdle() { - lock.lock(); - try { - read(); - return infosMap.values().stream() - .filter(di -> di.getState() == Idle) - .collect(Collectors.toList()); - } finally { - lock.unlock(); - } + read(); + return infosMap.values().stream() + .filter(di -> di.getState() == Idle) + .collect(Collectors.toList()); } public List getNotIdle() { - lock.lock(); - try { - read(); - return infosMap.values().stream() - .filter(di -> di.getState() != Idle) - .collect(Collectors.toList()); - } finally { - lock.unlock(); - } + return infosMap.values().stream() + .filter(di -> di.getState() != Idle) + .collect(Collectors.toList()); } public List getCanceled() { - lock.lock(); - try { - read(); - return infosMap.values().stream() - .filter(di -> di.getState() == Canceled) - .collect(Collectors.toList()); - } finally { - lock.unlock(); - } + read(); + return infosMap.values().stream() + .filter(di -> di.getState() == Canceled) + .collect(Collectors.toList()); } public void remove(final String uid) { - lock.lock(); - LOGGER.debug("Removing daemon uid: {}", uid); - try { - update(() -> infosMap.remove(uid)); - } finally { - lock.unlock(); - } + update(() -> infosMap.remove(uid)); } public void markState(final String uid, final DaemonState state) { - lock.lock(); LOGGER.debug("Marking busy by uid: {}", uid); - try { - update(() -> infosMap.computeIfPresent(uid, (id, di) -> di.withState(state))); - } finally { - lock.unlock(); - } + update(() -> infosMap.computeIfPresent(uid, (id, di) -> di.withState(state))); } public void storeStopEvent(final DaemonStopEvent stopEvent) { - lock.lock(); LOGGER.debug("Storing daemon stop event with timestamp {}", stopEvent.getTimestamp()); - try { - update(() -> stopEvents.add(stopEvent)); - } finally { - lock.unlock(); - } + update(() -> stopEvents.add(stopEvent)); } public List getStopEvents() { - lock.lock(); - LOGGER.debug("Getting daemon stop events"); - try { - read(); - return new ArrayList<>(stopEvents); - } finally { - lock.unlock(); - } + read(); + return new ArrayList<>(stopEvents); } public void removeStopEvents(final Collection events) { - lock.lock(); LOGGER.debug("Removing {} daemon stop events from registry", events.size()); - try { - update(() -> stopEvents.removeAll(events)); - } finally { - lock.unlock(); - } + update(() -> stopEvents.removeAll(events)); } public void store(final DaemonInfo info) { - lock.lock(); LOGGER.debug("Storing daemon {}", info); - try { - update(() -> infosMap.put(info.getUid(), info)); - } finally { - lock.unlock(); - } + update(() -> infosMap.put(info.getUid(), info)); } - private static final long OFFSET_LOCK = 0; - private static final long OFFSET_SEQ = OFFSET_LOCK + Long.BYTES; - private static final long OFFSET_DATA = OFFSET_SEQ + Long.BYTES; + public static int getProcessId() { + return PROCESS_ID; + } private void read() { doUpdate(null); @@ -227,13 +167,12 @@ private void doUpdate(Runnable updater) { if (!Files.isReadable(registryFile)) { throw new DaemonException("Registry became unaccessible"); } - try { - busyLockLong(OFFSET_LOCK); - try { - long newSeq = readLong(OFFSET_SEQ); - if (newSeq != seq) { - seq = newSeq; - BufferCaster.cast(buffer).position((int) OFFSET_DATA); + + synchronized (lck) { + final long deadline = System.currentTimeMillis() + LOCK_TIMEOUT_MS; + while (System.currentTimeMillis() < deadline) { + try (FileLock l = channel.tryLock(0, MAX_LENGTH, false)) { + BufferCaster.cast(buffer).position(0); infosMap.clear(); int nb = buffer.getInt(); for (int i = 0; i < nb; i++) { @@ -266,40 +205,40 @@ private void doUpdate(Runnable updater) { DaemonStopEvent se = new DaemonStopEvent(uid, date, des, reason); stopEvents.add(se); } - } - if (updater != null) { - updater.run(); - writeLong(OFFSET_SEQ, ++seq); - BufferCaster.cast(buffer).position((int) OFFSET_DATA); - buffer.putInt(infosMap.size()); - for (DaemonInfo di : infosMap.values()) { - writeString(di.getUid()); - writeString(di.getJavaHome()); - writeString(di.getMvndHome()); - buffer.putInt(di.getPid()); - buffer.putInt(di.getAddress()); - writeString(di.getLocale()); - buffer.putInt(di.getOptions().size()); - for (String opt : di.getOptions()) { - writeString(opt); + + if (updater != null) { + updater.run(); + BufferCaster.cast(buffer).position((int) 0); + buffer.putInt(infosMap.size()); + for (DaemonInfo di : infosMap.values()) { + writeString(di.getUid()); + writeString(di.getJavaHome()); + writeString(di.getMvndHome()); + buffer.putInt(di.getPid()); + buffer.putInt(di.getAddress()); + writeString(di.getLocale()); + buffer.putInt(di.getOptions().size()); + for (String opt : di.getOptions()) { + writeString(opt); + } + buffer.put((byte) di.getState().ordinal()); + buffer.putLong(di.getLastIdle()); + buffer.putLong(di.getLastBusy()); + } + buffer.putInt(stopEvents.size()); + for (DaemonStopEvent dse : stopEvents) { + writeString(dse.getUid()); + buffer.putLong(dse.getTimestamp()); + buffer.put((byte) (dse.getStatus() == null ? -1 : dse.getStatus().ordinal())); + writeString(dse.getReason()); } - buffer.put((byte) di.getState().ordinal()); - buffer.putLong(di.getLastIdle()); - buffer.putLong(di.getLastBusy()); - } - buffer.putInt(stopEvents.size()); - for (DaemonStopEvent dse : stopEvents) { - writeString(dse.getUid()); - buffer.putLong(dse.getTimestamp()); - buffer.put((byte) (dse.getStatus() == null ? -1 : dse.getStatus().ordinal())); - writeString(dse.getReason()); } + return; + } catch (IOException e) { + throw new RuntimeException("Could not lock offset 0 of " + registryFile); } - } finally { - unlockLong(OFFSET_LOCK); } - } catch (InterruptedException e) { - throw new RuntimeException(e); + throw new RuntimeException("Could not lock " + registryFile + " within " + LOCK_TIMEOUT_MS + " ms"); } } @@ -326,187 +265,6 @@ private static int getProcessId0() { } } - private static long uniqueTid() { - // Assume 48 bit for 16 to 24-bit process id and 16 million threads from the start. - return ((long) getProcessId() << 24) | currentThread().getId(); - } - - public static int getProcessId() { - return PROCESS_ID; - } - - private static Thread currentThread() { - return Thread.currentThread(); - } - - static final int SLEEP_THRESHOLD = 20 * 1000 * 1000; - static final long BUSY_LOCK_LIMIT = 20L * 1000 * 1000 * 1000; - - public void busyLockLong(long offset) throws InterruptedException, IllegalStateException { - boolean success = tryLockNanosLong(offset, BUSY_LOCK_LIMIT); - if (!success) - if (currentThread().isInterrupted()) - throw new InterruptedException(); - else - throw new IllegalStateException("Failed to lock offset " + offset + " of " + registryFile + " within " - + BUSY_LOCK_LIMIT / 1e9 + " seconds."); - } - - public void unlockLong(long offset) throws IllegalMonitorStateException { - long id = uniqueTid(); - long firstValue = (1L << 48) | id; - if (compareAndSwapLong(offset, firstValue, 0)) - return; - // try to check the lowId and the count. - unlockFailedLong(offset, id); - } - - public void resetLockLong(long offset) { - writeOrderedLong(offset, 0L); - } - - public boolean tryLockLong(long offset) { - long id = uniqueTid(); - return tryLockNanos8a(offset, id); - } - - public boolean tryLockNanosLong(long offset, long nanos) { - long id = uniqueTid(); - int limit = nanos <= 10000 ? (int) nanos / 10 : 1000; - for (int i = 0; i < limit; i++) - if (tryLockNanos8a(offset, id)) - return true; - if (nanos <= 10000) - return false; - return tryLockNanosLong0(offset, nanos, id); - } - - private boolean tryLockNanosLong0(long offset, long nanos, long id) { - long nanos0 = Math.min(nanos, SLEEP_THRESHOLD); - long start = System.nanoTime(); - long end0 = start + nanos0 - 10000; - do { - if (tryLockNanos8a(offset, id)) - return true; - } while (end0 > System.nanoTime() && !currentThread().isInterrupted()); - - long end = start + nanos - SLEEP_THRESHOLD; - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(currentThread().getName() + ", waiting for lock"); - } - - try { - do { - if (tryLockNanos8a(offset, id)) { - long millis = (System.nanoTime() - start) / 1000000; - if (millis > 200) { - LOGGER.warn(currentThread().getName() + - ", to obtain a lock took " + - millis / 1e3 + " seconds"); - } - return true; - } - Thread.sleep(1); - } while (end > System.nanoTime()); - } catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - } - return false; - } - - private boolean tryLockNanos8a(long offset, long id) { - long firstValue = (1L << 48) | id; - if (compareAndSwapLong(offset, 0, firstValue)) - return true; - long currentValue = readLong(offset); - long lockedId = currentValue & ((1L << 48) - 1); - if (lockedId == 0) { - int count = (int) (currentValue >>> 48); - if (count != 0) - LOGGER.warn("Lock held by threadId 0 !?"); - return compareAndSwapLong(offset, currentValue, firstValue); - } - if (lockedId == id) { - if (currentValue >>> 48 == 65535) - throw new IllegalStateException("Reentered 65535 times without an unlock"); - currentValue += 1L << 48; - writeOrderedLong(offset, currentValue); - return true; - } - return false; - } - - private void unlockFailedLong(long offset, long id) throws IllegalMonitorStateException { - long currentValue = readLong(offset); - long holderId = currentValue & (-1L >>> 16); - if (holderId == id) { - currentValue -= 1L << 48; - writeOrderedLong(offset, currentValue); - - } else if (currentValue == 0) { - throw new IllegalMonitorStateException("No thread holds this lock"); - - } else { - throw new IllegalMonitorStateException("Process " + ((currentValue >>> 32) & 0xFFFF) - + " thread " + (holderId & (-1L >>> 32)) - + " holds this lock, " + (currentValue >>> 48) - + " times, unlock from " + getProcessId() - + " thread " + currentThread().getId()); - } - } - - static final Unsafe UNSAFE; - static final int BYTES_OFFSET; - - static { - try { - Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); - theUnsafe.setAccessible(true); - UNSAFE = (Unsafe) theUnsafe.get(null); - BYTES_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); - } catch (Exception e) { - throw new AssertionError(e); - } - } - - public boolean compareAndSwapLong(long offset, long expected, long x) { - if (buffer instanceof DirectBuffer) - return UNSAFE.compareAndSwapLong(null, ((DirectBuffer) buffer).address() + offset, expected, x); - return UNSAFE.compareAndSwapLong(buffer.array(), BYTES_OFFSET + offset, expected, x); - } - - public long readVolatileLong(int offset) { - readBarrier(); - return readLong(offset); - } - - public long readLong(long offset) { - return buffer.getLong((int) offset); - } - - public void writeOrderedLong(long offset, long v) { - writeLong(offset, v); - writeBarrier(); - } - - public void writeLong(long offset, long v) { - buffer.putLong((int) offset, v); - } - - private AtomicBoolean barrier; - - private void readBarrier() { - if (barrier == null) - barrier = new AtomicBoolean(); - barrier.get(); - } - - private void writeBarrier() { - if (barrier == null) - barrier = new AtomicBoolean(); - barrier.lazySet(false); - } - private String readString() { int sz = buffer.getShort(); if (sz == -1) { diff --git a/common/src/test/java/org/mvndaemon/mvnd/common/DaemonRegistryTest.java b/common/src/test/java/org/mvndaemon/mvnd/common/DaemonRegistryTest.java index d693cb388..f5903e0cc 100644 --- a/common/src/test/java/org/mvndaemon/mvnd/common/DaemonRegistryTest.java +++ b/common/src/test/java/org/mvndaemon/mvnd/common/DaemonRegistryTest.java @@ -31,25 +31,26 @@ public class DaemonRegistryTest { @Test public void testReadWrite() throws IOException { Path temp = File.createTempFile("reg", ".data").toPath(); - DaemonRegistry reg1 = new DaemonRegistry(temp); - DaemonRegistry reg2 = new DaemonRegistry(temp); - - assertNotNull(reg1.getAll()); - assertEquals(0, reg1.getAll().size()); - assertNotNull(reg2.getAll()); - assertEquals(0, reg2.getAll().size()); - - byte[] token = new byte[16]; - new Random().nextBytes(token); - reg1.store(new DaemonInfo("the-uid", "/java/home/", - "/data/reg/", 0x12345678, 7502, - Locale.getDefault().toLanguageTag(), Arrays.asList("-Xmx"), - DaemonState.Idle, System.currentTimeMillis(), System.currentTimeMillis())); - - assertNotNull(reg1.getAll()); - assertEquals(1, reg1.getAll().size()); - assertNotNull(reg2.getAll()); - assertEquals(1, reg2.getAll().size()); + try (DaemonRegistry reg1 = new DaemonRegistry(temp); + DaemonRegistry reg2 = new DaemonRegistry(temp)) { + assertNotNull(reg1.getAll()); + assertEquals(0, reg1.getAll().size()); + assertNotNull(reg2.getAll()); + assertEquals(0, reg2.getAll().size()); + + byte[] token = new byte[16]; + new Random().nextBytes(token); + reg1.store(new DaemonInfo("the-uid", "/java/home/", + "/data/reg/", 0x12345678, 7502, + Locale.getDefault().toLanguageTag(), Arrays.asList("-Xmx"), + DaemonState.Idle, System.currentTimeMillis(), System.currentTimeMillis())); + + assertNotNull(reg1.getAll()); + assertEquals(1, reg1.getAll().size()); + assertNotNull(reg2.getAll()); + assertEquals(1, reg2.getAll().size()); + } + } } diff --git a/integration-tests/src/test/java/org/mvndaemon/mvnd/junit/TestRegistry.java b/integration-tests/src/test/java/org/mvndaemon/mvnd/junit/TestRegistry.java index 2207e7760..c87dc0bdf 100644 --- a/integration-tests/src/test/java/org/mvndaemon/mvnd/junit/TestRegistry.java +++ b/integration-tests/src/test/java/org/mvndaemon/mvnd/junit/TestRegistry.java @@ -45,7 +45,7 @@ public void killAll() { if (maybeHandle.isPresent()) { final ProcessHandle handle = maybeHandle.get(); final CompletableFuture exit = handle.onExit(); - handle.destroyForcibly(); + handle.destroy(); exit.get(5, TimeUnit.SECONDS); } } catch (Exception t) {