From 4a0416b2bf62c162370c39813b69ad6e252e2762 Mon Sep 17 00:00:00 2001 From: Oleksandr Vayda Date: Mon, 28 Mar 2022 16:42:22 +0200 Subject: [PATCH] spline #1050 Add "spline.database.activeFailover" property. Always disable active failover on AdminCLI. --- .../za/co/absa/spline/admin/AdminCLI.scala | 2 +- .../src/main/webapp/META-INF/context.xml | 11 +++- persistence/pom.xml | 2 +- .../internal/InternalArangoDatabaseOps.scala | 38 +++++++++---- .../VstCommunicationDestructor.scala | 53 ------------------- .../velocystream/VstImplicits.scala} | 21 ++++---- .../persistence/ArangoDatabaseFacade.scala | 19 +++++-- .../spline/persistence/ArangoManager.scala | 6 +-- .../persistence/ArangoManagerFactory.scala | 4 +- .../spline/persistence/ArangoRepoConfig.scala | 7 ++- .../src/main/webapp/META-INF/context.xml | 8 +++ 11 files changed, 86 insertions(+), 85 deletions(-) delete mode 100644 persistence/src/main/scala/com/arangodb/internal/velocystream/VstCommunicationDestructor.scala rename persistence/src/main/scala/com/arangodb/{async/internal/ArangoExecutorAsyncDestructor.scala => internal/velocystream/VstImplicits.scala} (54%) diff --git a/admin/src/main/scala/za/co/absa/spline/admin/AdminCLI.scala b/admin/src/main/scala/za/co/absa/spline/admin/AdminCLI.scala index 576d7f176..026792620 100644 --- a/admin/src/main/scala/za/co/absa/spline/admin/AdminCLI.scala +++ b/admin/src/main/scala/za/co/absa/spline/admin/AdminCLI.scala @@ -53,7 +53,7 @@ object AdminCLI extends App { ) } - private val dbManagerFactoryImpl = new ArangoManagerFactoryImpl() + private val dbManagerFactoryImpl = new ArangoManagerFactoryImpl(activeFailover = false) private val maybeConsole = InputConsole.systemConsoleIfAvailable() val dbManagerFactory = maybeConsole diff --git a/kafka-gateway/src/main/webapp/META-INF/context.xml b/kafka-gateway/src/main/webapp/META-INF/context.xml index 2e53b5f5a..af1ab226d 100644 --- a/kafka-gateway/src/main/webapp/META-INF/context.xml +++ b/kafka-gateway/src/main/webapp/META-INF/context.xml @@ -31,8 +31,9 @@ + @@ -45,4 +46,12 @@ + + + + + + diff --git a/persistence/pom.xml b/persistence/pom.xml index 16dd7bdaa..758ba6699 100644 --- a/persistence/pom.xml +++ b/persistence/pom.xml @@ -78,7 +78,7 @@ com.arangodb arangodb-java-driver - 6.16.0 + 6.16.1 com.arangodb diff --git a/persistence/src/main/scala/com/arangodb/internal/InternalArangoDatabaseOps.scala b/persistence/src/main/scala/com/arangodb/internal/InternalArangoDatabaseOps.scala index c4f468dff..d3f03ee4a 100644 --- a/persistence/src/main/scala/com/arangodb/internal/InternalArangoDatabaseOps.scala +++ b/persistence/src/main/scala/com/arangodb/internal/InternalArangoDatabaseOps.scala @@ -17,31 +17,51 @@ package com.arangodb.internal import com.arangodb.async.ArangoDatabaseAsync -import com.arangodb.async.internal.{ArangoExecutorAsync, ArangoExecutorAsyncDestructor} -import com.arangodb.internal.velocystream.VstCommunicationDestructor -import com.arangodb.internal.velocystream.VstCommunicationDestructor.ConnectionParams +import com.arangodb.async.internal.ArangoExecutorAsync +import com.arangodb.internal.net.{AccessType, HostDescription} +import com.arangodb.internal.velocystream.VstCommunication +import com.arangodb.internal.velocystream.internal.VstConnection +import org.apache.commons.lang3.StringUtils import org.apache.http.auth.UsernamePasswordCredentials +import za.co.absa.commons.reflect.ReflectionUtils +import za.co.absa.commons.reflect.ReflectionUtils.extractFieldValue import za.co.absa.spline.common.rest.{HttpStatusException, RESTClient, RESTClientApacheHttpImpl} import java.net.URI +import javax.net.ssl.SSLContext import scala.concurrent.{ExecutionContext, Future} class InternalArangoDatabaseOps(db: ArangoDatabaseAsync)(implicit ec: ExecutionContext) { + import com.arangodb.internal.velocystream.VstImplicits._ + /** * @see [[https://github.com/arangodb/arangodb-java-driver/issues/353]] */ def restClient: RESTClient = { - val asyncExecutable = db.asInstanceOf[ArangoExecuteable[ArangoExecutorAsync]] - val ArangoExecutorAsyncDestructor(vstComm) = asyncExecutable.executor - val VstCommunicationDestructor(ConnectionParams(hostDescription, user, maybePassword, maybeSslContext)) = vstComm + val vstComm = { + val asyncExecutable = db.asInstanceOf[ArangoExecuteable[ArangoExecutorAsync]] + val executor = asyncExecutable.executor + ReflectionUtils.extractFieldValue[ArangoExecutorAsync, VstCommunication[_, VstConnection[_]]](executor, "communication") + } + val connection = vstComm.connect(AccessType.WRITE) + + val maybeSslContext = Option(extractFieldValue[VstConnection[_], SSLContext](connection, "sslContext")) val scheme = if (maybeSslContext.isDefined) "https" else "http" + + val hostDescription = extractFieldValue[VstConnection[_], HostDescription](connection, "host") val host = hostDescription.getHost val port = hostDescription.getPort - val password = maybePassword.getOrElse("") - val maybeCredentials = Option(user).map(user => new UsernamePasswordCredentials(user, password)) + val database = db.dbName + + val maybeCredentials = + for { + username <- Option(vstComm.getUser) + password = StringUtils.defaultString(vstComm.getPassword) + } yield + new UsernamePasswordCredentials(username, password) - new RESTClientApacheHttpImpl(new URI(s"$scheme://$host:$port/_db/${db.name}"), maybeCredentials, maybeSslContext) + new RESTClientApacheHttpImpl(new URI(s"$scheme://$host:$port/_db/$database"), maybeCredentials, maybeSslContext) } diff --git a/persistence/src/main/scala/com/arangodb/internal/velocystream/VstCommunicationDestructor.scala b/persistence/src/main/scala/com/arangodb/internal/velocystream/VstCommunicationDestructor.scala deleted file mode 100644 index 6e8537624..000000000 --- a/persistence/src/main/scala/com/arangodb/internal/velocystream/VstCommunicationDestructor.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2020 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.arangodb.internal.velocystream - -import com.arangodb.internal.net.{AccessType, HostDescription, HostHandler, HostImpl} -import com.arangodb.internal.velocystream.internal.VstConnection -import za.co.absa.commons.reflect.ReflectionUtils.extractFieldValue - -import javax.net.ssl.SSLContext - -object VstCommunicationDestructor { - - private object Fields { - final val HostHandler = "hostHandler" - final val SslContext = "sslContext" - } - - case class ConnectionParams( - hostDescription: HostDescription, - user: String, - password: Option[String], - sslContext: Option[SSLContext] - ) - - def unapply(comm: VstCommunication[_, _]): Option[ConnectionParams] = { - val hostHandler = extractFieldValue[VstCommunication[_, _], HostHandler](comm, Fields.HostHandler) - val host = hostHandler.get(null, AccessType.WRITE).asInstanceOf[HostImpl] - val sslContext = extractFieldValue[VstConnection[_], SSLContext](host.connection, Fields.SslContext) - - val connParams = ConnectionParams( - host.getDescription, - comm.user, - Option(comm.password), - Option(sslContext) - ) - - Some(connParams) - } -} diff --git a/persistence/src/main/scala/com/arangodb/async/internal/ArangoExecutorAsyncDestructor.scala b/persistence/src/main/scala/com/arangodb/internal/velocystream/VstImplicits.scala similarity index 54% rename from persistence/src/main/scala/com/arangodb/async/internal/ArangoExecutorAsyncDestructor.scala rename to persistence/src/main/scala/com/arangodb/internal/velocystream/VstImplicits.scala index d0a2a0d6f..be884dde0 100644 --- a/persistence/src/main/scala/com/arangodb/async/internal/ArangoExecutorAsyncDestructor.scala +++ b/persistence/src/main/scala/com/arangodb/internal/velocystream/VstImplicits.scala @@ -1,5 +1,5 @@ /* - * Copyright 2020 ABSA Group Limited + * Copyright 2022 ABSA Group Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,18 +14,19 @@ * limitations under the License. */ -package com.arangodb.async.internal +package com.arangodb.internal.velocystream -import com.arangodb.internal.velocystream.VstCommunication +import com.arangodb.internal.net.AccessType import com.arangodb.internal.velocystream.internal.VstConnection -import za.co.absa.commons.reflect.ReflectionUtils -object ArangoExecutorAsyncDestructor { - private final val CommunicationField = "communication" +object VstImplicits { - def unapply(executor: ArangoExecutorAsync): Option[VstCommunication[_, _ <: VstConnection[_]]] = { - Option( - ReflectionUtils.extractFieldValue[ArangoExecutorAsync, VstCommunication[_, _ <: VstConnection[_]]](executor, CommunicationField) - ) + implicit class InternalVstCommunicationOps(val vstComm: VstCommunication[_, VstConnection[_]]) extends AnyVal { + + def getUser: String = vstComm.user + + def getPassword: String = vstComm.password + + def connect(accessType: AccessType): VstConnection[_] = vstComm.connect(null, accessType) } } diff --git a/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoDatabaseFacade.scala b/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoDatabaseFacade.scala index a9857b12b..7018398de 100644 --- a/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoDatabaseFacade.scala +++ b/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoDatabaseFacade.scala @@ -16,7 +16,9 @@ package za.co.absa.spline.persistence +import com.arangodb.DbName import com.arangodb.async.{ArangoDBAsync, ArangoDatabaseAsync} +import com.arangodb.entity.LoadBalancingStrategy import com.arangodb.velocypack.module.scala.VPackScalaModule import org.slf4s.Logging import org.springframework.beans.factory.DisposableBean @@ -27,7 +29,8 @@ import za.co.absa.commons.version.impl.SemVer20Impl.SemanticVersion import javax.net.ssl._ import scala.concurrent._ -class ArangoDatabaseFacade(connectionURL: ArangoConnectionURL, maybeSSLContext: Option[SSLContext]) extends DisposableBean { +class ArangoDatabaseFacade(connectionURL: ArangoConnectionURL, maybeSSLContext: Option[SSLContext], activeFailover: Boolean) + extends DisposableBean { import za.co.absa.spline.persistence.ArangoDatabaseFacade._ @@ -48,8 +51,16 @@ class ArangoDatabaseFacade(connectionURL: ArangoConnectionURL, maybeSSLContext: .having(maybeSSLContext)(_ sslContext _) } - // enable active failover - arangoBuilder.acquireHostList(true) + if (activeFailover) { + arangoBuilder.acquireHostList(true) + arangoBuilder.loadBalancingStrategy(LoadBalancingStrategy.NONE) + } else { + arangoBuilder.acquireHostList(false) + if (hostsWithPorts.size > 1) { + arangoBuilder.loadBalancingStrategy(LoadBalancingStrategy.ROUND_ROBIN) + } + } + for ((host, port) <- hostsWithPorts) arangoBuilder.host(host, port) // build ArangoDB Client @@ -59,7 +70,7 @@ class ArangoDatabaseFacade(connectionURL: ArangoConnectionURL, maybeSSLContext: // The val is lazy to not prevent a facade instance from being created. // It allows connection to be re-attempted later and the {{shutdown()}} method to be called. lazy val db: ArangoDatabaseAsync = { - val db = arango.db(dbName) + val db = arango.db(DbName.of(dbName)) warmUpDb(db) db } diff --git a/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoManager.scala b/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoManager.scala index 72b71a29e..5d56e5ee5 100644 --- a/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoManager.scala +++ b/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoManager.scala @@ -129,9 +129,9 @@ class ArangoManagerImpl( for { exists <- db.exists.toScala _ <- if (exists && !dropIfExists) - throw new IllegalArgumentException(s"Arango Database ${db.name()} already exists") + throw new IllegalArgumentException(s"Arango Database ${db.dbName} already exists") else if (exists && dropIfExists) { - log.info(s"Drop database: ${db.name}") + log.info(s"Drop database: ${db.dbName}") db.drop().toScala } else Future.successful({}) @@ -260,7 +260,7 @@ class ArangoManagerImpl( log.debug(s"Delete search analyzers") for { analyzers <- db.getSearchAnalyzers.toScala.map(_.asScala) - userAnalyzers = analyzers.filter(_.getName.startsWith(s"${db.name}::")) + userAnalyzers = analyzers.filter(_.getName.startsWith(s"${db.dbName}::")) _ <- Future.traverse(userAnalyzers)(ua => { log.info(s"Delete search analyzer: ${ua.getName}") db.deleteSearchAnalyzer(ua.getName).toScala diff --git a/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoManagerFactory.scala b/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoManagerFactory.scala index ecafc4589..63f840cd6 100644 --- a/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoManagerFactory.scala +++ b/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoManagerFactory.scala @@ -27,7 +27,7 @@ trait ArangoManagerFactory { def create(connectionURL: ArangoConnectionURL, maybeSSLContext: Option[SSLContext]): ArangoManager } -class ArangoManagerFactoryImpl()(implicit ec: ExecutionContext) extends ArangoManagerFactory { +class ArangoManagerFactoryImpl(activeFailover: Boolean)(implicit ec: ExecutionContext) extends ArangoManagerFactory { import ArangoImplicits._ @@ -48,7 +48,7 @@ class ArangoManagerFactoryImpl()(implicit ec: ExecutionContext) extends ArangoMa } def dbFacade(): ArangoDatabaseFacade = - new ArangoDatabaseFacade(connectionURL, maybeSSLContext) + new ArangoDatabaseFacade(connectionURL, maybeSSLContext, activeFailover) new AutoClosingArangoManagerProxy(dbManager, dbFacade) } diff --git a/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoRepoConfig.scala b/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoRepoConfig.scala index 8ff00960f..d9090ec3e 100644 --- a/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoRepoConfig.scala +++ b/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoRepoConfig.scala @@ -29,15 +29,17 @@ import za.co.absa.spline.common.security.TLSUtils class ArangoRepoConfig extends InitializingBean with Logging { import za.co.absa.spline.persistence.ArangoRepoConfig._ + import scala.concurrent.ExecutionContext.Implicits._ override def afterPropertiesSet(): Unit = { log.info(s"Spline database URL: ${Database.ConnectionURL.asString}") + log.info(s"ArangoDB Active Failover: ${Database.ActiveFailoverMode}") } @Bean def arangoDatabaseFacade: ArangoDatabaseFacade = { val sslCtxOpt = Option.when(Database.DisableSSLValidation)(TLSUtils.TrustingAllSSLContext) - new ArangoDatabaseFacade(Database.ConnectionURL, sslCtxOpt) + new ArangoDatabaseFacade(Database.ConnectionURL, sslCtxOpt, Database.ActiveFailoverMode) } @Bean def arangoDatabase: ArangoDatabaseAsync = arangoDatabaseFacade.db @@ -64,6 +66,9 @@ object ArangoRepoConfig extends DefaultConfigurationStack with ConfTyped { val DisableSSLValidation: Boolean = conf.getBoolean(Prop("disableSslValidation"), false) + + val ActiveFailoverMode: Boolean = + conf.getBoolean(Prop("activeFailover"), false) } } diff --git a/rest-gateway/src/main/webapp/META-INF/context.xml b/rest-gateway/src/main/webapp/META-INF/context.xml index a138d516d..448188afb 100644 --- a/rest-gateway/src/main/webapp/META-INF/context.xml +++ b/rest-gateway/src/main/webapp/META-INF/context.xml @@ -22,6 +22,14 @@ --> + + + + + +