Skip to content

Commit

Permalink
#415 Add a pipeline notification target for ECS cleanup.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed May 31, 2024
1 parent 5d97ceb commit 4830d1a
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ import org.apache.http.impl.conn.BasicHttpClientConnectionManager
import org.apache.http.ssl.{SSLContexts, TrustStrategy}
import org.apache.http.util.EntityUtils
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.{DataFormat, NotificationTarget, TaskNotification}
import za.co.absa.pramen.api.{DataFormat, MetaTableDef, NotificationTarget, TaskNotification}
import za.co.absa.pramen.core.utils.Emoji
import za.co.absa.pramen.extras.notification.EcsNotificationTarget.{ECS_API_KEY_KEY, ECS_API_TRUST_SSL_KEY, ECS_API_URL_KEY}
import za.co.absa.pramen.extras.sink.HttpDeleteWithBody
import za.co.absa.pramen.extras.utils.ConfigUtils

import java.security.cert.X509Certificate
import java.time.LocalDate
import java.time.format.DateTimeFormatter

/**
Expand All @@ -52,18 +52,45 @@ class EcsNotificationTarget(conf: Config) extends NotificationTarget {
return
}

val (ecsApiUrl, ecsApiKey, trustAllSslCerts) = getEcsDetails
val (ecsApiUrl, ecsApiKey, trustAllSslCerts) = EcsNotificationTarget.getEcsDetails(conf)
val tableDef = notification.tableDef
val httpClient = getHttpClient(trustAllSslCerts)

log.info(s"ECS API URL: $ecsApiUrl")
try {
EcsNotificationTarget.cleanUpS3VersionsForTable(tableDef, notification.infoDate.get, ecsApiUrl, ecsApiKey, httpClient)
} finally {
httpClient.close()
}
}

protected def getHttpClient(trustAllSslCerts: Boolean): CloseableHttpClient = {
EcsNotificationTarget.getHttpClient(trustAllSslCerts)
}
}

object EcsNotificationTarget {
private val log = LoggerFactory.getLogger(this.getClass)

val ECS_API_URL_KEY = "ecs.api.url"
val ECS_API_KEY_KEY = "ecs.api.key"
val ECS_API_TRUST_SSL_KEY = "ecs.api.trust.all.ssl.certificates"

/**
* Cleans up a Pramen metatable via a special REST API call.
*/
def cleanUpS3VersionsForTable(tableDef: MetaTableDef,
infoDate: LocalDate,
apiUrl: String,
apiKey: String,
httpClient: HttpClient): Unit = {
log.info(s"ECS API URL: $apiUrl")
log.info(s"ECS API Key: [redacted]")
log.info(s"Trust all SSL certificates: $trustAllSslCerts")
log.info(s"Metatable: ${tableDef.name} (${tableDef.format.name})")
log.info(s"Info date column: ${tableDef.infoDateColumn}")
log.info(s"Info date format: ${tableDef.infoDateFormat}")

val formatter = DateTimeFormatter.ofPattern(tableDef.infoDateFormat)
val infoDateStr = formatter.format(notification.infoDate.get)
val infoDateStr = formatter.format(infoDate)
log.info(s"Info date: $infoDateStr")

tableDef.format match {
Expand All @@ -77,38 +104,14 @@ class EcsNotificationTarget(conf: Config) extends NotificationTarget {
val partitionPath = new Path(basePath, s"${tableDef.infoDateColumn}=$infoDateStr")
log.info(s"Partition path: $partitionPath")

val httpClient = getHttpClient(trustAllSslCerts)
EcsNotificationTarget.cleanUpS3VersionsForPath(partitionPath, ecsApiUrl, ecsApiKey, httpClient)
EcsNotificationTarget.cleanUpS3VersionsForPath(partitionPath, apiUrl, apiKey, httpClient)
case format =>
log.warn(s"Format ${format.name} is not supported. Skipping cleanup.")
}
}

protected def getHttpClient(trustAllSslCerts: Boolean): CloseableHttpClient = {
EcsNotificationTarget.getHttpClient(trustAllSslCerts)
}

private[extras] def getEcsDetails: (String, String, Boolean) = {
require(conf.hasPath(ECS_API_URL_KEY), s"The key is not defined: '$ECS_API_URL_KEY'")
require(conf.hasPath(ECS_API_KEY_KEY), s"The key is not defined: '$ECS_API_KEY_KEY'")

val ecsApiUrl = conf.getString(ECS_API_URL_KEY)
val ecsApiKey = conf.getString(ECS_API_KEY_KEY)
val trustAllSslCerts = ConfigUtils.getOptionBoolean(conf, ECS_API_TRUST_SSL_KEY).getOrElse(false)

(ecsApiUrl, ecsApiKey, trustAllSslCerts)
}
}

object EcsNotificationTarget {
private val log = LoggerFactory.getLogger(this.getClass)

val ECS_API_URL_KEY = "ecs.api.url"
val ECS_API_KEY_KEY = "ecs.api.key"
val ECS_API_TRUST_SSL_KEY = "ecs.api.trust.all.ssl.certificates"

/**
* Cleans up ECS buckets via a special REST API call.
* Cleans up an ECS path via a special REST API call.
*/
def cleanUpS3VersionsForPath(partitionPath: Path,
apiUrl: String,
Expand Down Expand Up @@ -149,6 +152,8 @@ object EcsNotificationTarget {
// but for some strange reason the EnceladusSink class can't be found/loaded
// when this library is used.

log.info(s"Trust all SSL certificates: $trustAllSslCerts")

if (trustAllSslCerts) {
log.warn("Trusting all SSL certificates for the cleanup API.")
val trustStrategy = new TrustStrategy {
Expand All @@ -175,6 +180,17 @@ object EcsNotificationTarget {
}
}

private[extras] def getEcsDetails(conf: Config): (String, String, Boolean) = {
require(conf.hasPath(ECS_API_URL_KEY), s"The key is not defined: '$ECS_API_URL_KEY'")
require(conf.hasPath(ECS_API_KEY_KEY), s"The key is not defined: '$ECS_API_KEY_KEY'")

val ecsApiUrl = conf.getString(ECS_API_URL_KEY)
val ecsApiKey = conf.getString(ECS_API_KEY_KEY)
val trustAllSslCerts = ConfigUtils.getOptionBoolean(conf, ECS_API_TRUST_SSL_KEY).getOrElse(false)

(ecsApiUrl, ecsApiKey, trustAllSslCerts)
}

private[extras] def getCleanUpS3VersionsRequestBody(partitionPath: Path): String = {
val partitionPathWithoutAuthority = removeAuthority(partitionPath)
s"""{"ecs_path":"$partitionPathWithoutAuthority"}"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.extras.notification

import com.typesafe.config.Config
import org.apache.http.impl.client.CloseableHttpClient
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.{PipelineNotificationTarget, TaskNotification, TaskStatus}
import za.co.absa.pramen.extras.utils.ConfigUtils

import java.time.Instant

/**
* Runs the ECS cleanup API against the target partition after the job jas completed.
*
* Example usage:
* {{{
* pramen.ecs.api {
* url = "https://dummy.local"
* key = "aabbcc"
* trust.all.ssl.certificates = false
* }
*
* pramen.pipeline.notification.targets = [ "za.co.absa.pramen.extras.notification.EcsPipelineNotificationTarget" ]
* }}}
*/
class EcsPipelineNotificationTarget(conf: Config) extends PipelineNotificationTarget {
private val log = LoggerFactory.getLogger(this.getClass)

override def config: Config = conf

/** Sends a notification after completion of the pipeline. */
override def sendNotification(pipelineStarted: Instant,
applicationId: Option[String],
appException: Option[Throwable],
tasksCompleted: Seq[TaskNotification]): Unit = {
val (ecsApiUrl, ecsApiKey, trustAllSslCerts) = EcsPipelineNotificationTarget.getEcsDetails(conf)

val httpClient = getHttpClient(trustAllSslCerts)

try {
tasksCompleted.foreach { task =>
(task.infoDate, task.status) match {
case (Some(infoDate), _: TaskStatus.Succeeded) =>
EcsNotificationTarget.cleanUpS3VersionsForTable(task.tableDef, infoDate, ecsApiUrl, ecsApiKey, httpClient)
case (Some(infoDate), _) =>
log.info(s"The task outputting to '${task.tableName}' for '$infoDate' status is not a success - skipping ECS cleanup...")
case (None, status) =>
log.info(s"The task outputting to '${task.tableName}' status is not a success - skipping ECS cleanup...")
}
}
} finally {
httpClient.close()
}
}

protected def getHttpClient(trustAllSslCerts: Boolean): CloseableHttpClient = {
EcsNotificationTarget.getHttpClient(trustAllSslCerts)
}
}

object EcsPipelineNotificationTarget {
val ECS_API_URL_KEY = "pramen.ecs.api.url"
val ECS_API_KEY_KEY = "pramen.ecs.api.key"
val ECS_API_TRUST_SSL_KEY = "pramen.ecs.api.trust.all.ssl.certificates"

private[extras] def getEcsDetails(conf: Config): (String, String, Boolean) = {
require(conf.hasPath(ECS_API_URL_KEY), s"The key is not defined: '$ECS_API_URL_KEY'")
require(conf.hasPath(ECS_API_KEY_KEY), s"The key is not defined: '$ECS_API_KEY_KEY'")

val ecsApiUrl = conf.getString(ECS_API_URL_KEY)
val ecsApiKey = conf.getString(ECS_API_KEY_KEY)
val trustAllSslCerts = ConfigUtils.getOptionBoolean(conf, ECS_API_TRUST_SSL_KEY).getOrElse(false)

(ecsApiUrl, ecsApiKey, trustAllSslCerts)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class EcsNotificationTargetSuite extends AnyWordSpec {
|""".stripMargin
)

val (url, key, trust) = new EcsNotificationTarget(conf).getEcsDetails
val (url, key, trust) = EcsNotificationTarget.getEcsDetails(conf)

assert(url == "https://dummyurl.local")
assert(key == "abcd")
Expand Down

0 comments on commit 4830d1a

Please sign in to comment.