diff --git a/admin/src/main/scala/za/co/absa/spline/admin/AdminCLI.scala b/admin/src/main/scala/za/co/absa/spline/admin/AdminCLI.scala
index 576d7f176..026792620 100644
--- a/admin/src/main/scala/za/co/absa/spline/admin/AdminCLI.scala
+++ b/admin/src/main/scala/za/co/absa/spline/admin/AdminCLI.scala
@@ -53,7 +53,7 @@ object AdminCLI extends App {
)
}
- private val dbManagerFactoryImpl = new ArangoManagerFactoryImpl()
+ private val dbManagerFactoryImpl = new ArangoManagerFactoryImpl(activeFailover = false)
private val maybeConsole = InputConsole.systemConsoleIfAvailable()
val dbManagerFactory = maybeConsole
diff --git a/kafka-gateway/src/main/webapp/META-INF/context.xml b/kafka-gateway/src/main/webapp/META-INF/context.xml
index 2e53b5f5a..af1ab226d 100644
--- a/kafka-gateway/src/main/webapp/META-INF/context.xml
+++ b/kafka-gateway/src/main/webapp/META-INF/context.xml
@@ -31,8 +31,9 @@
+
@@ -45,4 +46,12 @@
+
+
+
+
+
+
diff --git a/persistence/pom.xml b/persistence/pom.xml
index 16dd7bdaa..758ba6699 100644
--- a/persistence/pom.xml
+++ b/persistence/pom.xml
@@ -78,7 +78,7 @@
com.arangodb
arangodb-java-driver
- 6.16.0
+ 6.16.1
com.arangodb
diff --git a/persistence/src/main/scala/com/arangodb/internal/InternalArangoDatabaseOps.scala b/persistence/src/main/scala/com/arangodb/internal/InternalArangoDatabaseOps.scala
index c4f468dff..d3f03ee4a 100644
--- a/persistence/src/main/scala/com/arangodb/internal/InternalArangoDatabaseOps.scala
+++ b/persistence/src/main/scala/com/arangodb/internal/InternalArangoDatabaseOps.scala
@@ -17,31 +17,51 @@
package com.arangodb.internal
import com.arangodb.async.ArangoDatabaseAsync
-import com.arangodb.async.internal.{ArangoExecutorAsync, ArangoExecutorAsyncDestructor}
-import com.arangodb.internal.velocystream.VstCommunicationDestructor
-import com.arangodb.internal.velocystream.VstCommunicationDestructor.ConnectionParams
+import com.arangodb.async.internal.ArangoExecutorAsync
+import com.arangodb.internal.net.{AccessType, HostDescription}
+import com.arangodb.internal.velocystream.VstCommunication
+import com.arangodb.internal.velocystream.internal.VstConnection
+import org.apache.commons.lang3.StringUtils
import org.apache.http.auth.UsernamePasswordCredentials
+import za.co.absa.commons.reflect.ReflectionUtils
+import za.co.absa.commons.reflect.ReflectionUtils.extractFieldValue
import za.co.absa.spline.common.rest.{HttpStatusException, RESTClient, RESTClientApacheHttpImpl}
import java.net.URI
+import javax.net.ssl.SSLContext
import scala.concurrent.{ExecutionContext, Future}
class InternalArangoDatabaseOps(db: ArangoDatabaseAsync)(implicit ec: ExecutionContext) {
+ import com.arangodb.internal.velocystream.VstImplicits._
+
/**
* @see [[https://github.com/arangodb/arangodb-java-driver/issues/353]]
*/
def restClient: RESTClient = {
- val asyncExecutable = db.asInstanceOf[ArangoExecuteable[ArangoExecutorAsync]]
- val ArangoExecutorAsyncDestructor(vstComm) = asyncExecutable.executor
- val VstCommunicationDestructor(ConnectionParams(hostDescription, user, maybePassword, maybeSslContext)) = vstComm
+ val vstComm = {
+ val asyncExecutable = db.asInstanceOf[ArangoExecuteable[ArangoExecutorAsync]]
+ val executor = asyncExecutable.executor
+ ReflectionUtils.extractFieldValue[ArangoExecutorAsync, VstCommunication[_, VstConnection[_]]](executor, "communication")
+ }
+ val connection = vstComm.connect(AccessType.WRITE)
+
+ val maybeSslContext = Option(extractFieldValue[VstConnection[_], SSLContext](connection, "sslContext"))
val scheme = if (maybeSslContext.isDefined) "https" else "http"
+
+ val hostDescription = extractFieldValue[VstConnection[_], HostDescription](connection, "host")
val host = hostDescription.getHost
val port = hostDescription.getPort
- val password = maybePassword.getOrElse("")
- val maybeCredentials = Option(user).map(user => new UsernamePasswordCredentials(user, password))
+ val database = db.dbName
+
+ val maybeCredentials =
+ for {
+ username <- Option(vstComm.getUser)
+ password = StringUtils.defaultString(vstComm.getPassword)
+ } yield
+ new UsernamePasswordCredentials(username, password)
- new RESTClientApacheHttpImpl(new URI(s"$scheme://$host:$port/_db/${db.name}"), maybeCredentials, maybeSslContext)
+ new RESTClientApacheHttpImpl(new URI(s"$scheme://$host:$port/_db/$database"), maybeCredentials, maybeSslContext)
}
diff --git a/persistence/src/main/scala/com/arangodb/internal/velocystream/VstCommunicationDestructor.scala b/persistence/src/main/scala/com/arangodb/internal/velocystream/VstCommunicationDestructor.scala
deleted file mode 100644
index 6e8537624..000000000
--- a/persistence/src/main/scala/com/arangodb/internal/velocystream/VstCommunicationDestructor.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2020 ABSA Group Limited
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.arangodb.internal.velocystream
-
-import com.arangodb.internal.net.{AccessType, HostDescription, HostHandler, HostImpl}
-import com.arangodb.internal.velocystream.internal.VstConnection
-import za.co.absa.commons.reflect.ReflectionUtils.extractFieldValue
-
-import javax.net.ssl.SSLContext
-
-object VstCommunicationDestructor {
-
- private object Fields {
- final val HostHandler = "hostHandler"
- final val SslContext = "sslContext"
- }
-
- case class ConnectionParams(
- hostDescription: HostDescription,
- user: String,
- password: Option[String],
- sslContext: Option[SSLContext]
- )
-
- def unapply(comm: VstCommunication[_, _]): Option[ConnectionParams] = {
- val hostHandler = extractFieldValue[VstCommunication[_, _], HostHandler](comm, Fields.HostHandler)
- val host = hostHandler.get(null, AccessType.WRITE).asInstanceOf[HostImpl]
- val sslContext = extractFieldValue[VstConnection[_], SSLContext](host.connection, Fields.SslContext)
-
- val connParams = ConnectionParams(
- host.getDescription,
- comm.user,
- Option(comm.password),
- Option(sslContext)
- )
-
- Some(connParams)
- }
-}
diff --git a/persistence/src/main/scala/com/arangodb/async/internal/ArangoExecutorAsyncDestructor.scala b/persistence/src/main/scala/com/arangodb/internal/velocystream/VstImplicits.scala
similarity index 54%
rename from persistence/src/main/scala/com/arangodb/async/internal/ArangoExecutorAsyncDestructor.scala
rename to persistence/src/main/scala/com/arangodb/internal/velocystream/VstImplicits.scala
index d0a2a0d6f..be884dde0 100644
--- a/persistence/src/main/scala/com/arangodb/async/internal/ArangoExecutorAsyncDestructor.scala
+++ b/persistence/src/main/scala/com/arangodb/internal/velocystream/VstImplicits.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 ABSA Group Limited
+ * Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,18 +14,19 @@
* limitations under the License.
*/
-package com.arangodb.async.internal
+package com.arangodb.internal.velocystream
-import com.arangodb.internal.velocystream.VstCommunication
+import com.arangodb.internal.net.AccessType
import com.arangodb.internal.velocystream.internal.VstConnection
-import za.co.absa.commons.reflect.ReflectionUtils
-object ArangoExecutorAsyncDestructor {
- private final val CommunicationField = "communication"
+object VstImplicits {
- def unapply(executor: ArangoExecutorAsync): Option[VstCommunication[_, _ <: VstConnection[_]]] = {
- Option(
- ReflectionUtils.extractFieldValue[ArangoExecutorAsync, VstCommunication[_, _ <: VstConnection[_]]](executor, CommunicationField)
- )
+ implicit class InternalVstCommunicationOps(val vstComm: VstCommunication[_, VstConnection[_]]) extends AnyVal {
+
+ def getUser: String = vstComm.user
+
+ def getPassword: String = vstComm.password
+
+ def connect(accessType: AccessType): VstConnection[_] = vstComm.connect(null, accessType)
}
}
diff --git a/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoDatabaseFacade.scala b/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoDatabaseFacade.scala
index a9857b12b..7018398de 100644
--- a/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoDatabaseFacade.scala
+++ b/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoDatabaseFacade.scala
@@ -16,7 +16,9 @@
package za.co.absa.spline.persistence
+import com.arangodb.DbName
import com.arangodb.async.{ArangoDBAsync, ArangoDatabaseAsync}
+import com.arangodb.entity.LoadBalancingStrategy
import com.arangodb.velocypack.module.scala.VPackScalaModule
import org.slf4s.Logging
import org.springframework.beans.factory.DisposableBean
@@ -27,7 +29,8 @@ import za.co.absa.commons.version.impl.SemVer20Impl.SemanticVersion
import javax.net.ssl._
import scala.concurrent._
-class ArangoDatabaseFacade(connectionURL: ArangoConnectionURL, maybeSSLContext: Option[SSLContext]) extends DisposableBean {
+class ArangoDatabaseFacade(connectionURL: ArangoConnectionURL, maybeSSLContext: Option[SSLContext], activeFailover: Boolean)
+ extends DisposableBean {
import za.co.absa.spline.persistence.ArangoDatabaseFacade._
@@ -48,8 +51,16 @@ class ArangoDatabaseFacade(connectionURL: ArangoConnectionURL, maybeSSLContext:
.having(maybeSSLContext)(_ sslContext _)
}
- // enable active failover
- arangoBuilder.acquireHostList(true)
+ if (activeFailover) {
+ arangoBuilder.acquireHostList(true)
+ arangoBuilder.loadBalancingStrategy(LoadBalancingStrategy.NONE)
+ } else {
+ arangoBuilder.acquireHostList(false)
+ if (hostsWithPorts.size > 1) {
+ arangoBuilder.loadBalancingStrategy(LoadBalancingStrategy.ROUND_ROBIN)
+ }
+ }
+
for ((host, port) <- hostsWithPorts) arangoBuilder.host(host, port)
// build ArangoDB Client
@@ -59,7 +70,7 @@ class ArangoDatabaseFacade(connectionURL: ArangoConnectionURL, maybeSSLContext:
// The val is lazy to not prevent a facade instance from being created.
// It allows connection to be re-attempted later and the {{shutdown()}} method to be called.
lazy val db: ArangoDatabaseAsync = {
- val db = arango.db(dbName)
+ val db = arango.db(DbName.of(dbName))
warmUpDb(db)
db
}
diff --git a/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoManager.scala b/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoManager.scala
index 72b71a29e..5d56e5ee5 100644
--- a/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoManager.scala
+++ b/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoManager.scala
@@ -129,9 +129,9 @@ class ArangoManagerImpl(
for {
exists <- db.exists.toScala
_ <- if (exists && !dropIfExists)
- throw new IllegalArgumentException(s"Arango Database ${db.name()} already exists")
+ throw new IllegalArgumentException(s"Arango Database ${db.dbName} already exists")
else if (exists && dropIfExists) {
- log.info(s"Drop database: ${db.name}")
+ log.info(s"Drop database: ${db.dbName}")
db.drop().toScala
}
else Future.successful({})
@@ -260,7 +260,7 @@ class ArangoManagerImpl(
log.debug(s"Delete search analyzers")
for {
analyzers <- db.getSearchAnalyzers.toScala.map(_.asScala)
- userAnalyzers = analyzers.filter(_.getName.startsWith(s"${db.name}::"))
+ userAnalyzers = analyzers.filter(_.getName.startsWith(s"${db.dbName}::"))
_ <- Future.traverse(userAnalyzers)(ua => {
log.info(s"Delete search analyzer: ${ua.getName}")
db.deleteSearchAnalyzer(ua.getName).toScala
diff --git a/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoManagerFactory.scala b/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoManagerFactory.scala
index ecafc4589..63f840cd6 100644
--- a/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoManagerFactory.scala
+++ b/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoManagerFactory.scala
@@ -27,7 +27,7 @@ trait ArangoManagerFactory {
def create(connectionURL: ArangoConnectionURL, maybeSSLContext: Option[SSLContext]): ArangoManager
}
-class ArangoManagerFactoryImpl()(implicit ec: ExecutionContext) extends ArangoManagerFactory {
+class ArangoManagerFactoryImpl(activeFailover: Boolean)(implicit ec: ExecutionContext) extends ArangoManagerFactory {
import ArangoImplicits._
@@ -48,7 +48,7 @@ class ArangoManagerFactoryImpl()(implicit ec: ExecutionContext) extends ArangoMa
}
def dbFacade(): ArangoDatabaseFacade =
- new ArangoDatabaseFacade(connectionURL, maybeSSLContext)
+ new ArangoDatabaseFacade(connectionURL, maybeSSLContext, activeFailover)
new AutoClosingArangoManagerProxy(dbManager, dbFacade)
}
diff --git a/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoRepoConfig.scala b/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoRepoConfig.scala
index 8ff00960f..d9090ec3e 100644
--- a/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoRepoConfig.scala
+++ b/persistence/src/main/scala/za/co/absa/spline/persistence/ArangoRepoConfig.scala
@@ -29,15 +29,17 @@ import za.co.absa.spline.common.security.TLSUtils
class ArangoRepoConfig extends InitializingBean with Logging {
import za.co.absa.spline.persistence.ArangoRepoConfig._
+
import scala.concurrent.ExecutionContext.Implicits._
override def afterPropertiesSet(): Unit = {
log.info(s"Spline database URL: ${Database.ConnectionURL.asString}")
+ log.info(s"ArangoDB Active Failover: ${Database.ActiveFailoverMode}")
}
@Bean def arangoDatabaseFacade: ArangoDatabaseFacade = {
val sslCtxOpt = Option.when(Database.DisableSSLValidation)(TLSUtils.TrustingAllSSLContext)
- new ArangoDatabaseFacade(Database.ConnectionURL, sslCtxOpt)
+ new ArangoDatabaseFacade(Database.ConnectionURL, sslCtxOpt, Database.ActiveFailoverMode)
}
@Bean def arangoDatabase: ArangoDatabaseAsync = arangoDatabaseFacade.db
@@ -64,6 +66,9 @@ object ArangoRepoConfig extends DefaultConfigurationStack with ConfTyped {
val DisableSSLValidation: Boolean =
conf.getBoolean(Prop("disableSslValidation"), false)
+
+ val ActiveFailoverMode: Boolean =
+ conf.getBoolean(Prop("activeFailover"), false)
}
}
diff --git a/rest-gateway/src/main/webapp/META-INF/context.xml b/rest-gateway/src/main/webapp/META-INF/context.xml
index a138d516d..448188afb 100644
--- a/rest-gateway/src/main/webapp/META-INF/context.xml
+++ b/rest-gateway/src/main/webapp/META-INF/context.xml
@@ -22,6 +22,14 @@
-->
+
+
+
+
+
+