From 0b3b8b2db770d58c854fad6f63d4c04141030c60 Mon Sep 17 00:00:00 2001 From: nyaghma Date: Mon, 26 Oct 2020 18:01:36 -0700 Subject: [PATCH] Dynamic added partitions (#544) --- .../spark/eventhubs/EventHubsConf.scala | 49 ++++++++++++++----- .../eventhubs/client/EventHubsClient.scala | 35 ++++++++++++- .../org/apache/spark/eventhubs/package.scala | 2 + .../spark/sql/eventhubs/EventHubsSource.scala | 8 ++- .../eventhubs/EventHubsDirectDStream.scala | 2 +- 5 files changed, 82 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/eventhubs/EventHubsConf.scala b/core/src/main/scala/org/apache/spark/eventhubs/EventHubsConf.scala index 03398e2f9..63cde7751 100644 --- a/core/src/main/scala/org/apache/spark/eventhubs/EventHubsConf.scala +++ b/core/src/main/scala/org/apache/spark/eventhubs/EventHubsConf.scala @@ -23,7 +23,11 @@ import java.util.concurrent.ConcurrentHashMap import com.microsoft.azure.eventhubs.AzureActiveDirectoryTokenProvider.AuthenticationCallback import org.apache.spark.eventhubs.PartitionPreferredLocationStrategy.PartitionPreferredLocationStrategy import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap -import org.apache.spark.eventhubs.utils.{AadAuthenticationCallback, MetricPlugin, ThrottlingStatusPlugin} +import org.apache.spark.eventhubs.utils.{ + AadAuthenticationCallback, + MetricPlugin, + ThrottlingStatusPlugin +} import org.apache.spark.internal.Logging import org.json4s.NoTypeHints import org.json4s.jackson.Serialization @@ -172,7 +176,8 @@ final class EventHubsConf private (private val connectionStr: String) ThrottlingStatusPluginKey, MaxAcceptableBatchReceiveTimeKey, UseAadAuthKey, - AadAuthCallbackKey + AadAuthCallbackKey, + DynamicPartitionDiscoveryKey ).map(_.toLowerCase).toSet val trimmedConfig = EventHubsConf(connectionString) @@ -496,10 +501,31 @@ final class EventHubsConf private (private val connectionStr: String) self.get(SlowPartitionAdjustmentKey).getOrElse(DefaultSlowPartitionAdjustment).toBoolean } + 
/** + * Set the flag for dynamic partition discovery. This option is useful only if partitions are being dynamically + * added to an existing event hub. For more information on how to dynamically add partitions to an event hub + * please refer to [[https://docs.microsoft.com/en-us/azure/event-hubs/dynamically-add-partitions]]. + * If dynamic partition discovery is disabled, the number of partitions is read and set at the + * beginning of the execution. Otherwise, the number of partitions is read and updated every + * [[UpdatePartitionCountIntervalMS]] milliseconds. The default value is false. + * Default: [[DefaultDynamicPartitionDiscovery]] + * + * @param b the flag which specifies whether the connector uses dynamic partition discovery + * @return the updated [[EventHubsConf]] instance + */ + def setDynamicPartitionDiscovery(b: Boolean): EventHubsConf = { + set(DynamicPartitionDiscoveryKey, b) + } + + /** The dynamic partition discovery flag */ + def dynamicPartitionDiscovery: Boolean = { + self.get(DynamicPartitionDiscoveryKey).getOrElse(DefaultDynamicPartitionDiscovery).toBoolean + } + /** Set the max time that is acceptable for a partition to receive events in a single batch. * This value is being used to identify slow partitions when the slowPartitionAdjustment is on. - * Only partitions that tale more than this time to receive thier portion of events in batch are considered - * as potential slow partitrions. + * Only partitions that take more than this time to receive their portion of events in batch are considered + * as potential slow partitions. * Default: [[DefaultMaxAcceptableBatchReceiveTime]] * * @param d the new maximum acceptable time for a partition to receive events in a single batch @@ -580,13 +606,13 @@ final class EventHubsConf private (private val connectionStr: String) } /** - * set a callback class for aad auth. The class should be Serializable and derived from - * org.apache.spark.eventhubs.utils.AadAuthenticationCallback. 
- * More info about this: https://docs.microsoft.com/en-us/azure/event-hubs/authorize-access-azure-active-directory - * - * @param callback The callback class which implements org.apache.spark.eventhubs.utils.AadAuthenticationCallback - * @return the updated [[EventHubsConf]] instance - */ + * set a callback class for aad auth. The class should be Serializable and derived from + * org.apache.spark.eventhubs.utils.AadAuthenticationCallback. + * More info about this: https://docs.microsoft.com/en-us/azure/event-hubs/authorize-access-azure-active-directory + * + * @param callback The callback class which implements org.apache.spark.eventhubs.utils.AadAuthenticationCallback + * @return the updated [[EventHubsConf]] instance + */ def setAadAuthCallback(callback: AadAuthenticationCallback): EventHubsConf = { setUseAadAuth(true) set(AadAuthCallbackKey, callback.getClass.getName) @@ -657,6 +683,7 @@ object EventHubsConf extends Logging { val MaxAcceptableBatchReceiveTimeKey = "eventhubs.maxAcceptableBatchReceiveTime" val UseAadAuthKey = "eventhubs.useAadAuth" val AadAuthCallbackKey = "eventhubs.aadAuthCallback" + val DynamicPartitionDiscoveryKey = "eventhubs.DynamicPartitionDiscovery" /** Creates an EventHubsConf */ def apply(connectionString: String) = new EventHubsConf(connectionString) diff --git a/core/src/main/scala/org/apache/spark/eventhubs/client/EventHubsClient.scala b/core/src/main/scala/org/apache/spark/eventhubs/client/EventHubsClient.scala index 48fcb99b2..f63d0af37 100644 --- a/core/src/main/scala/org/apache/spark/eventhubs/client/EventHubsClient.scala +++ b/core/src/main/scala/org/apache/spark/eventhubs/client/EventHubsClient.scala @@ -44,6 +44,7 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf) with Logging { import org.apache.spark.eventhubs._ + import EventHubsClient._ ehConf.validate @@ -164,8 +165,22 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf) * * @return partition count */ - override lazy val 
partitionCount: Int = { + override def partitionCount: Int = { try { + if (ehConf.dynamicPartitionDiscovery) { + partitionCountDynamic + } else { + partitionCountLazyVal + } + } catch { + case e: Exception => throw e + } + } + + lazy val partitionCountLazyVal: Int = { + try { + logDebug( + s"partitionCountLazyVal makes a call to runTimeInfo to read the number of partitions.") val runtimeInfo = client.getRuntimeInformation.get runtimeInfo.getPartitionCount } catch { @@ -173,6 +188,22 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf) } } + def partitionCountDynamic: Int = { + try { + val currentTimeStamp = System.currentTimeMillis() + if ((currentTimeStamp - partitionCountCacheUpdateTimestamp > UpdatePartitionCountIntervalMS) || (partitionCountCache == 0)) { + val runtimeInfo = client.getRuntimeInformation.get + partitionCountCache = runtimeInfo.getPartitionCount + partitionCountCacheUpdateTimestamp = currentTimeStamp + logDebug( + s"partitionCountDynamic made a call to runTimeInfo to read the number of partitions = ${partitionCountCache} at timestamp = ${partitionCountCacheUpdateTimestamp}") + } + partitionCountCache + } catch { + case e: Exception => throw e + } + } + /** * Cleans up all open connections and links. 
* @@ -318,6 +349,8 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf) } private[spark] object EventHubsClient { + private var partitionCountCache: Int = 0 + private var partitionCountCacheUpdateTimestamp: Long = 0 private[spark] def apply(ehConf: EventHubsConf): EventHubsClient = new EventHubsClient(ehConf) diff --git a/core/src/main/scala/org/apache/spark/eventhubs/package.scala b/core/src/main/scala/org/apache/spark/eventhubs/package.scala index daefcfabf..521feb882 100644 --- a/core/src/main/scala/org/apache/spark/eventhubs/package.scala +++ b/core/src/main/scala/org/apache/spark/eventhubs/package.scala @@ -50,11 +50,13 @@ package object eventhubs { val DefaultPartitionPreferredLocationStrategy = "Hash" val DefaultUseExclusiveReceiver = "true" val DefaultSlowPartitionAdjustment = "false" + val DefaultDynamicPartitionDiscovery = "false" val StartingSequenceNumber = 0L val DefaultThreadPoolSize = 16 val DefaultEpoch = 0L val RetryCount = 10 val WaitInterval = 5000 + val UpdatePartitionCountIntervalMS = 300000 val OffsetAnnotation = "x-opt-offset" val EnqueuedTimeAnnotation = "x-opt-enqueued-time" diff --git a/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala b/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala index f871605e0..be03e120f 100644 --- a/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala +++ b/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala @@ -78,7 +78,7 @@ private[spark] class EventHubsSource private[eventhubs] (sqlContext: SQLContext, import EventHubsSourceProvider._ private lazy val ehClient = EventHubsSourceProvider.clientFactory(parameters)(ehConf) - private lazy val partitionCount: Int = ehClient.partitionCount + private def partitionCount: Int = ehClient.partitionCount private val ehConf = EventHubsConf.toConf(parameters) private val ehName = ehConf.name @@ -178,6 +178,10 @@ private[spark] class EventHubsSource private[eventhubs] 
(sqlContext: SQLContext, val seqNos = metadataLog.get(0) match { case Some(checkpoint) => + if (defaultSeqNos.size > checkpoint.partitionToSeqNos.size) { + logInfo( + s"Number of partitions has increased from ${checkpoint.partitionToSeqNos.size} in the latest checkpoint to ${defaultSeqNos.size}.") + } defaultSeqNos ++ checkpoint.partitionToSeqNos case None => defaultSeqNos @@ -344,6 +348,8 @@ private[spark] class EventHubsSource private[eventhubs] (sqlContext: SQLContext, case Some(prevBatchEndOffset) => val prevOffsets = EventHubsSourceOffset.getPartitionSeqNos(prevBatchEndOffset) val startingSeqNos = if (prevOffsets.size < untilSeqNos.size) { + logInfo( + s"Number of partitions has increased from ${prevOffsets.size} to ${untilSeqNos.size}") val defaultSeqNos = ehClient .translate(ehConf, partitionCount) .map { diff --git a/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsDirectDStream.scala b/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsDirectDStream.scala index ccfa91f86..8b0208258 100644 --- a/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsDirectDStream.scala +++ b/core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsDirectDStream.scala @@ -55,7 +55,7 @@ private[spark] class EventHubsDirectDStream private[spark] (_ssc: StreamingConte import EventHubsDirectDStream._ - private lazy val partitionCount: Int = ehClient.partitionCount + private def partitionCount: Int = ehClient.partitionCount private lazy val ehName = ehConf.name @transient private var _client: Client = _