Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions admin/src/main/scala/za/co/absa/spline/admin/AdminCLI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.slf4j.Logger.ROOT_LOGGER_NAME
import org.slf4j.LoggerFactory
import scopt.{OptionDef, OptionParser}
import za.co.absa.spline.admin.AdminCLI.AdminCLIConfig
import za.co.absa.spline.admin.DateTimeUtils.parseZonedDateTime
import za.co.absa.spline.arango.AuxiliaryDBAction._
import za.co.absa.spline.arango.OnDBExistsAction.{Drop, Fail, Skip}
import za.co.absa.spline.arango.{ArangoManagerFactory, ArangoManagerFactoryImpl}
Expand Down Expand Up @@ -214,13 +215,32 @@ class AdminCLI(dbManagerFactory: ArangoManagerFactory) {
)
.children(this.dbCommandOptions: _*)

(cmd("db-prune")
action ((_, c) => c.copy(cmd = DBPrune()))
text "Prune old data to decrease the database footprint and speed up queries."
children (this.dbCommandOptions: _*)
children(
opt[String]("retain-for")
text "Retention period in format <length><unit>. " +
"Example: `--retain-for 30d` means to retain data that is NOT older than 30 days from now."
action { case (s, c@AdminCLIConfig(cmd: DBPrune, _, _)) => c.copy(cmd.copy(retentionPeriod = Some(Duration(s)))) },
opt[String]("before-date")
text "A datetime with an optional time and zone parts in ISO-8601 format. " +
"The data older than the specified datetime is subject for removal."
action { case (s, c@AdminCLIConfig(cmd: DBPrune, _, _)) => c.copy(cmd.copy(thresholdDate = Some(parseZonedDateTime(s)))) },
))

checkConfig {
case AdminCLIConfig(null, _, _) =>
failure("No command given")
case AdminCLIConfig(cmd: DBCommand, _, _) if cmd.dbUrl == null =>
failure("DB connection string is required")
case AdminCLIConfig(cmd: DBInit, _, _) if cmd.force && cmd.skip =>
failure("Options '--force' and '--skip' cannot be used together")
case AdminCLIConfig(cmd: DBPrune, _, _) if cmd.retentionPeriod.isEmpty && cmd.thresholdDate.isEmpty =>
failure("One of the following options must be specified: --retain-for or --before-date")
case AdminCLIConfig(cmd: DBPrune, _, _) if cmd.retentionPeriod.isDefined && cmd.thresholdDate.isDefined =>
failure("Options --retain-for and --before-date cannot be used together")
case _ =>
success
}
Expand Down Expand Up @@ -255,6 +275,14 @@ class AdminCLI(dbManagerFactory: ArangoManagerFactory) {
case DBExec(url, actions) =>
val dbManager = dbManagerFactory.create(url, sslCtxOpt)
Await.result(dbManager.execute(actions: _*), Duration.Inf)

case DBPrune(url, Some(retentionPeriod), _) =>
val dbManager = dbManagerFactory.create(url, sslCtxOpt)
Await.result(dbManager.prune(retentionPeriod), Duration.Inf)

case DBPrune(url, _, Some(dateTime)) =>
val dbManager = dbManagerFactory.create(url, sslCtxOpt)
Await.result(dbManager.prune(dateTime), Duration.Inf)
}

println(ansi"%green{DONE}")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spline.admin

import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder}
import java.time.temporal.ChronoField
import java.time.{ZoneId, ZonedDateTime}
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

object DateTimeUtils {

private val ZonedDateTimeRegexp = (s"" +
"^" +
"""([\dT:.+\-]+?)""".r + // local datetime
"""(Z|[+\-]\d\d:\d\d)?""".r + // timezone offset
"""(?:\[([\w/+\-]+)])?""".r + // timezone name
"$").r

private val ZonedDateTimeFormatter = new DateTimeFormatterBuilder()
.parseCaseInsensitive()
.append(DateTimeFormatter.ISO_LOCAL_DATE)
.optionalStart()
.appendLiteral('T')
.append(DateTimeFormatter.ISO_LOCAL_TIME)
.optionalEnd()
.parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
.toFormatter();

def parseZonedDateTime(s: String, defaultZoneId: ZoneId = ZoneId.systemDefault): ZonedDateTime =
Try {
val ZonedDateTimeRegexp(ldt, tzOffset, tzId) = s
val maybeTzIds = Seq(tzId, tzOffset).map(Option.apply)

require(!maybeTzIds.forall(_.isDefined), "Either timezone ID or offset should be specified, not both")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ZonedDateTime.parse allows both offset and name at the same time . I think it's good idea to accept all inputs that are valid for ZonedDateTime.parse

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue with allowing only geographical zone id is that it is ambiguous. Not unique.

2022-10-30T02:30:00[Europe/Prague] is both:
2022-10-30T02:30+02:00[Europe/Prague]
2022-10-30T02:30+01:00[Europe/Prague]

This is the day and hour of change from summer time to winter time:

val start = ZonedDateTime.parse("2022-10-30T02:30:00+02:00[Europe/Prague]")
val end = start.plusHours(1)
// start: java.time.ZonedDateTime = 2022-10-30T02:30+02:00[Europe/Prague]
// end: java.time.ZonedDateTime =   2022-10-30T02:30+01:00[Europe/Prague]

Another interesting behaviour (not so relevant for us though) is this:

val start = ZonedDateTime.parse("2022-10-30T02:30:00+02:00")
val end = start.plusHours(1)
// start: java.time.ZonedDateTime = 2022-10-30T02:30+02:00
// end: java.time.ZonedDateTime   = 2022-10-30T03:30+02:00

When no geographical zone id is provided the ZonedDateTime will stay in the same offset.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We agreed to solve this as part of other ticket #1139


val tz = maybeTzIds
.collectFirst({ case Some(v) => ZoneId.of(v) })
.getOrElse(defaultZoneId)

ZonedDateTime.parse(ldt, ZonedDateTimeFormatter.withZone(tz))
Comment thread
cerveada marked this conversation as resolved.
} match {
case Success(zonedTime) => zonedTime
case Failure(nfe@NonFatal(_)) => throw new IllegalArgumentException(s"Cannot parse zoned datetime: $s", nfe)
}
}
12 changes: 12 additions & 0 deletions admin/src/main/scala/za/co/absa/spline/admin/commands.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import za.co.absa.spline.admin.DBCommand._
import za.co.absa.spline.arango.{AuxiliaryDBAction, DatabaseCreateOptions}
import za.co.absa.spline.persistence.ArangoConnectionURL

import java.time.ZonedDateTime
import scala.concurrent.duration.Duration

sealed trait Command

sealed trait DBCommand extends Command {
Expand Down Expand Up @@ -67,3 +70,12 @@ case class DBExec(

def addAction(action: AuxiliaryDBAction): DBExec = copy(actions = actions :+ action)
}

case class DBPrune(
override val dbUrl: Url = null,
retentionPeriod: Option[Duration] = None,
thresholdDate: Option[ZonedDateTime] = None
) extends DBCommand {
protected override type Self = DBPrune
protected override val selfCopy: DBCommandProps => Self = copy(_)
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ import za.co.absa.spline.persistence.DatabaseVersionManager
import za.co.absa.spline.persistence.migration.Migrator
import za.co.absa.spline.persistence.model.{CollectionDef, GraphDef, SearchAnalyzerDef, SearchViewDef}

import java.time.{Clock, ZonedDateTime}
import scala.collection.JavaConverters._
import scala.collection.immutable._
import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future}

trait ArangoManager {
Expand All @@ -42,13 +44,18 @@ trait ArangoManager {
def initialize(onExistsAction: OnDBExistsAction, options: DatabaseCreateOptions): Future[Boolean]
def upgrade(): Future[Unit]
def execute(actions: AuxiliaryDBAction*): Future[Unit]
def prune(retentionPeriod: Duration): Future[Unit]
def prune(thresholdDate: ZonedDateTime): Future[Unit]

}

class ArangoManagerImpl(
db: ArangoDatabaseAsync,
dbVersionManager: DatabaseVersionManager,
dataRetentionManager: DataRetentionManager,
migrator: Migrator,
foxxManager: FoxxManager,
clock: Clock,
appDBVersion: SemanticVersion)
(implicit val ex: ExecutionContext)
extends ArangoManager
Expand Down Expand Up @@ -121,6 +128,16 @@ class ArangoManagerImpl(
} yield {}
}

override def prune(retentionPeriod: Duration): Future[Unit] = {
log.debug(s"Prune data older than $retentionPeriod")
dataRetentionManager.pruneBefore(clock.millis - retentionPeriod.toMillis)
}

override def prune(dateTime: ZonedDateTime): Future[Unit] = {
log.debug(s"Prune data before $dateTime")
dataRetentionManager.pruneBefore(dateTime.toInstant.toEpochMilli)
}

private def deleteDbIfRequested(dropIfExists: Boolean) = {
for {
exists <- db.exists.toScala
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import za.co.absa.spline.persistence.ArangoImplicits.ArangoDatabaseAsyncScalaWra
import za.co.absa.spline.persistence.migration.{MigrationScriptRepository, Migrator}
import za.co.absa.spline.persistence.{ArangoConnectionURL, ArangoDatabaseFacade, DatabaseVersionManager}

import java.time.Clock
import javax.net.ssl.SSLContext
import scala.concurrent.ExecutionContext

Expand All @@ -36,21 +37,25 @@ class ArangoManagerFactoryImpl(activeFailover: Boolean)(implicit ec: ExecutionCo

def dbManager(db: ArangoDatabaseAsync): ArangoManager = {
val versionManager = new DatabaseVersionManager(db)
val drManager = new DataRetentionManager(db)
val migrator = new Migrator(db, scriptRepo, versionManager)
val foxxManager = new FoxxManagerImpl(db.restClient)
val clock = Clock.systemDefaultZone
new ArangoManagerImpl(
db,
versionManager,
drManager,
migrator,
foxxManager,
clock,
scriptRepo.latestToVersion
)
}

def dbFacade(): ArangoDatabaseFacade =
new ArangoDatabaseFacade(connectionURL, maybeSSLContext, activeFailover)

new AutoClosingArangoManagerProxy(dbManager, dbFacade)
AutoClosingArangoManagerProxy.create(dbManager, dbFacade)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,39 @@

package za.co.absa.spline.arango

import java.lang.reflect.{InvocationHandler, Method, Proxy}
import com.arangodb.async.ArangoDatabaseAsync
import za.co.absa.spline.persistence.ArangoDatabaseFacade

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

class AutoClosingArangoManagerProxy(
managerProvider: ArangoDatabaseAsync => ArangoManager,
arangoFacadeProvider: () => ArangoDatabaseFacade)
(implicit val ex: ExecutionContext)
extends ArangoManager {

override def initialize(onExistsAction: OnDBExistsAction, options: DatabaseCreateOptions): Future[Boolean] =
withManager(_.initialize(onExistsAction, options))

override def upgrade(): Future[Unit] =
withManager(_.upgrade())

override def execute(actions: AuxiliaryDBAction*): Future[Unit] =
withManager(_.execute(actions: _*))

private def withManager[A](fn: ArangoManager => Future[A]): Future[A] = {
val dbFacade = arangoFacadeProvider()

(Try(fn(managerProvider(dbFacade.db))) match {
case Failure(e) => Future.failed(e)
case Success(v) => v
}) andThen {
case _ => dbFacade.destroy()
object AutoClosingArangoManagerProxy {

def create(
managerProvider: ArangoDatabaseAsync => ArangoManager,
arangoFacadeProvider: () => ArangoDatabaseFacade)
(implicit ex: ExecutionContext): ArangoManager = {

val handler: InvocationHandler = (_: Any, method: Method, args: Array[AnyRef]) => {
val dbFacade = arangoFacadeProvider()
(Try {
val underlyingManager = managerProvider(dbFacade.db)
method
.invoke(underlyingManager, args: _*)
.asInstanceOf[Future[_]]
} match {
case Failure(e) => Future.failed(e)
case Success(v) => v
}) andThen {
case _ => dbFacade.destroy()
}
}
}

Proxy.newProxyInstance(
getClass.getClassLoader,
Array(classOf[ArangoManager]),
handler
).asInstanceOf[ArangoManager]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spline.arango


import com.arangodb.async.ArangoDatabaseAsync
import za.co.absa.spline.persistence.ArangoImplicits._

import scala.concurrent.{ExecutionContext, Future}

class DataRetentionManager(db: ArangoDatabaseAsync)(implicit ec: ExecutionContext) {

def pruneBefore(timestamp: Long): Future[Unit] = {
db.restClient.delete(s"spline/admin/data/before/$timestamp")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package za.co.absa.spline.admin

import java.time.{ZoneId, ZonedDateTime}

import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.mockito.Mockito.{inOrder => mockitoInOrder, _} // Mockito.inOrder would collide with Matchers.inOrder
import org.scalatest.OneInstancePerTest
import org.scalatest.OptionValues._
import org.scalatest.flatspec.AnyFlatSpec
Expand All @@ -35,6 +37,7 @@ import za.co.absa.spline.persistence.model.{EdgeDef, NodeDef}

import javax.net.ssl.SSLContext
import scala.concurrent.Future
import scala.concurrent.duration._

class AdminCLISpec
extends AnyFlatSpec
Expand Down Expand Up @@ -91,6 +94,14 @@ class AdminCLISpec
arangoManagerMock.execute(any()))
thenReturn Future.successful({}))

(when(
arangoManagerMock.prune(any[Duration]()))
thenReturn Future.successful({}))

(when(
arangoManagerMock.prune(any[ZonedDateTime]()))
thenReturn Future.successful({}))

it should "when called with wrong options, print welcome message" in {
captureStdErr {
captureExitStatus(cli.exec(Array("db-init"))) should be(1)
Expand Down Expand Up @@ -255,5 +266,30 @@ class AdminCLISpec
IndicesCreate,
)
}

behavior of "DB-Prune"

it should "require either -r or -d option" in {
val msg = captureStdErr(captureExitStatus(cli.exec(Array("db-prune", "arangodb://foo/bar"))) should be(1))
msg should include("Try --help for more information")
}

it should "support retention duration" in assertingStdOut(include("DONE")) {
cli.exec(Array("db-prune", "--retain-for", "30d", "arangodb://foo/bar"))
connUrlCaptor.getValue should be(ArangoConnectionURL("arangodb://foo/bar"))
verify(arangoManagerMock).prune(30.days)
verifyNoMoreInteractions(arangoManagerMock)
}

it should "prune support threshold timestamp" in assertingStdOut(include("DONE")) {
cli.exec(Array("db-prune", "--before-date", "2020-04-11", "arangodb://foo/bar"))
cli.exec(Array("db-prune", "--before-date", "2020-04-11T22:33Z", "arangodb://foo/bar"))

val inOrder = mockitoInOrder(arangoManagerMock)

inOrder.verify(arangoManagerMock).prune(ZonedDateTime.of(2020, 4, 11, 0, 0, 0, 0, ZoneId.systemDefault))
inOrder.verify(arangoManagerMock).prune(ZonedDateTime.of(2020, 4, 11, 22, 33, 0, 0, ZoneId.of("Z")))
inOrder.verifyNoMoreInteractions()
}
}
}
Loading