From 014bc8bc5a150dcb7eb7c8fc70ad053cd622118c Mon Sep 17 00:00:00 2001 From: wangbin Date: Tue, 4 Jan 2022 17:13:44 +0800 Subject: [PATCH] fix init clusterServerUrls --- .../src/main/kotlin/io/hstream/impl/HStreamClientKtImpl.kt | 3 ++- client/src/main/kotlin/io/hstream/impl/Utils.kt | 6 +++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/client/src/main/kotlin/io/hstream/impl/HStreamClientKtImpl.kt b/client/src/main/kotlin/io/hstream/impl/HStreamClientKtImpl.kt index 40080d46..0a8dd5d7 100644 --- a/client/src/main/kotlin/io/hstream/impl/HStreamClientKtImpl.kt +++ b/client/src/main/kotlin/io/hstream/impl/HStreamClientKtImpl.kt @@ -42,13 +42,14 @@ class HStreamClientKtImpl(bootstrapServerUrls: List) : HStreamClient { val describeClusterResponse = unaryCallWithCurrentUrls(bootstrapServerUrls, channelProvider) { stub -> stub.describeCluster(Empty.newBuilder().build()) } val serverNodes = describeClusterResponse.serverNodesList val serverUrls: ArrayList = ArrayList(serverNodes.size) - clusterServerUrls.compareAndSet(null, serverUrls) + clusterServerUrls.set(serverUrls) for (serverNode in serverNodes) { val host = serverNode.host val port = serverNode.port logger.info("serverUrl: {}", "$host:$port") serverUrls.add("$host:$port") } + logger.info("clusterServerUrls: {}", clusterServerUrls.get()) } override fun close() { diff --git a/client/src/main/kotlin/io/hstream/impl/Utils.kt b/client/src/main/kotlin/io/hstream/impl/Utils.kt index 3b27f8af..848c2b10 100644 --- a/client/src/main/kotlin/io/hstream/impl/Utils.kt +++ b/client/src/main/kotlin/io/hstream/impl/Utils.kt @@ -14,6 +14,7 @@ import java.util.concurrent.atomic.AtomicReference val logger: Logger = LoggerFactory.getLogger("kt-coroutine-utils") suspend fun unaryCallWithCurrentUrlsCoroutine(serverUrls: List, channelProvider: ChannelProvider, call: suspend (stub: HStreamApiCoroutineStub) -> Resp): Resp { + check(serverUrls.isNotEmpty()) logger.info("unaryCallWithCurrentUrl urls are {}", serverUrls) for (i in serverUrls.indices) { val stub = HStreamApiCoroutineStub(channelProvider.get(serverUrls[i])) @@ -58,8 +59,11 @@ suspend fun unaryCallCoroutine(urlsRef: AtomicReference>, ch } catch (e: Exception) { logger.warn("unary call error for url: {}", urls[0], e) if (urls.size > 1) { + logger.info("before refreshClusterInfo, urls are {}", urls) val newServerUrls = refreshClusterInfo(urls.subList(1, urls.size), channelProvider) - urlsRef.compareAndSet(urls, newServerUrls) + // urlsRef.compareAndSet(urls, newServerUrls) + urlsRef.set(newServerUrls) + logger.info("after refreshClusterInfo, urls are {}", urlsRef.get()) return unaryCallWithCurrentUrlsCoroutine(urlsRef.get(), channelProvider, call) } else { throw e