Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ public class CachedSchemaTransactionV2 extends SchemaTransactionV2 {

// MetaDriver doesn't expose unlisten, register the meta listener once.
// Lifecycle: this JVM-global flag is intentionally never reset by
// unlistenChanges() (the underlying gRPC watch is process-wide). If that
// watch is silently dropped after a transport reconnect, recovery is not
// automatic; resetMetaListenerForReconnect() is only a manual hook to let
// the next schema operation install a fresh watch.
// unlistenChanges() (the underlying gRPC watch is process-wide). The driver
// watch self-heals across transport reconnects (PdMetaDriver via KvClient,
// EtcdMetaDriver via Watch.Listener re-subscribe), so the subscription stays
// live and the flag staying true is correct.
private static final AtomicBoolean metaEventListenerRegistered =
new AtomicBoolean(false);

Expand Down Expand Up @@ -251,27 +251,6 @@ static <T> void handleSchemaCacheClearEvent(T response) {
}
}

/**
* Manually reset the JVM-global meta listener flag after detecting that
* the MetaManager transport reconnected and dropped the underlying gRPC
* watch. This method is not wired to a MetaManager/MetaDriver reconnect
* callback today; callers must invoke it explicitly after detecting that
* condition. Without such a manual reset {@link #metaEventListenerRegistered}
* would stay {@code true} forever and this JVM would stop receiving
* cross-node schema cache clear events with no error or warning.
*
* <p>TODO: wire this into MetaManager once it exposes a transport
* reconnect callback (e.g. {@code listenReconnect} /
* {@code onTransportReconnect}). Until then it must be invoked
* explicitly by code that detects the reconnect.
*/
public static void resetMetaListenerForReconnect() {
if (metaEventListenerRegistered.compareAndSet(true, false)) {
LOG.warn("Schema cache clear meta listener lost on reconnect - " +
"will re-register on next schema operation.");
}
}

public void clearCache(boolean notify) {
// Same TOCTOU ordering as clearSchemaCache(String): clear nameCache
// first, then the array attachment, then idCache last.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.apache.commons.io.FileUtils;
Expand All @@ -33,7 +36,9 @@
import org.apache.hugegraph.meta.lock.LockResult;
import org.apache.hugegraph.type.define.CollectionType;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.apache.hugegraph.util.collection.CollectionFactory;
import org.slf4j.Logger;

import com.google.common.base.Strings;

Expand All @@ -42,6 +47,7 @@
import io.etcd.jetcd.ClientBuilder;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.etcd.jetcd.options.DeleteOption;
Expand All @@ -57,9 +63,24 @@

public class EtcdMetaDriver implements MetaDriver {

private static final Logger LOG = Log.logger(EtcdMetaDriver.class);

private final Client client;
private final EtcdDistributedLock lock;

// Re-subscribes a dropped watch off the jetcd callback thread; single
// daemon thread, process-lifetime (no close()), so JVM shutdown reclaims it.
private final ScheduledExecutorService reWatchExecutor =
Executors.newSingleThreadScheduledExecutor(r -> {
Thread thread = new Thread(r, "etcd-meta-rewatch");
thread.setDaemon(true);
return thread;
});

// Backoff before re-subscribing a dropped watch. Package-private and
// mutable only so tests can set it to 0; never reassigned in production.
long reWatchDelayMs = 1000L;

public EtcdMetaDriver(String trustFile, String clientCertFile,
String clientKeyFile, Object... endpoints) {
ClientBuilder builder = this.etcdMetaDriverBuilder(endpoints);
Expand All @@ -76,6 +97,13 @@ public EtcdMetaDriver(Object... endpoints) {
this.lock = EtcdDistributedLock.getInstance(this.client);
}

// Package-private constructor for tests: inject a mock Client and skip lock
// setup (watch tests never touch the distributed lock).
EtcdMetaDriver(Client client) {
this.client = client;
this.lock = null;
}

private static ByteSequence toByteSequence(String content) {
return ByteSequence.from(content.getBytes());
}
Expand Down Expand Up @@ -303,9 +331,8 @@ public void unlock(String key, LockResult lockResult) {
@SuppressWarnings("unchecked")
@Override
public <T> void listen(String key, Consumer<T> consumer) {

this.client.getWatchClient().watch(toByteSequence(key),
(Consumer<WatchResponse>) consumer);
this.watchKey(toByteSequence(key), WatchOption.DEFAULT,
(Consumer<WatchResponse>) consumer);
}

/**
Expand All @@ -314,9 +341,35 @@ public <T> void listen(String key, Consumer<T> consumer) {
@SuppressWarnings("unchecked")
@Override
public <T> void listenPrefix(String prefix, Consumer<T> consumer) {
ByteSequence sequence = toByteSequence(prefix);
WatchOption option = WatchOption.newBuilder().isPrefix(true).build();
this.client.getWatchClient().watch(sequence, option, (Consumer<WatchResponse>) consumer);
this.watchKey(toByteSequence(prefix), option,
(Consumer<WatchResponse>) consumer);
}

/**
* Subscribe a self-healing watch. Unlike the bare {@code Consumer} overload,
* this surfaces {@code onError}/{@code onCompleted}: when the underlying
* watch terminates (e.g. a transport reconnect drops the gRPC stream) it
* re-subscribes after a short backoff, so the listener is not silently lost.
* Mirrors the re-subscribe behaviour PdMetaDriver already gets from KvClient.
*/
private void watchKey(ByteSequence key, WatchOption option,
Consumer<WatchResponse> consumer) {
Watch.Listener listener = Watch.listener(
consumer,
throwable -> this.scheduleReWatch(key, option, consumer, throwable),
() -> this.scheduleReWatch(key, option, consumer, null));
// Watcher intentionally not closed: process-lifetime watch, recreated
// on self-heal; the prior watcher's stream has already terminated.
this.client.getWatchClient().watch(key, option, listener);
}

private void scheduleReWatch(ByteSequence key, WatchOption option,
Consumer<WatchResponse> consumer,
Throwable cause) {
LOG.warn("etcd meta watch dropped, re-subscribing", cause);
this.reWatchExecutor.schedule(
() -> this.watchKey(key, option, consumer),
this.reWatchDelayMs, TimeUnit.MILLISECONDS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.meta;

import java.util.concurrent.atomic.AtomicReference;

import org.apache.hugegraph.testutil.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchResponse;

/**
* Unit tests for {@link EtcdMetaDriver}'s self-healing watch: a dropped watch
* (onError / onCompleted) must be re-subscribed so the listener is not silently
* lost after a transport reconnect (issue #3036). Uses a mock jetcd
* {@link Client} via the package-private test constructor; no live etcd needed.
*/
public class EtcdMetaDriverTest {

@Test
public void testListenReSubscribesOnError() {
Watch watch = Mockito.mock(Watch.class);
EtcdMetaDriver driver = newDriver(watch);

driver.listen("k", response -> { });
captureListener(watch).onError(new RuntimeException("watch dropped"));

// The watch terminated, so the driver must re-subscribe: a second
// watch() call lands once the (0ms) backoff task runs.
Mockito.verify(watch, Mockito.timeout(2000).times(2))
.watch(Mockito.any(ByteSequence.class),
Mockito.any(WatchOption.class),
Mockito.any(Watch.Listener.class));
}

@Test
public void testListenReSubscribesOnCompleted() {
Watch watch = Mockito.mock(Watch.class);
EtcdMetaDriver driver = newDriver(watch);

driver.listen("k", response -> { });
captureListener(watch).onCompleted();

Mockito.verify(watch, Mockito.timeout(2000).times(2))
.watch(Mockito.any(ByteSequence.class),
Mockito.any(WatchOption.class),
Mockito.any(Watch.Listener.class));
}

@Test
public void testListenPrefixReSubscribesOnError() {
Watch watch = Mockito.mock(Watch.class);
EtcdMetaDriver driver = newDriver(watch);

driver.listenPrefix("prefix", response -> { });
captureListener(watch).onError(new RuntimeException("watch dropped"));

Mockito.verify(watch, Mockito.timeout(2000).times(2))
.watch(Mockito.any(ByteSequence.class),
Mockito.any(WatchOption.class),
Mockito.any(Watch.Listener.class));
}

@Test
public void testListenDeliversEventsToConsumer() {
Watch watch = Mockito.mock(Watch.class);
AtomicReference<WatchResponse> received = new AtomicReference<>();
EtcdMetaDriver driver = newDriver(watch);

driver.listen("k", received::set);
WatchResponse response = Mockito.mock(WatchResponse.class);
captureListener(watch).onNext(response);

Assert.assertSame(response, received.get());
}

private static EtcdMetaDriver newDriver(Watch watch) {
Client client = Mockito.mock(Client.class);
Mockito.when(client.getWatchClient()).thenReturn(watch);
EtcdMetaDriver driver = new EtcdMetaDriver(client);
// No backoff in tests so the re-subscribe runs promptly.
driver.reWatchDelayMs = 0L;
return driver;
}

private static Watch.Listener captureListener(Watch watch) {
ArgumentCaptor<Watch.Listener> captor =
ArgumentCaptor.forClass(Watch.Listener.class);
Mockito.verify(watch).watch(Mockito.any(ByteSequence.class),
Mockito.any(WatchOption.class),
captor.capture());
return captor.getValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hugegraph.unit;

import org.apache.hugegraph.core.RoleElectionStateMachineTest;
import org.apache.hugegraph.meta.EtcdMetaDriverTest;
import org.apache.hugegraph.meta.MetaManagerSchemaCacheClearEventTest;
import org.apache.hugegraph.traversal.optimize.TraversalUtilOptimizeTest;
import org.apache.hugegraph.unit.api.filter.LoadDetectFilterTest;
Expand Down Expand Up @@ -99,6 +100,7 @@
CacheTest.LevelCacheTest.class,
CachedSchemaTransactionTest.class,
MetaManagerSchemaCacheClearEventTest.class,
EtcdMetaDriverTest.class,
CachedGraphTransactionTest.class,
CacheManagerTest.class,
RamTableTest.class,
Expand Down
Loading