Skip to content

Commit

Permalink
#415 Add simple http client implementation for use with custom notifi…
Browse files Browse the repository at this point in the history
…cation targets.
  • Loading branch information
yruslan committed May 31, 2024
1 parent 4830d1a commit fe34063
Show file tree
Hide file tree
Showing 23 changed files with 902 additions and 11 deletions.
1 change: 1 addition & 0 deletions pramen/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ lazy val assemblySettingsCommon = Seq(
)

lazy val assemblySettingsExtras = assemblySettingsCommon ++ Seq(assembly / assemblyShadeRules:= Seq(
ShadeRule.rename(shade("org.apache.http")).inAll,
ShadeRule.zap("com.101tec.**").inAll,
ShadeRule.zap("buildinfo.**").inAll,
ShadeRule.zap("com.databricks.**").inAll,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.extras.utils.httpclient

sealed trait HttpMethod {
def name: String
}

object HttpMethod {
case object GET extends HttpMethod {
def name = "GET"
}

case object POST extends HttpMethod {
def name = "POST"
}

case object PUT extends HttpMethod {
def name = "PUT"
}

case object DELETE extends HttpMethod {
def name = "DELETE"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.extras.utils.httpclient

import org.slf4j.LoggerFactory
import za.co.absa.pramen.extras.utils.httpclient.impl.{BasicHttpClient, RetryableHttpClient}

trait SimpleHttpClient extends AutoCloseable {
def execute(request: SimpleHttpRequest): SimpleHttpResponse
}

object SimpleHttpClient {
private val log = LoggerFactory.getLogger(this.getClass)

/**
* This builds an instance of `SimpleHttpClient` that should be used in practice.
*
* You can customize the retry policy.
*
* @param trustAllSslCerts If true, the client will trust all SSL certificates.
* @param numberOfRetries The number of retries to be done in case of failure.
* @param backoffMs The maximum backoff time in milliseconds between retries.
* @return An instance of `SimpleHttpClient`.
*/
def apply(trustAllSslCerts: Boolean = false,
numberOfRetries: Int = RetryableHttpClient.DEFAULT_NUMBER_OF_RETRIES,
backoffMs: Int = RetryableHttpClient.DEFAULT_MAXIMUM_BACKOFF_MS): SimpleHttpClient = {
log.info("Creating a default HTTP client with the following settings:")
log.info(s" Trust all CA certificates: $trustAllSslCerts")
log.info(s" Number of retries: $numberOfRetries")
log.info(s" Maximum backoff between retries: $backoffMs ms")

new RetryableHttpClient(new BasicHttpClient(trustAllSslCerts), numberOfRetries, backoffMs)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.extras.utils.httpclient

case class SimpleHttpRequest(url: String,
method: HttpMethod,
headers: Map[String, String] = Map.empty,
body: Option[String] = None,
cacheKey: Option[String] = None, // You can specify a custom cache key so that cache sill work if the URL changes
allowStaleData: Boolean =
false) // If yes, expired data form cache can be used if the primary endpoint is not available
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.extras.utils.httpclient

case class SimpleHttpResponse(statusCode: Int,
body: Option[String],
warnings: Seq[String])
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.extras.utils.httpclient.impl

import org.apache.http.client.methods._
import org.apache.http.config.RegistryBuilder
import org.apache.http.conn.socket.{ConnectionSocketFactory, PlainConnectionSocketFactory}
import org.apache.http.conn.ssl.{NoopHostnameVerifier, SSLConnectionSocketFactory}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.{CloseableHttpClient, HttpClients}
import org.apache.http.impl.conn.BasicHttpClientConnectionManager
import org.apache.http.ssl.{SSLContexts, TrustStrategy}
import org.apache.http.util.EntityUtils
import org.slf4j.LoggerFactory
import za.co.absa.pramen.extras.utils.httpclient.HttpMethod.{DELETE, GET, POST, PUT}
import za.co.absa.pramen.extras.utils.httpclient.{SimpleHttpClient, SimpleHttpRequest, SimpleHttpResponse}

import java.security.cert.X509Certificate

class BasicHttpClient(trustAllSslCerts: Boolean) extends SimpleHttpClient {
import BasicHttpClient._

private val log = LoggerFactory.getLogger(this.getClass)
private val httpClient = getHttpClient

override def execute(request: SimpleHttpRequest): SimpleHttpResponse = {
val httpRequest = getApacheHttpRequest(request)

val response = httpClient.execute(httpRequest)
val bodyStr = EntityUtils.toString(response.getEntity)
val body = if (bodyStr.isEmpty) None else Option(bodyStr)

SimpleHttpResponse(
response.getStatusLine.getStatusCode,
body,
Seq.empty
)
}

override def close(): Unit =
httpClient.close()

private[extras] def getHttpClient: CloseableHttpClient =
if (trustAllSslCerts) {
log.warn("Trusting all SSL certificates for the cleanup API.")
val sslsf = getTrustingSocketFactory

val socketFactoryRegistry =
RegistryBuilder
.create[ConnectionSocketFactory]()
.register("https", sslsf)
.register("http", new PlainConnectionSocketFactory())
.build()

val connectionManager = new BasicHttpClientConnectionManager(socketFactoryRegistry)

HttpClients
.custom()
.setSSLSocketFactory(sslsf)
.setConnectionManager(connectionManager)
.build()
} else {
HttpClients.createDefault()
}
}

object BasicHttpClient {
def apply(trustAllSslCerts: Boolean = false): BasicHttpClient =
new BasicHttpClient(trustAllSslCerts)

/**
* Returns a socket factory that trusts all SSL certificates.
*/
private[extras] def getTrustingSocketFactory: SSLConnectionSocketFactory = {
val trustStrategy = new TrustStrategy {
override def isTrusted(x509Certificates: Array[X509Certificate], s: String): Boolean = true
}

val sslContext = SSLContexts.custom.loadTrustMaterial(null, trustStrategy).build
new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE)
}

private[extras] def getApacheHttpRequest(request: SimpleHttpRequest): HttpUriRequest = {
val httpRequest = request.method match {
case GET =>
request.body match {
case Some(body) =>
val req = new HttpGetWithBody(request.url)
req.setEntity(new StringEntity(body))
req
case None => new HttpGet(request.url)
}

case POST =>
val req = new HttpPost(request.url)
req.setEntity(new org.apache.http.entity.StringEntity(request.body.getOrElse("")))
req

case PUT =>
val req = new HttpPut(request.url)
req.setEntity(new org.apache.http.entity.StringEntity(request.body.getOrElse("")))
req
case DELETE =>
request.body match {
case Some(body) =>
val req = new HttpDeleteWithBody(request.url)
req.setEntity(new StringEntity(body))
req
case None => new HttpDelete(request.url)
}
case _ => throw new IllegalArgumentException(s"Unsupported HTTP method: ${request.method}")
}

request.headers.foreach { case (k, v) => httpRequest.addHeader(k, v) }

httpRequest
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.extras.utils.httpclient.impl

import org.apache.http.client.methods.HttpEntityEnclosingRequestBase

import java.net.URI

/**
* This class allows a workaround for sending DELETE requests with body.
*
* The implementation idea is based on https://stackoverflow.com/a/43265866/1038282
*/
class HttpDeleteWithBody extends HttpEntityEnclosingRequestBase {
import HttpDeleteWithBody._

def getMethod: String = METHOD_NAME

def this(uri: String) {
this()
setURI(URI.create(uri))
}
}

object HttpDeleteWithBody {
val METHOD_NAME = "DELETE"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.extras.utils.httpclient.impl

import org.apache.http.client.methods.HttpEntityEnclosingRequestBase

import java.net.URI

/**
* This class allows a workaround for sending GET requests with body.
*
* The implementation idea is based on https://stackoverflow.com/a/43265866/1038282
*/
class HttpGetWithBody extends HttpEntityEnclosingRequestBase {
import HttpGetWithBody._

def getMethod: String = METHOD_NAME

def this(uri: String) {
this()
setURI(URI.create(uri))
}
}

object HttpGetWithBody {
val METHOD_NAME = "GET"
}
Loading

0 comments on commit fe34063

Please sign in to comment.