From 7898d67a31d77820f6331d0ce91872baadfee9ef Mon Sep 17 00:00:00 2001 From: Lin Zhou <87341375+linzhou-db@users.noreply.github.com> Date: Mon, 22 Apr 2024 17:25:41 -0700 Subject: [PATCH 1/2] Add queryID in userAgent for spark delta sharing queries --- .../io/delta/sharing/spark/DeltaSharingClient.scala | 13 +++++++++++-- .../io/delta/sharing/spark/DeltaSharingSource.scala | 5 ++++- .../sharing/spark/DeltaSharingRestClientSuite.scala | 13 +++++++++---- 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala index 149e18335..ee9ffa6fa 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala @@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets.UTF_8 import java.sql.Timestamp import java.time.LocalDateTime import java.time.format.DateTimeFormatter.ISO_DATE_TIME +import java.util.UUID import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, ListBuffer} @@ -104,9 +105,12 @@ private[spark] class DeltaSharingRestClient( sslTrustAll: Boolean = false, forStreaming: Boolean = false ) extends DeltaSharingClient with Logging { + import DeltaSharingRestClient._ @volatile private var created = false + private var queryId: Option[String] = None + private lazy val client = { val clientBuilder: HttpClientBuilder = if (sslTrustAll) { val sslBuilder = new SSLContextBuilder() @@ -480,6 +484,8 @@ private[spark] class DeltaSharingRestClient( allowNoContent: Boolean = false, fetchAsOneString: Boolean = false ): (Option[Long], Seq[String]) = { + // Reset queryId before calling RetryUtils, and before prepareHeaders. + queryId = Some(UUID.randomUUID().toString().split('-').head) RetryUtils.runWithExponentialBackoff(numRetries, maxRetryDuration) { val profile = profileProvider.getProfile val response = client.execute( @@ -550,7 +556,11 @@ private[spark] class DeltaSharingRestClient( } else { "Delta-Sharing-Spark" } - s"$sparkAgent/$VERSION" + DeltaSharingRestClient.USER_AGENT + s"$sparkAgent/$VERSION" + s" $sparkVersionString" + s" $getQueryIdString" + USER_AGENT + } + + private def getQueryIdString: String = { + s"QueryId-${queryId.getOrElse("not_set")}" } def close(): Unit = { @@ -571,7 +581,6 @@ private[spark] object DeltaSharingRestClient extends Logging { lazy val USER_AGENT = { try { - s" $sparkVersionString" + s" Hadoop/${VersionInfo.getVersion()}" + s" ${spaceFreeProperty("os.name")}/${spaceFreeProperty("os.version")}" + s" ${spaceFreeProperty("java.vm.name")}/${spaceFreeProperty("java.vm.version")}" + diff --git a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala index 153dc4cf7..8078a2424 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala @@ -1135,7 +1135,10 @@ case class DeltaSharingSource( } Some(v) } else if (options.startingTimestamp.isDefined) { - Some(deltaLog.client.getTableVersion(deltaLog.table, options.startingTimestamp)) + val version = deltaLog.client.getTableVersion(deltaLog.table, options.startingTimestamp) + logInfo(s"Got table version $version for timestamp ${options.startingTimestamp} " + + s"from Delta Sharing Server.") + Some(version) } else { None } diff --git a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala index bde1ea38f..7de2659d2 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala @@ -41,12 +41,17 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { val httpRequest = new HttpGet("random_url") val client = new DeltaSharingRestClient(testProfileProvider, forStreaming = false) - var h = client.prepareHeaders(httpRequest).getFirstHeader(HttpHeaders.USER_AGENT) - assert(!h.getValue.contains(DeltaSharingRestClient.SPARK_STRUCTURED_STREAMING)) + var h = client.prepareHeaders(httpRequest).getFirstHeader(HttpHeaders.USER_AGENT).getValue + assert(!h.contains(DeltaSharingRestClient.SPARK_STRUCTURED_STREAMING)) + assert(h.contains("Delta-Sharing-Spark")) + assert(h.contains(" QueryId-")) + assert(h.contains(" Hadoop/")) + assert(h.contains(" Linux/")) + assert(h.contains(" java/")) val streamingClient = new DeltaSharingRestClient(testProfileProvider, forStreaming = true) - h = streamingClient.prepareHeaders(httpRequest).getFirstHeader(HttpHeaders.USER_AGENT) - assert(h.getValue.contains(DeltaSharingRestClient.SPARK_STRUCTURED_STREAMING)) + h = streamingClient.prepareHeaders(httpRequest).getFirstHeader(HttpHeaders.USER_AGENT).getValue + assert(h.contains(DeltaSharingRestClient.SPARK_STRUCTURED_STREAMING)) } integrationTest("listAllTables") { From 06236812d9b92efd9871786c3248dcd249631dc8 Mon Sep 17 00:00:00 2001 From: Lin Zhou Date: Mon, 22 Apr 2024 21:22:48 -0700 Subject: [PATCH 2/2] add log --- .../main/scala/io/delta/sharing/spark/DeltaSharingSource.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala index 8078a2424..5e9f64b31 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala @@ -184,6 +184,7 @@ case class DeltaSharingSource( if (lastGetVersionTimestamp == -1 || (currentTimeMillis - lastGetVersionTimestamp) >= QUERY_TABLE_VERSION_INTERVAL_MILLIS) { val serverVersion = deltaLog.client.getTableVersion(deltaLog.table) + logInfo(s"Got table version $serverVersion from Delta Sharing Server.") if (serverVersion < 0) { throw new IllegalStateException(s"Delta Sharing Server returning negative table version:" + s"$serverVersion.")