Skip to content

Commit

Permalink
#415 Implement ECS cleanup notification target.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed May 28, 2024
1 parent a7bb8f9 commit c216903
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package za.co.absa.pramen.extras.notification

import com.typesafe.config.Config
import org.apache.hadoop.fs.Path
import org.apache.http.HttpStatus
import org.apache.http.client.HttpClient
import org.apache.http.config.RegistryBuilder
Expand All @@ -28,37 +29,95 @@ import org.apache.http.impl.conn.BasicHttpClientConnectionManager
import org.apache.http.ssl.{SSLContexts, TrustStrategy}
import org.apache.http.util.EntityUtils
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.{NotificationTarget, TaskNotification}
import za.co.absa.pramen.api.{DataFormat, NotificationTarget, TaskNotification}
import za.co.absa.pramen.core.utils.Emoji
import za.co.absa.pramen.extras.notification.EcsNotificationTarget.{ECS_API_KEY_KEY, ECS_API_TRUST_SSL_KEY, ECS_API_URL_KEY}
import za.co.absa.pramen.extras.sink.HttpDeleteWithBody
import za.co.absa.pramen.extras.utils.ConfigUtils

import java.security.cert.X509Certificate
import java.time.format.DateTimeFormatter

/**
* Runs the ECS cleanup API against the target partition after the job jas completed.
*/
class EcsNotificationTarget(conf: Config) extends NotificationTarget {
private val log = LoggerFactory.getLogger(this.getClass)

override def config: Config = conf

override def sendNotification(notification: TaskNotification): Unit = {
// ToDo implementation
if (notification.infoDate.isEmpty) {
log.warn(s"Information date not provided - skipping ECS cleanup.")
return
}

val (ecsApiUrl, ecsApiKey, trustAllSslCerts) = getEcsDetails
val tableDef = notification.tableDef

log.info(s"ECS API URL: $ecsApiUrl")
log.info(s"ECS API Key: [redacted]")
log.info(s"Trust all SSL certificates: $trustAllSslCerts")
log.info(s"Metatable: ${tableDef.name} (${tableDef.format.name})")
log.info(s"Info date column: ${tableDef.infoDateColumn}")
log.info(s"Info date format: ${tableDef.infoDateFormat}")

val formatter = DateTimeFormatter.ofPattern(tableDef.infoDateFormat)
val infoDateStr = formatter.format(notification.infoDate.get)
log.info(s"Info date: $infoDateStr")

tableDef.format match {
case DataFormat.Parquet(basePath, _) =>
log.info(s"Base path: $basePath")
if (!basePath.toLowerCase.startsWith("s3://") && !basePath.toLowerCase.startsWith("s3a://")) {
log.warn(s"The base bath ($basePath) is not on S3. S3 versions cleanup won't be done.")
return
}

val partitionPath = new Path(basePath, s"${tableDef.infoDateColumn}=$infoDateStr")
log.info(s"Partition path: $partitionPath")

val httpClient = getHttpClient(trustAllSslCerts)
EcsNotificationTarget.cleanUpS3VersionsForPath(partitionPath, ecsApiUrl, ecsApiKey, httpClient)
case format =>
log.warn(s"Format ${format.name} is not supported. Skipping cleanup.")
}
}

protected def getHttpClient(trustAllSslCerts: Boolean): CloseableHttpClient = {
EcsNotificationTarget.getHttpClient(trustAllSslCerts)
}

private[extras] def getEcsDetails: (String, String, Boolean) = {
require(conf.hasPath(ECS_API_URL_KEY), s"The key is not defined: '$ECS_API_URL_KEY'")
require(conf.hasPath(ECS_API_KEY_KEY), s"The key is not defined: '$ECS_API_KEY_KEY'")

val ecsApiUrl = conf.getString(ECS_API_URL_KEY)
val ecsApiKey = conf.getString(ECS_API_KEY_KEY)
val trustAllSslCerts = ConfigUtils.getOptionBoolean(conf, ECS_API_TRUST_SSL_KEY).getOrElse(false)

(ecsApiUrl, ecsApiKey, trustAllSslCerts)
}
}

object EcsNotificationTarget {
private val log = LoggerFactory.getLogger(this.getClass)

def cleanUpS3VersionsForPath(partitionPath: String,
val ECS_API_URL_KEY = "ecs.api.url"
val ECS_API_KEY_KEY = "ecs.api.key"
val ECS_API_TRUST_SSL_KEY = "ecs.api.trust.all.ssl.certificates"

/**
* Cleans up ECS buckets via a special REST API call.
*/
def cleanUpS3VersionsForPath(partitionPath: Path,
apiUrl: String,
apiKey: String,
httpClient: HttpClient): Unit = {
val body = s"""{"ecs_path":"$partitionPath"}"""
val body = getCleanUpS3VersionsRequestBody(partitionPath)
log.info(s"Sending: $body")

val httpDelete = new HttpDeleteWithBody(apiUrl)

httpDelete.addHeader("x-api-key", apiKey)
httpDelete.setEntity(new StringEntity(body))
val httpDelete = getCleanUpS3VersionsRequest(body, apiUrl, apiKey)

try {
val response = httpClient.execute(httpDelete)
Expand All @@ -76,7 +135,15 @@ object EcsNotificationTarget {
}
}

private[extras] def getHttpClient(trustAllSslCerts: Boolean): CloseableHttpClient = {
/**
* Returns an instance of Apache HTTP client.
*
* Do not forget to close the client after use.
*
* @param trustAllSslCerts if true, the client will trust any SSL certificate.
* @return an Http Client
*/
def getHttpClient(trustAllSslCerts: Boolean): CloseableHttpClient = {
// Using Apache HTTP Client.
// Tried using com.lihaoyi:requests:0.8.0,
// but for some strange reason the EnceladusSink class can't be found/loaded
Expand Down Expand Up @@ -107,4 +174,27 @@ object EcsNotificationTarget {
HttpClients.createDefault()
}
}

private[extras] def getCleanUpS3VersionsRequestBody(partitionPath: Path): String = {
val partitionPathWithoutAuthority = removeAuthority(partitionPath)
s"""{"ecs_path":"$partitionPathWithoutAuthority"}"""
}

private[extras] def getCleanUpS3VersionsRequest(requestBody: String, apiUrl: String, apiKey: String): HttpDeleteWithBody = {
val httpDelete = new HttpDeleteWithBody(apiUrl)

httpDelete.addHeader("x-api-key", apiKey)
httpDelete.setEntity(new StringEntity(requestBody))

httpDelete
}

private[extras] def removeAuthority(path: Path): String = {
val uri = path.toUri
if (uri.getHost != null) {
s"${uri.getHost}${uri.getPath}"
} else {
s"${uri.getPath}"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -413,17 +413,13 @@ class EnceladusSink(sinkConfig: Config,
val httpClient = EcsNotificationTarget.getHttpClient(trustAllSslCerts)

try {
EcsNotificationTarget.cleanUpS3VersionsForPath(removeAuthority(rawPartitionPath), apiUrl, apiKey, httpClient)
EcsNotificationTarget.cleanUpS3VersionsForPath(removeAuthority(publishPartitionPath), apiUrl, apiKey, httpClient)
EcsNotificationTarget.cleanUpS3VersionsForPath(rawPartitionPath, apiUrl, apiKey, httpClient)
EcsNotificationTarget.cleanUpS3VersionsForPath(publishPartitionPath, apiUrl, apiKey, httpClient)
} finally {
httpClient.close()
}
}

private[extras] def removeAuthority(path: Path): String = {
s"${path.toUri.getHost}${path.toUri.getPath}"
}

private[extras] def updateTable(hiveTable: String, publishBase: String, infoDate: LocalDate, infoVersion: Int)(implicit spark: SparkSession): Unit = {
if (hiveHelper.doesTableExist(enceladusConfig.hiveDatabase, hiveTable)) {
if (enceladusConfig.preferAddPartition) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.extras.notification

import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.Path
import org.apache.http.client.HttpClient
import org.apache.http.impl.client.CloseableHttpClient
import org.apache.http.{HttpEntity, HttpResponse, HttpStatus, StatusLine}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mock, when => whenMock}
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.extras.notification.EcsNotificationTarget.{ECS_API_KEY_KEY, ECS_API_TRUST_SSL_KEY, ECS_API_URL_KEY}
import za.co.absa.pramen.extras.sink.HttpDeleteWithBody

class EcsNotificationTargetSuite extends AnyWordSpec {
"sendNotification" should {
"send the expected request according to config" in {

}
}

"getEcsDetails" should {
"get parameters from config" in {
val conf = ConfigFactory.parseString(
s"""
|$ECS_API_URL_KEY = "https://dummyurl.local"
|$ECS_API_KEY_KEY = "abcd"
|$ECS_API_TRUST_SSL_KEY = true
|""".stripMargin
)

val (url, key, trust) = new EcsNotificationTarget(conf).getEcsDetails

assert(url == "https://dummyurl.local")
assert(key == "abcd")
assert(trust)
}
}

"cleanUpS3VersionsForPath" should {
"send the expected request according to config" in {
val httpClient = mock(classOf[HttpClient])
val httpResponse = mock(classOf[HttpResponse])
val statusLine = mock(classOf[StatusLine])
val httpEntity = mock(classOf[HttpEntity])

whenMock(statusLine.getStatusCode).thenReturn(HttpStatus.SC_OK)
whenMock(httpResponse.getStatusLine).thenReturn(statusLine)
whenMock(httpResponse.getEntity).thenReturn(httpEntity)
whenMock(httpClient.execute(any[HttpDeleteWithBody])).thenReturn(httpResponse)

EcsNotificationTarget.cleanUpS3VersionsForPath(new Path("bucket/path/date=2024-02-18"), "https://dummyurl.local", "abcd", httpClient)
}
}

"getHttpClient" should {
"return a standard HTTP client when not trusting all SSL certificates blindly" in {
val httpClient = EcsNotificationTarget.getHttpClient(false)

assert(httpClient.isInstanceOf[CloseableHttpClient])
}

"return a custom HTTP client when trusting all SSL certificates" in {
val httpClient = EcsNotificationTarget.getHttpClient(true)

assert(httpClient.isInstanceOf[CloseableHttpClient])
}
}

"getCleanUpS3VersionsRequestBody" should {
"format the API request body properly" in {
val body = EcsNotificationTarget.getCleanUpS3VersionsRequestBody(new Path("s3a://bucket/path/date=2024-02-18"))

assert(body == "{\"ecs_path\":\"bucket/path/date=2024-02-18\"}")
}
}

"getCleanUpS3VersionsRequest" should {
"return the proper request" in {
val request = EcsNotificationTarget.getCleanUpS3VersionsRequest("{\"ecs_path\":\"bucket/path/date=2024-02-18\"}", "https://dummyurl.local", "abcd")

assert(request.isInstanceOf[HttpDeleteWithBody])
assert(request.getHeaders("x-api-key").head.getValue == "abcd")
}
}

"removeAuthority" should {
"not change if the path does not contain any" in {
val path = new Path("/path/date=2024-02-18")
val actual = EcsNotificationTarget.removeAuthority(path)

assert(actual == "/path/date=2024-02-18")
}

"remove the s3 authority" in {
val path = new Path("s3://bucket/path/date=2024-02-18")
val actual = EcsNotificationTarget.removeAuthority(path)

assert(actual == "bucket/path/date=2024-02-18")
}

"remove the s3a authority" in {
val path = new Path("s3a://bucket/path/date=2024-02-18")
val actual = EcsNotificationTarget.removeAuthority(path)

assert(actual == "bucket/path/date=2024-02-18")
}

"remove the hdfs authority" in {
val path = new Path("hdfs://cluster/path/date=2024-02-18")
val actual = EcsNotificationTarget.removeAuthority(path)

assert(actual == "cluster/path/date=2024-02-18")
}
}
}
3 changes: 2 additions & 1 deletion pramen/project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ object Dependencies {
"org.apache.spark" %% "spark-sql" % sparkVersion(scalaVersion) % Provided,
"net.sourceforge.jtds" % "jtds" % msSqlDriverVersion,
"org.apache.httpcomponents" % "httpclient" % httpClientVersion,
"org.scalatest" %% "scalatest" % scalatestVersion % Test
"org.scalatest" %% "scalatest" % scalatestVersion % Test,
"org.mockito" % "mockito-core" % mockitoVersion % Test,
) ++ Seq(
getAbrisDependency(sparkVersion(scalaVersion)),
getDeltaDependency(sparkVersion(scalaVersion), isCompile = false, isTest = true)
Expand Down

0 comments on commit c216903

Please sign in to comment.