Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ object AdminCLI extends App {
)
}

private val dbManagerFactoryImpl = new ArangoManagerFactoryImpl()
private val dbManagerFactoryImpl = new ArangoManagerFactoryImpl(activeFailover = false)
private val maybeConsole = InputConsole.systemConsoleIfAvailable()

val dbManagerFactory = maybeConsole
Expand Down
11 changes: 10 additions & 1 deletion kafka-gateway/src/main/webapp/META-INF/context.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@
<Environment name="spline/kafka/consumer/bootstrap/servers" type="java.lang.String" override="false"/>

<!--
Optional config
Optional config [kafka]
-->

<Environment name="spline/kafka/consumerConcurrency" type="java.lang.String" override="false"/>

<Environment name="spline/kafka/insertPlanTimeout" type="java.lang.String" override="false"/>
Expand All @@ -45,4 +46,12 @@

<Environment name="spline/kafka/deadLetterQueueEnabled" type="java.lang.String" override="false"/>

<!--
Optional config [database]
-->

<Environment name="spline/database/activeFailover" type="java.lang.String" override="false"/>
<Environment name="spline/database/disableSslValidation" type="java.lang.String" override="false"/>
<Environment name="spline/database/logFullQueryOnError" type="java.lang.String" override="false"/>

</Context>
2 changes: 1 addition & 1 deletion persistence/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
<dependency>
<groupId>com.arangodb</groupId>
<artifactId>arangodb-java-driver</artifactId>
<version>6.16.0</version>
<version>6.16.1</version>
</dependency>
<dependency>
<groupId>com.arangodb</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,51 @@
package com.arangodb.internal

import com.arangodb.async.ArangoDatabaseAsync
import com.arangodb.async.internal.{ArangoExecutorAsync, ArangoExecutorAsyncDestructor}
import com.arangodb.internal.velocystream.VstCommunicationDestructor
import com.arangodb.internal.velocystream.VstCommunicationDestructor.ConnectionParams
import com.arangodb.async.internal.ArangoExecutorAsync
import com.arangodb.internal.net.{AccessType, HostDescription}
import com.arangodb.internal.velocystream.VstCommunication
import com.arangodb.internal.velocystream.internal.VstConnection
import org.apache.commons.lang3.StringUtils
import org.apache.http.auth.UsernamePasswordCredentials
import za.co.absa.commons.reflect.ReflectionUtils
import za.co.absa.commons.reflect.ReflectionUtils.extractFieldValue
import za.co.absa.spline.common.rest.{HttpStatusException, RESTClient, RESTClientApacheHttpImpl}

import java.net.URI
import javax.net.ssl.SSLContext
import scala.concurrent.{ExecutionContext, Future}

class InternalArangoDatabaseOps(db: ArangoDatabaseAsync)(implicit ec: ExecutionContext) {

import com.arangodb.internal.velocystream.VstImplicits._

/**
* @see [[https://github.com/arangodb/arangodb-java-driver/issues/353]]
*/
def restClient: RESTClient = {
val asyncExecutable = db.asInstanceOf[ArangoExecuteable[ArangoExecutorAsync]]
val ArangoExecutorAsyncDestructor(vstComm) = asyncExecutable.executor
val VstCommunicationDestructor(ConnectionParams(hostDescription, user, maybePassword, maybeSslContext)) = vstComm
val vstComm = {
val asyncExecutable = db.asInstanceOf[ArangoExecuteable[ArangoExecutorAsync]]
val executor = asyncExecutable.executor
ReflectionUtils.extractFieldValue[ArangoExecutorAsync, VstCommunication[_, VstConnection[_]]](executor, "communication")
}
val connection = vstComm.connect(AccessType.WRITE)

val maybeSslContext = Option(extractFieldValue[VstConnection[_], SSLContext](connection, "sslContext"))
val scheme = if (maybeSslContext.isDefined) "https" else "http"

val hostDescription = extractFieldValue[VstConnection[_], HostDescription](connection, "host")
val host = hostDescription.getHost
val port = hostDescription.getPort
val password = maybePassword.getOrElse("")
val maybeCredentials = Option(user).map(user => new UsernamePasswordCredentials(user, password))
val database = db.dbName

val maybeCredentials =
for {
username <- Option(vstComm.getUser)
password = StringUtils.defaultString(vstComm.getPassword)
} yield
new UsernamePasswordCredentials(username, password)

new RESTClientApacheHttpImpl(new URI(s"$scheme://$host:$port/_db/${db.name}"), maybeCredentials, maybeSslContext)
new RESTClientApacheHttpImpl(new URI(s"$scheme://$host:$port/_db/$database"), maybeCredentials, maybeSslContext)
}


Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 ABSA Group Limited
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,18 +14,19 @@
* limitations under the License.
*/

package com.arangodb.async.internal
package com.arangodb.internal.velocystream

import com.arangodb.internal.velocystream.VstCommunication
import com.arangodb.internal.net.AccessType
import com.arangodb.internal.velocystream.internal.VstConnection
import za.co.absa.commons.reflect.ReflectionUtils

object ArangoExecutorAsyncDestructor {
private final val CommunicationField = "communication"
object VstImplicits {

def unapply(executor: ArangoExecutorAsync): Option[VstCommunication[_, _ <: VstConnection[_]]] = {
Option(
ReflectionUtils.extractFieldValue[ArangoExecutorAsync, VstCommunication[_, _ <: VstConnection[_]]](executor, CommunicationField)
)
implicit class InternalVstCommunicationOps(val vstComm: VstCommunication[_, VstConnection[_]]) extends AnyVal {

def getUser: String = vstComm.user

def getPassword: String = vstComm.password

def connect(accessType: AccessType): VstConnection[_] = vstComm.connect(null, accessType)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package za.co.absa.spline.persistence

import com.arangodb.DbName
import com.arangodb.async.{ArangoDBAsync, ArangoDatabaseAsync}
import com.arangodb.entity.LoadBalancingStrategy
import com.arangodb.velocypack.module.scala.VPackScalaModule
import org.slf4s.Logging
import org.springframework.beans.factory.DisposableBean
Expand All @@ -27,7 +29,8 @@ import za.co.absa.commons.version.impl.SemVer20Impl.SemanticVersion
import javax.net.ssl._
import scala.concurrent._

class ArangoDatabaseFacade(connectionURL: ArangoConnectionURL, maybeSSLContext: Option[SSLContext]) extends DisposableBean {
class ArangoDatabaseFacade(connectionURL: ArangoConnectionURL, maybeSSLContext: Option[SSLContext], activeFailover: Boolean)
extends DisposableBean {

import za.co.absa.spline.persistence.ArangoDatabaseFacade._

Expand All @@ -48,8 +51,16 @@ class ArangoDatabaseFacade(connectionURL: ArangoConnectionURL, maybeSSLContext:
.having(maybeSSLContext)(_ sslContext _)
}

// enable active failover
arangoBuilder.acquireHostList(true)
if (activeFailover) {
arangoBuilder.acquireHostList(true)
arangoBuilder.loadBalancingStrategy(LoadBalancingStrategy.NONE)
} else {
arangoBuilder.acquireHostList(false)
if (hostsWithPorts.size > 1) {
arangoBuilder.loadBalancingStrategy(LoadBalancingStrategy.ROUND_ROBIN)
}
}

for ((host, port) <- hostsWithPorts) arangoBuilder.host(host, port)

// build ArangoDB Client
Expand All @@ -59,7 +70,7 @@ class ArangoDatabaseFacade(connectionURL: ArangoConnectionURL, maybeSSLContext:
// The val is lazy to not prevent a facade instance from being created.
// It allows connection to be re-attempted later and the {{shutdown()}} method to be called.
lazy val db: ArangoDatabaseAsync = {
val db = arango.db(dbName)
val db = arango.db(DbName.of(dbName))
warmUpDb(db)
db
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ class ArangoManagerImpl(
for {
exists <- db.exists.toScala
_ <- if (exists && !dropIfExists)
throw new IllegalArgumentException(s"Arango Database ${db.name()} already exists")
throw new IllegalArgumentException(s"Arango Database ${db.dbName} already exists")
else if (exists && dropIfExists) {
log.info(s"Drop database: ${db.name}")
log.info(s"Drop database: ${db.dbName}")
db.drop().toScala
}
else Future.successful({})
Expand Down Expand Up @@ -260,7 +260,7 @@ class ArangoManagerImpl(
log.debug(s"Delete search analyzers")
for {
analyzers <- db.getSearchAnalyzers.toScala.map(_.asScala)
userAnalyzers = analyzers.filter(_.getName.startsWith(s"${db.name}::"))
userAnalyzers = analyzers.filter(_.getName.startsWith(s"${db.dbName}::"))
_ <- Future.traverse(userAnalyzers)(ua => {
log.info(s"Delete search analyzer: ${ua.getName}")
db.deleteSearchAnalyzer(ua.getName).toScala
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ trait ArangoManagerFactory {
def create(connectionURL: ArangoConnectionURL, maybeSSLContext: Option[SSLContext]): ArangoManager
}

class ArangoManagerFactoryImpl()(implicit ec: ExecutionContext) extends ArangoManagerFactory {
class ArangoManagerFactoryImpl(activeFailover: Boolean)(implicit ec: ExecutionContext) extends ArangoManagerFactory {

import ArangoImplicits._

Expand All @@ -48,7 +48,7 @@ class ArangoManagerFactoryImpl()(implicit ec: ExecutionContext) extends ArangoMa
}

def dbFacade(): ArangoDatabaseFacade =
new ArangoDatabaseFacade(connectionURL, maybeSSLContext)
new ArangoDatabaseFacade(connectionURL, maybeSSLContext, activeFailover)

new AutoClosingArangoManagerProxy(dbManager, dbFacade)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@ import za.co.absa.spline.common.security.TLSUtils
class ArangoRepoConfig extends InitializingBean with Logging {

import za.co.absa.spline.persistence.ArangoRepoConfig._

import scala.concurrent.ExecutionContext.Implicits._

override def afterPropertiesSet(): Unit = {
log.info(s"Spline database URL: ${Database.ConnectionURL.asString}")
log.info(s"ArangoDB Active Failover: ${Database.ActiveFailoverMode}")
}

@Bean def arangoDatabaseFacade: ArangoDatabaseFacade = {
val sslCtxOpt = Option.when(Database.DisableSSLValidation)(TLSUtils.TrustingAllSSLContext)
new ArangoDatabaseFacade(Database.ConnectionURL, sslCtxOpt)
new ArangoDatabaseFacade(Database.ConnectionURL, sslCtxOpt, Database.ActiveFailoverMode)
}

@Bean def arangoDatabase: ArangoDatabaseAsync = arangoDatabaseFacade.db
Expand All @@ -64,6 +66,9 @@ object ArangoRepoConfig extends DefaultConfigurationStack with ConfTyped {

val DisableSSLValidation: Boolean =
conf.getBoolean(Prop("disableSslValidation"), false)

val ActiveFailoverMode: Boolean =
conf.getBoolean(Prop("activeFailover"), false)
}

}
8 changes: 8 additions & 0 deletions rest-gateway/src/main/webapp/META-INF/context.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@
-->
<Environment name="spline/database/connectionUrl" type="java.lang.String" override="false"/>

<!--
Optional config
-->

<Environment name="spline/database/activeFailover" type="java.lang.String" override="false"/>
<Environment name="spline/database/disableSslValidation" type="java.lang.String" override="false"/>
<Environment name="spline/database/logFullQueryOnError" type="java.lang.String" override="false"/>

<Environment name="spline/producer/timeoutDefault" type="java.lang.String" override="false"/>
<Environment name="spline/producer/timeoutMaximum" type="java.lang.String" override="false"/>

Expand Down