Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 38 additions & 30 deletions admin/src/main/scala/za/co/absa/spline/admin/AdminCLI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ object AdminCLI extends App {
cmd: Command = null,
logLevel: Level = Level.INFO,
disableSslValidation: Boolean = false,
dryRun: Boolean = false,
)

implicit class OptionParserOps(val p: OptionParser[AdminCLIConfig]) extends AnyVal {
Expand All @@ -53,7 +54,7 @@ object AdminCLI extends App {
p.arg[String]("<db_url>")
.required()
.text(s"ArangoDB connection string in the format: ${ArangoConnectionURL.HumanReadableFormat}")
.action { case (url, c@AdminCLIConfig(cmd: DBCommand, _, _)) => c.copy(cmd.dbUrl = ArangoConnectionURL(url)) }
.action { case (url, c@AdminCLIConfig(cmd: DBCommand, _, _, _)) => c.copy(cmd.dbUrl = ArangoConnectionURL(url)) }
)
}

Expand Down Expand Up @@ -101,6 +102,13 @@ class AdminCLI(dbManagerFactory: ArangoManagerFactory) {
.text(s"Disable validation of self-signed SSL certificates. (Don't use on production).")
.action((_, conf) => conf.copy(disableSslValidation = true))

opt[Unit]("dry-run")
.text("Dry-run commands. No real modification will be made to the database.")
.action((_, conf) => {
println(ansi"%yellow{Dry-run mode activated}")
conf.copy(dryRun = true)
})

this.placeNewLine()

cmd("db-init")
Expand All @@ -109,33 +117,33 @@ class AdminCLI(dbManagerFactory: ArangoManagerFactory) {
.children(
opt[Unit]('f', "force")
.text("Re-create the database if one already exists.")
.action { case (_, c@AdminCLIConfig(cmd: DBInit, _, _)) => c.copy(cmd.copy(force = true)) },
.action { case (_, c@AdminCLIConfig(cmd: DBInit, _, _, _)) => c.copy(cmd.copy(force = true)) },
opt[Unit]('s', "skip")
.text("Skip existing database. Don't throw error, just end.")
.action { case (_, c@AdminCLIConfig(cmd: DBInit, _, _)) => c.copy(cmd.copy(skip = true)) },
.action { case (_, c@AdminCLIConfig(cmd: DBInit, _, _, _)) => c.copy(cmd.copy(skip = true)) },

opt[Map[String, Int]]("shard-num")
.text("Override number of shards per collection. Comma-separated key-value pairs, e.g. 'collectionA=2,collectionB=3'.")
.validate(_.values
.collectFirst { case v if v < 1 => failure(s"Shard number should be positive, but was $v") }
.getOrElse(success))
.action {
case (m, c@AdminCLIConfig(cmd@DBInit(_, _, _, opts), _, _)) =>
case (m, c@AdminCLIConfig(cmd@DBInit(_, _, _, opts), _, _, _)) =>
c.copy(cmd.copy(options = opts.copy(numShards = m.mapKeys(CollectionDef.forName))))
},

opt[Int]("shard-num-default")
.text("Override default number of shards.")
.validate(v => if (v < 1) failure(s"Shard number should be positive, but was $v") else success)
.action {
case (v, c@AdminCLIConfig(cmd@DBInit(_, _, _, opts), _, _)) =>
case (v, c@AdminCLIConfig(cmd@DBInit(_, _, _, opts), _, _, _)) =>
c.copy(cmd.copy(options = opts.copy(numShardsDefault = Some(v))))
},

opt[Map[String, String]]("shard-keys")
.text("Override shard keys per collection. Comma-separated key-value pairs, where value is a key name list separated by '+', e.g. 'collectionA=k1+k2,collectionB=k3'.")
.action {
case (m, c@AdminCLIConfig(cmd@DBInit(_, _, _, opts), _, _)) =>
case (m, c@AdminCLIConfig(cmd@DBInit(_, _, _, opts), _, _, _)) =>
c.copy(cmd.copy(options = opts.copy(shardKeys = m.map({
case (k, s) => CollectionDef.forName(k) -> s.split('+').map(_.trim).toSeq
}))))
Expand All @@ -144,7 +152,7 @@ class AdminCLI(dbManagerFactory: ArangoManagerFactory) {
opt[String]("shard-keys-default")
.text("Override default shard keys. Key names separated by '+' character, e.g. 'key1+key2+key3'.")
.action {
case (v, c@AdminCLIConfig(cmd@DBInit(_, _, _, opts), _, _)) =>
case (v, c@AdminCLIConfig(cmd@DBInit(_, _, _, opts), _, _, _)) =>
c.copy(cmd.copy(options = opts.copy(shardKeysDefault = Some(v.split('+').map(_.trim)))))
},

Expand All @@ -154,22 +162,22 @@ class AdminCLI(dbManagerFactory: ArangoManagerFactory) {
.collectFirst { case v if v < 1 => failure(s"Replication factor should be positive, but was $v") }
.getOrElse(success))
.action {
case (m, c@AdminCLIConfig(cmd@DBInit(_, _, _, opts), _, _)) =>
case (m, c@AdminCLIConfig(cmd@DBInit(_, _, _, opts), _, _, _)) =>
c.copy(cmd.copy(options = opts.copy(replFactor = m.mapKeys(CollectionDef.forName))))
},

opt[Int]("repl-factor-default")
.text("Override default replication factor.")
.validate(v => if (v < 1) failure(s"Replication factor should be positive, but was $v") else success)
.action {
case (v, c@AdminCLIConfig(cmd@DBInit(_, _, _, opts), _, _)) =>
case (v, c@AdminCLIConfig(cmd@DBInit(_, _, _, opts), _, _, _)) =>
c.copy(cmd.copy(options = opts.copy(replFactorDefault = Some(v))))
},

opt[Unit]("wait-for-sync")
.text("Ensure the data is synchronized to disk before returning from a document CUD operation.")
.action {
case (_, c@AdminCLIConfig(cmd@DBInit(_, _, _, opts), _, _)) =>
case (_, c@AdminCLIConfig(cmd@DBInit(_, _, _, opts), _, _, _)) =>
c.copy(cmd.copy(options = opts.copy(waitForSync = true)))
}
)
Expand All @@ -190,28 +198,28 @@ class AdminCLI(dbManagerFactory: ArangoManagerFactory) {
.children(
opt[Unit]("check-access")
.text("Check access to the database")
.action { case (_, c@AdminCLIConfig(cmd: DBExec, _, _)) => c.copy(cmd.addAction(CheckDBAccess)) },
.action { case (_, c@AdminCLIConfig(cmd: DBExec, _, _, _)) => c.copy(cmd.addAction(CheckDBAccess)) },
opt[Unit]("foxx-reinstall")
.text("Reinstall Foxx services")
.action { case (_, c@AdminCLIConfig(cmd: DBExec, _, _)) => c.copy(cmd.addAction(FoxxReinstall)) },
.action { case (_, c@AdminCLIConfig(cmd: DBExec, _, _, _)) => c.copy(cmd.addAction(FoxxReinstall)) },
opt[Unit]("indices-delete")
.text("Delete indices")
.action { case (_, c@AdminCLIConfig(cmd: DBExec, _, _)) => c.copy(cmd.addAction(IndicesDelete)) },
.action { case (_, c@AdminCLIConfig(cmd: DBExec, _, _, _)) => c.copy(cmd.addAction(IndicesDelete)) },
opt[Unit]("indices-create")
.text("Create indices")
.action { case (_, c@AdminCLIConfig(cmd: DBExec, _, _)) => c.copy(cmd.addAction(IndicesCreate)) },
.action { case (_, c@AdminCLIConfig(cmd: DBExec, _, _, _)) => c.copy(cmd.addAction(IndicesCreate)) },
opt[Unit]("search-views-delete")
.text("Delete search views")
.action { case (_, c@AdminCLIConfig(cmd: DBExec, _, _)) => c.copy(cmd.addAction(SearchViewsDelete)) },
.action { case (_, c@AdminCLIConfig(cmd: DBExec, _, _, _)) => c.copy(cmd.addAction(SearchViewsDelete)) },
opt[Unit]("search-views-create")
.text("Create search views")
.action { case (_, c@AdminCLIConfig(cmd: DBExec, _, _)) => c.copy(cmd.addAction(SearchViewsCreate)) },
.action { case (_, c@AdminCLIConfig(cmd: DBExec, _, _, _)) => c.copy(cmd.addAction(SearchViewsCreate)) },
opt[Unit]("search-analyzers-delete")
.text("Delete search analyzers")
.action { case (_, c@AdminCLIConfig(cmd: DBExec, _, _)) => c.copy(cmd.addAction(SearchAnalyzerDelete)) },
.action { case (_, c@AdminCLIConfig(cmd: DBExec, _, _, _)) => c.copy(cmd.addAction(SearchAnalyzerDelete)) },
opt[Unit]("search-analyzers-create")
.text("Create search analyzers")
.action { case (_, c@AdminCLIConfig(cmd: DBExec, _, _)) => c.copy(cmd.addAction(SearchAnalyzerCreate)) }
.action { case (_, c@AdminCLIConfig(cmd: DBExec, _, _, _)) => c.copy(cmd.addAction(SearchAnalyzerCreate)) }
)
.children(this.dbCommandOptions: _*)

Expand All @@ -223,23 +231,23 @@ class AdminCLI(dbManagerFactory: ArangoManagerFactory) {
opt[String]("retain-for")
text "Retention period in format <length><unit>. " +
"Example: `--retain-for 30d` means to retain data that is NOT older than 30 days from now."
action { case (s, c@AdminCLIConfig(cmd: DBPrune, _, _)) => c.copy(cmd.copy(retentionPeriod = Some(Duration(s)))) },
action { case (s, c@AdminCLIConfig(cmd: DBPrune, _, _, _)) => c.copy(cmd.copy(retentionPeriod = Some(Duration(s)))) },
opt[String]("before-date")
text "A datetime with an optional time and zone parts in ISO-8601 format. " +
"The data older than the specified datetime is subject for removal."
action { case (s, c@AdminCLIConfig(cmd: DBPrune, _, _)) => c.copy(cmd.copy(thresholdDate = Some(parseZonedDateTime(s)))) },
action { case (s, c@AdminCLIConfig(cmd: DBPrune, _, _, _)) => c.copy(cmd.copy(thresholdDate = Some(parseZonedDateTime(s)))) },
))

checkConfig {
case AdminCLIConfig(null, _, _) =>
case AdminCLIConfig(null, _, _, _) =>
failure("No command given")
case AdminCLIConfig(cmd: DBCommand, _, _) if cmd.dbUrl == null =>
case AdminCLIConfig(cmd: DBCommand, _, _, _) if cmd.dbUrl == null =>
failure("DB connection string is required")
case AdminCLIConfig(cmd: DBInit, _, _) if cmd.force && cmd.skip =>
case AdminCLIConfig(cmd: DBInit, _, _, _) if cmd.force && cmd.skip =>
failure("Options '--force' and '--skip' cannot be used together")
case AdminCLIConfig(cmd: DBPrune, _, _) if cmd.retentionPeriod.isEmpty && cmd.thresholdDate.isEmpty =>
case AdminCLIConfig(cmd: DBPrune, _, _, _) if cmd.retentionPeriod.isEmpty && cmd.thresholdDate.isEmpty =>
failure("One of the following options must be specified: --retain-for or --before-date")
case AdminCLIConfig(cmd: DBPrune, _, _) if cmd.retentionPeriod.isDefined && cmd.thresholdDate.isDefined =>
case AdminCLIConfig(cmd: DBPrune, _, _, _) if cmd.retentionPeriod.isDefined && cmd.thresholdDate.isDefined =>
failure("Options --retain-for and --before-date cannot be used together")
case _ =>
success
Expand All @@ -264,24 +272,24 @@ class AdminCLI(dbManagerFactory: ArangoManagerFactory) {
case (false, true) => Skip
case _ => Fail
}
val dbManager = dbManagerFactory.create(url, sslCtxOpt)
val dbManager = dbManagerFactory.create(url, sslCtxOpt, conf.dryRun)
val wasInitialized = Await.result(dbManager.initialize(onExistsAction, options), Duration.Inf)
if (!wasInitialized) println(ansi"%yellow{Skipped. DB is already initialized}")

case DBUpgrade(url) =>
val dbManager = dbManagerFactory.create(url, sslCtxOpt)
val dbManager = dbManagerFactory.create(url, sslCtxOpt, conf.dryRun)
Await.result(dbManager.upgrade(), Duration.Inf)

case DBExec(url, actions) =>
val dbManager = dbManagerFactory.create(url, sslCtxOpt)
val dbManager = dbManagerFactory.create(url, sslCtxOpt, conf.dryRun)
Await.result(dbManager.execute(actions: _*), Duration.Inf)

case DBPrune(url, Some(retentionPeriod), _) =>
val dbManager = dbManagerFactory.create(url, sslCtxOpt)
val dbManager = dbManagerFactory.create(url, sslCtxOpt, conf.dryRun)
Await.result(dbManager.prune(retentionPeriod), Duration.Inf)

case DBPrune(url, _, Some(dateTime)) =>
val dbManager = dbManagerFactory.create(url, sslCtxOpt)
val dbManager = dbManagerFactory.create(url, sslCtxOpt, conf.dryRun)
Await.result(dbManager.prune(dateTime), Duration.Inf)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ import scala.concurrent.duration.Duration

class InteractiveArangoManagerFactoryProxy(dbManagerFactory: ArangoManagerFactory, interactor: UserInteractor) extends ArangoManagerFactory {

override def create(connUrl: ArangoConnectionURL, maybeSSLContext: Option[SSLContext]): ArangoManager = {
override def create(connUrl: ArangoConnectionURL, maybeSSLContext: Option[SSLContext], dryRun: Boolean): ArangoManager = {
try {
val dbm = dbManagerFactory.create(connUrl, maybeSSLContext)
val dbm = dbManagerFactory.create(connUrl, maybeSSLContext, dryRun)
Await.result(dbm.execute(CheckDBAccess), Duration.Inf)
dbm
} catch {
case ArangoDBAuthenticationException(_) if connUrl.user.isEmpty || connUrl.password.isEmpty =>
val updatedConnUrl = interactor.credentializeConnectionUrl(connUrl)
dbManagerFactory.create(updatedConnUrl, maybeSSLContext)
dbManagerFactory.create(updatedConnUrl, maybeSSLContext, dryRun)
}
}
}
40 changes: 22 additions & 18 deletions admin/src/main/scala/za/co/absa/spline/arango/ArangoManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import za.co.absa.commons.reflect.EnumerationMacros.sealedInstancesOf
import za.co.absa.commons.version.impl.SemVer20Impl.SemanticVersion
import za.co.absa.spline.arango.OnDBExistsAction.{Drop, Skip}
import za.co.absa.spline.arango.foxx.{FoxxManager, FoxxSourceResolver}
import za.co.absa.spline.persistence.DatabaseVersionManager
import za.co.absa.spline.persistence.migration.Migrator
import za.co.absa.spline.persistence.model.{CollectionDef, GraphDef, SearchAnalyzerDef, SearchViewDef}
import za.co.absa.spline.persistence.{DatabaseVersionManager, DryRunnable}

import java.time.{Clock, ZonedDateTime}
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -56,9 +56,11 @@ class ArangoManagerImpl(
migrator: Migrator,
foxxManager: FoxxManager,
clock: Clock,
appDBVersion: SemanticVersion)
appDBVersion: SemanticVersion,
val dryRun: Boolean)
(implicit val ex: ExecutionContext)
extends ArangoManager
with DryRunnable
with Logging {

import ArangoManagerImpl._
Expand Down Expand Up @@ -145,15 +147,15 @@ class ArangoManagerImpl(
throw new IllegalArgumentException(s"Arango Database ${db.dbName} already exists")
else if (exists && dropIfExists) {
log.info(s"Drop database: ${db.dbName}")
db.drop().toScala
unlessDryRunAsync(db.drop().toScala)
}
else Future.successful({})
} yield {}
}

private def createDb() = {
log.info(s"Create database: ${db.dbName}")
db.create().toScala
unlessDryRunAsync(db.create().toScala)
}

private def createCollections(options: DatabaseCreateOptions) = {
Expand All @@ -171,8 +173,8 @@ class ArangoManagerImpl(
.replicationFactor(replFactor)
.waitForSync(options.waitForSync)
for {
_ <- db.createCollection(colDef.name, collectionOptions).toScala
_ <- db.collection(colDef.name).insertDocuments(colDef.initData.asJava).toScala
_ <- unlessDryRunAsync(db.createCollection(colDef.name, collectionOptions).toScala)
_ <- unlessDryRunAsync(db.collection(colDef.name).insertDocuments(colDef.initData.asJava).toScala)
} yield ()
})
}
Expand All @@ -186,7 +188,7 @@ class ArangoManagerImpl(
.collection(e.name)
.from(e.froms.map(_.name): _*)
.to(e.tos.map(_.name): _*))
db.createGraph(graphDef.name, edgeDefs.asJava).toScala
unlessDryRunAsync(db.createGraph(graphDef.name, edgeDefs.asJava).toScala)
})
}

Expand All @@ -199,7 +201,7 @@ class ArangoManagerImpl(
userIndices = allIndices.filter { case (_, idx) => idx.getType != IndexType.primary && idx.getType != IndexType.edge }
_ <- Future.traverse(userIndices) { case (colName, idx) =>
log.debug(s"Drop ${idx.getType} index: $colName.${idx.getName}")
db.deleteIndex(idx.getId).toScala
unlessDryRunAsync(db.deleteIndex(idx.getId).toScala)
}
} yield {}
}
Expand All @@ -215,12 +217,14 @@ class ArangoManagerImpl(
log.debug(s"Ensure ${idxOpts.indexType} index: ${colDef.name} [${idxDef.fields.mkString(",")}]")
val dbCol = db.collection(colDef.name)
val fields = idxDef.fields.asJava
(idxOpts match {
case opts: FulltextIndexOptions => dbCol.ensureFulltextIndex(fields, opts)
case opts: GeoIndexOptions => dbCol.ensureGeoIndex(fields, opts)
case opts: PersistentIndexOptions => dbCol.ensurePersistentIndex(fields, opts)
case opts: TtlIndexOptions => dbCol.ensureTtlIndex(fields, opts)
}).toScala
unlessDryRunAsync {
(idxOpts match {
case opts: FulltextIndexOptions => dbCol.ensureFulltextIndex(fields, opts)
case opts: GeoIndexOptions => dbCol.ensureGeoIndex(fields, opts)
case opts: PersistentIndexOptions => dbCol.ensurePersistentIndex(fields, opts)
case opts: TtlIndexOptions => dbCol.ensureTtlIndex(fields, opts)
}).toScala
}
})
}

Expand Down Expand Up @@ -258,7 +262,7 @@ class ArangoManagerImpl(
views = viewEntities.map(ve => db.view(ve.getName))
_ <- Future.traverse(views) { view =>
log.info(s"Delete search view: ${view.name}")
view.drop().toScala
unlessDryRunAsync(view.drop().toScala)
}
} yield {}
}
Expand All @@ -267,7 +271,7 @@ class ArangoManagerImpl(
log.debug(s"Create search views")
Future.traverse(sealedInstancesOf[SearchViewDef]) { viewDef =>
log.info(s"Create search view: ${viewDef.name}")
db.createArangoSearch(viewDef.name, viewDef.properties).toScala
unlessDryRunAsync(db.createArangoSearch(viewDef.name, viewDef.properties).toScala)
}
}

Expand All @@ -278,7 +282,7 @@ class ArangoManagerImpl(
userAnalyzers = analyzers.filter(_.getName.startsWith(s"${db.dbName}::"))
_ <- Future.traverse(userAnalyzers)(ua => {
log.info(s"Delete search analyzer: ${ua.getName}")
db.deleteSearchAnalyzer(ua.getName).toScala
unlessDryRunAsync(db.deleteSearchAnalyzer(ua.getName).toScala)
})
} yield {}
}
Expand All @@ -287,7 +291,7 @@ class ArangoManagerImpl(
log.debug(s"Create search analyzers")
Future.traverse(sealedInstancesOf[SearchAnalyzerDef]) { ad =>
log.info(s"Create search analyzer: ${ad.name}")
db.createSearchAnalyzer(ad.analyzer).toScala
unlessDryRunAsync(db.createSearchAnalyzer(ad.analyzer).toScala)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,19 @@ import javax.net.ssl.SSLContext
import scala.concurrent.ExecutionContext

trait ArangoManagerFactory {
def create(connectionURL: ArangoConnectionURL, maybeSSLContext: Option[SSLContext]): ArangoManager
def create(connectionURL: ArangoConnectionURL, maybeSSLContext: Option[SSLContext], dryRun: Boolean): ArangoManager
}

class ArangoManagerFactoryImpl(activeFailover: Boolean)(implicit ec: ExecutionContext) extends ArangoManagerFactory {

override def create(connectionURL: ArangoConnectionURL, maybeSSLContext: Option[SSLContext]): ArangoManager = {
override def create(connectionURL: ArangoConnectionURL, maybeSSLContext: Option[SSLContext], dryRun: Boolean): ArangoManager = {
val scriptRepo = MigrationScriptRepository

def dbManager(db: ArangoDatabaseAsync): ArangoManager = {
val versionManager = new DatabaseVersionManager(db)
val drManager = new DataRetentionManager(db)
val migrator = new Migrator(db, scriptRepo, versionManager)
val foxxManager = new FoxxManagerImpl(db.restClient)
val versionManager = new DatabaseVersionManager(db, dryRun)
val drManager = new DataRetentionManager(db, dryRun)
val migrator = new Migrator(db, scriptRepo, versionManager, dryRun)
val foxxManager = new FoxxManagerImpl(db.restClient, dryRun)
val clock = Clock.systemDefaultZone
new ArangoManagerImpl(
db,
Expand All @@ -48,7 +48,8 @@ class ArangoManagerFactoryImpl(activeFailover: Boolean)(implicit ec: ExecutionCo
migrator,
foxxManager,
clock,
scriptRepo.latestToVersion
scriptRepo.latestToVersion,
dryRun
)
}

Expand Down
Loading