From c216903c414d5e6b79a894e167ffec5b3e1e8214 Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Tue, 28 May 2024 11:18:56 +0200
Subject: [PATCH] #415 Implement ECS cleanup notification target.

---
Review notes (ignored by `git am`, not part of the commit message):
 * EcsNotificationTarget.sendNotification() now closes the HTTP client it
   creates (try/finally), the same way EnceladusSink already does.
 * Typos fixed: "jas completed" (class Scaladoc), "base bath" (log message).
 * Hunk offsets/counts and the diffstat are recomputed for the lines added above.
   The post-image blob id on the EcsNotificationTarget.scala "index" line is
   stale as a result of these changes.
 * Whitespace inside context lines is reconstructed; if the patch does not
   apply cleanly, retry with `git apply --ignore-whitespace`.

 .../notification/EcsNotificationTarget.scala  | 115 +++++++++++++--
 .../pramen/extras/sink/EnceladusSink.scala    |   8 +-
 .../EcsNotificationTargetSuite.scala          | 131 ++++++++++++++++++
 pramen/project/Dependencies.scala             |   3 +-
 4 files changed, 240 insertions(+), 17 deletions(-)
 create mode 100644 pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsNotificationTargetSuite.scala

diff --git a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/EcsNotificationTarget.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/EcsNotificationTarget.scala
index 49e1c36c..46205e41 100644
--- a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/EcsNotificationTarget.scala
+++ b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/EcsNotificationTarget.scala
@@ -17,6 +17,7 @@
 package za.co.absa.pramen.extras.notification
 
 import com.typesafe.config.Config
+import org.apache.hadoop.fs.Path
 import org.apache.http.HttpStatus
 import org.apache.http.client.HttpClient
 import org.apache.http.config.RegistryBuilder
@@ -28,37 +29,100 @@ import org.apache.http.impl.conn.BasicHttpClientConnectionManager
 import org.apache.http.ssl.{SSLContexts, TrustStrategy}
 import org.apache.http.util.EntityUtils
 import org.slf4j.LoggerFactory
-import za.co.absa.pramen.api.{NotificationTarget, TaskNotification}
+import za.co.absa.pramen.api.{DataFormat, NotificationTarget, TaskNotification}
 import za.co.absa.pramen.core.utils.Emoji
+import za.co.absa.pramen.extras.notification.EcsNotificationTarget.{ECS_API_KEY_KEY, ECS_API_TRUST_SSL_KEY, ECS_API_URL_KEY}
 import za.co.absa.pramen.extras.sink.HttpDeleteWithBody
+import za.co.absa.pramen.extras.utils.ConfigUtils
 
 import java.security.cert.X509Certificate
+import java.time.format.DateTimeFormatter
 
 /**
-  * Runs the ECS cleanup API against the target partition after the job jas completed.
+  * Runs the ECS cleanup API against the target partition after the job has completed.
   */
 class EcsNotificationTarget(conf: Config) extends NotificationTarget {
+  private val log = LoggerFactory.getLogger(this.getClass)
+
   override def config: Config = conf
 
   override def sendNotification(notification: TaskNotification): Unit = {
-    // ToDo implementation
+    if (notification.infoDate.isEmpty) {
+      log.warn(s"Information date not provided - skipping ECS cleanup.")
+      return
+    }
+
+    val (ecsApiUrl, ecsApiKey, trustAllSslCerts) = getEcsDetails
+    val tableDef = notification.tableDef
+
+    log.info(s"ECS API URL: $ecsApiUrl")
+    log.info(s"ECS API Key: [redacted]")
+    log.info(s"Trust all SSL certificates: $trustAllSslCerts")
+    log.info(s"Metatable: ${tableDef.name} (${tableDef.format.name})")
+    log.info(s"Info date column: ${tableDef.infoDateColumn}")
+    log.info(s"Info date format: ${tableDef.infoDateFormat}")
+
+    val formatter = DateTimeFormatter.ofPattern(tableDef.infoDateFormat)
+    val infoDateStr = formatter.format(notification.infoDate.get)
+    log.info(s"Info date: $infoDateStr")
+
+    tableDef.format match {
+      case DataFormat.Parquet(basePath, _) =>
+        log.info(s"Base path: $basePath")
+        if (!basePath.toLowerCase.startsWith("s3://") && !basePath.toLowerCase.startsWith("s3a://")) {
+          log.warn(s"The base path ($basePath) is not on S3. S3 versions cleanup won't be done.")
+          return
+        }
+
+        val partitionPath = new Path(basePath, s"${tableDef.infoDateColumn}=$infoDateStr")
+        log.info(s"Partition path: $partitionPath")
+
+        // The HTTP client is created for this call only, so it is closed once the cleanup request completes.
+        val httpClient = getHttpClient(trustAllSslCerts)
+        try {
+          EcsNotificationTarget.cleanUpS3VersionsForPath(partitionPath, ecsApiUrl, ecsApiKey, httpClient)
+        } finally {
+          httpClient.close()
+        }
+      case format =>
+        log.warn(s"Format ${format.name} is not supported. Skipping cleanup.")
+    }
+  }
+
+  protected def getHttpClient(trustAllSslCerts: Boolean): CloseableHttpClient = {
+    EcsNotificationTarget.getHttpClient(trustAllSslCerts)
+  }
+
+  private[extras] def getEcsDetails: (String, String, Boolean) = {
+    require(conf.hasPath(ECS_API_URL_KEY), s"The key is not defined: '$ECS_API_URL_KEY'")
+    require(conf.hasPath(ECS_API_KEY_KEY), s"The key is not defined: '$ECS_API_KEY_KEY'")
+
+    val ecsApiUrl = conf.getString(ECS_API_URL_KEY)
+    val ecsApiKey = conf.getString(ECS_API_KEY_KEY)
+    val trustAllSslCerts = ConfigUtils.getOptionBoolean(conf, ECS_API_TRUST_SSL_KEY).getOrElse(false)
+
+    (ecsApiUrl, ecsApiKey, trustAllSslCerts)
   }
 }
 
 object EcsNotificationTarget {
   private val log = LoggerFactory.getLogger(this.getClass)
 
-  def cleanUpS3VersionsForPath(partitionPath: String,
+  val ECS_API_URL_KEY = "ecs.api.url"
+  val ECS_API_KEY_KEY = "ecs.api.key"
+  val ECS_API_TRUST_SSL_KEY = "ecs.api.trust.all.ssl.certificates"
+
+  /**
+    * Cleans up ECS buckets via a special REST API call.
+    */
+  def cleanUpS3VersionsForPath(partitionPath: Path,
                                apiUrl: String,
                                apiKey: String,
                                httpClient: HttpClient): Unit = {
-    val body = s"""{"ecs_path":"$partitionPath"}"""
+    val body = getCleanUpS3VersionsRequestBody(partitionPath)
     log.info(s"Sending: $body")
 
-    val httpDelete = new HttpDeleteWithBody(apiUrl)
-
-    httpDelete.addHeader("x-api-key", apiKey)
-    httpDelete.setEntity(new StringEntity(body))
+    val httpDelete = getCleanUpS3VersionsRequest(body, apiUrl, apiKey)
 
     try {
       val response = httpClient.execute(httpDelete)
@@ -76,7 +140,15 @@ object EcsNotificationTarget {
     }
   }
 
-  private[extras] def getHttpClient(trustAllSslCerts: Boolean): CloseableHttpClient = {
+  /**
+    * Returns an instance of Apache HTTP client.
+    *
+    * Do not forget to close the client after use.
+    *
+    * @param trustAllSslCerts if true, the client will trust any SSL certificate.
+    * @return an Http Client
+    */
+  def getHttpClient(trustAllSslCerts: Boolean): CloseableHttpClient = {
     // Using Apache HTTP Client.
     // Tried using com.lihaoyi:requests:0.8.0,
     // but for some strange reason the EnceladusSink class can't be found/loaded
@@ -107,4 +179,27 @@ object EcsNotificationTarget {
       HttpClients.createDefault()
     }
   }
+
+  private[extras] def getCleanUpS3VersionsRequestBody(partitionPath: Path): String = {
+    val partitionPathWithoutAuthority = removeAuthority(partitionPath)
+    s"""{"ecs_path":"$partitionPathWithoutAuthority"}"""
+  }
+
+  private[extras] def getCleanUpS3VersionsRequest(requestBody: String, apiUrl: String, apiKey: String): HttpDeleteWithBody = {
+    val httpDelete = new HttpDeleteWithBody(apiUrl)
+
+    httpDelete.addHeader("x-api-key", apiKey)
+    httpDelete.setEntity(new StringEntity(requestBody))
+
+    httpDelete
+  }
+
+  private[extras] def removeAuthority(path: Path): String = {
+    val uri = path.toUri
+    if (uri.getHost != null) {
+      s"${uri.getHost}${uri.getPath}"
+    } else {
+      s"${uri.getPath}"
+    }
+  }
 }
diff --git a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/EnceladusSink.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/EnceladusSink.scala
index c8fd9009..4602f089 100644
--- a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/EnceladusSink.scala
+++ b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/EnceladusSink.scala
@@ -413,17 +413,13 @@ class EnceladusSink(sinkConfig: Config,
     val httpClient = EcsNotificationTarget.getHttpClient(trustAllSslCerts)
 
     try {
-      EcsNotificationTarget.cleanUpS3VersionsForPath(removeAuthority(rawPartitionPath), apiUrl, apiKey, httpClient)
-      EcsNotificationTarget.cleanUpS3VersionsForPath(removeAuthority(publishPartitionPath), apiUrl, apiKey, httpClient)
+      EcsNotificationTarget.cleanUpS3VersionsForPath(rawPartitionPath, apiUrl, apiKey, httpClient)
+      EcsNotificationTarget.cleanUpS3VersionsForPath(publishPartitionPath, apiUrl, apiKey, httpClient)
     } finally {
       httpClient.close()
     }
   }
 
-  private[extras] def removeAuthority(path: Path): String = {
-    s"${path.toUri.getHost}${path.toUri.getPath}"
-  }
-
   private[extras] def updateTable(hiveTable: String, publishBase: String, infoDate: LocalDate, infoVersion: Int)(implicit spark: SparkSession): Unit = {
     if (hiveHelper.doesTableExist(enceladusConfig.hiveDatabase, hiveTable)) {
       if (enceladusConfig.preferAddPartition) {
diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsNotificationTargetSuite.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsNotificationTargetSuite.scala
new file mode 100644
index 00000000..ee53c4b0
--- /dev/null
+++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsNotificationTargetSuite.scala
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2022 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.pramen.extras.notification
+
+import com.typesafe.config.ConfigFactory
+import org.apache.hadoop.fs.Path
+import org.apache.http.client.HttpClient
+import org.apache.http.impl.client.CloseableHttpClient
+import org.apache.http.{HttpEntity, HttpResponse, HttpStatus, StatusLine}
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{mock, when => whenMock}
+import org.scalatest.wordspec.AnyWordSpec
+import za.co.absa.pramen.extras.notification.EcsNotificationTarget.{ECS_API_KEY_KEY, ECS_API_TRUST_SSL_KEY, ECS_API_URL_KEY}
+import za.co.absa.pramen.extras.sink.HttpDeleteWithBody
+
+class EcsNotificationTargetSuite extends AnyWordSpec {
+  "sendNotification" should {
+    "send the expected request according to config" in {
+
+    }
+  }
+
+  "getEcsDetails" should {
+    "get parameters from config" in {
+      val conf = ConfigFactory.parseString(
+        s"""
+           |$ECS_API_URL_KEY = "https://dummyurl.local"
+           |$ECS_API_KEY_KEY = "abcd"
+           |$ECS_API_TRUST_SSL_KEY = true
+           |""".stripMargin
+      )
+
+      val (url, key, trust) = new EcsNotificationTarget(conf).getEcsDetails
+
+      assert(url == "https://dummyurl.local")
+      assert(key == "abcd")
+      assert(trust)
+    }
+  }
+
+  "cleanUpS3VersionsForPath" should {
+    "send the expected request according to config" in {
+      val httpClient = mock(classOf[HttpClient])
+      val httpResponse = mock(classOf[HttpResponse])
+      val statusLine = mock(classOf[StatusLine])
+      val httpEntity = mock(classOf[HttpEntity])
+
+      whenMock(statusLine.getStatusCode).thenReturn(HttpStatus.SC_OK)
+      whenMock(httpResponse.getStatusLine).thenReturn(statusLine)
+      whenMock(httpResponse.getEntity).thenReturn(httpEntity)
+      whenMock(httpClient.execute(any[HttpDeleteWithBody])).thenReturn(httpResponse)
+
+      EcsNotificationTarget.cleanUpS3VersionsForPath(new Path("bucket/path/date=2024-02-18"), "https://dummyurl.local", "abcd", httpClient)
+    }
+  }
+
+  "getHttpClient" should {
+    "return a standard HTTP client when not trusting all SSL certificates blindly" in {
+      val httpClient = EcsNotificationTarget.getHttpClient(false)
+
+      assert(httpClient.isInstanceOf[CloseableHttpClient])
+    }
+
+    "return a custom HTTP client when trusting all SSL certificates" in {
+      val httpClient = EcsNotificationTarget.getHttpClient(true)
+
+      assert(httpClient.isInstanceOf[CloseableHttpClient])
+    }
+  }
+
+  "getCleanUpS3VersionsRequestBody" should {
+    "format the API request body properly" in {
+      val body = EcsNotificationTarget.getCleanUpS3VersionsRequestBody(new Path("s3a://bucket/path/date=2024-02-18"))
+
+      assert(body == "{\"ecs_path\":\"bucket/path/date=2024-02-18\"}")
+    }
+  }
+
+  "getCleanUpS3VersionsRequest" should {
+    "return the proper request" in {
+      val request = EcsNotificationTarget.getCleanUpS3VersionsRequest("{\"ecs_path\":\"bucket/path/date=2024-02-18\"}", "https://dummyurl.local", "abcd")
+
+      assert(request.isInstanceOf[HttpDeleteWithBody])
+      assert(request.getHeaders("x-api-key").head.getValue == "abcd")
+    }
+  }
+
+  "removeAuthority" should {
+    "not change if the path does not contain any" in {
+      val path = new Path("/path/date=2024-02-18")
+      val actual = EcsNotificationTarget.removeAuthority(path)
+
+      assert(actual == "/path/date=2024-02-18")
+    }
+
+    "remove the s3 authority" in {
+      val path = new Path("s3://bucket/path/date=2024-02-18")
+      val actual = EcsNotificationTarget.removeAuthority(path)
+
+      assert(actual == "bucket/path/date=2024-02-18")
+    }
+
+    "remove the s3a authority" in {
+      val path = new Path("s3a://bucket/path/date=2024-02-18")
+      val actual = EcsNotificationTarget.removeAuthority(path)
+
+      assert(actual == "bucket/path/date=2024-02-18")
+    }
+
+    "remove the hdfs authority" in {
+      val path = new Path("hdfs://cluster/path/date=2024-02-18")
+      val actual = EcsNotificationTarget.removeAuthority(path)
+
+      assert(actual == "cluster/path/date=2024-02-18")
+    }
+  }
+}
diff --git a/pramen/project/Dependencies.scala b/pramen/project/Dependencies.scala
index 26b12956..83003511 100644
--- a/pramen/project/Dependencies.scala
+++ b/pramen/project/Dependencies.scala
@@ -48,7 +48,8 @@ object Dependencies {
     "org.apache.spark" %% "spark-sql" % sparkVersion(scalaVersion) % Provided,
     "net.sourceforge.jtds" % "jtds" % msSqlDriverVersion,
     "org.apache.httpcomponents" % "httpclient" % httpClientVersion,
-    "org.scalatest" %% "scalatest" % scalatestVersion % Test
+    "org.scalatest" %% "scalatest" % scalatestVersion % Test,
+    "org.mockito" % "mockito-core" % mockitoVersion % Test,
   ) ++ Seq(
     getAbrisDependency(sparkVersion(scalaVersion)),
     getDeltaDependency(sparkVersion(scalaVersion), isCompile = false, isTest = true)