Dynamic added partitions (#544)
nyaghma committed Oct 27, 2020
1 parent 31e9f70 commit 0b3b8b2
Showing 5 changed files with 82 additions and 14 deletions.
49 changes: 38 additions & 11 deletions core/src/main/scala/org/apache/spark/eventhubs/EventHubsConf.scala
@@ -23,7 +23,11 @@ import java.util.concurrent.ConcurrentHashMap
import com.microsoft.azure.eventhubs.AzureActiveDirectoryTokenProvider.AuthenticationCallback
import org.apache.spark.eventhubs.PartitionPreferredLocationStrategy.PartitionPreferredLocationStrategy
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.eventhubs.utils.{AadAuthenticationCallback, MetricPlugin, ThrottlingStatusPlugin}
import org.apache.spark.eventhubs.utils.{
AadAuthenticationCallback,
MetricPlugin,
ThrottlingStatusPlugin
}
import org.apache.spark.internal.Logging
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
@@ -172,7 +176,8 @@ final class EventHubsConf private (private val connectionStr: String)
ThrottlingStatusPluginKey,
MaxAcceptableBatchReceiveTimeKey,
UseAadAuthKey,
AadAuthCallbackKey
AadAuthCallbackKey,
DynamicPartitionDiscoveryKey
).map(_.toLowerCase).toSet

val trimmedConfig = EventHubsConf(connectionString)
@@ -496,10 +501,31 @@ final class EventHubsConf private (private val connectionStr: String)
self.get(SlowPartitionAdjustmentKey).getOrElse(DefaultSlowPartitionAdjustment).toBoolean
}

/**
* Set the flag for dynamic partition discovery. This option is useful only if partitions are
* dynamically added to an existing event hub. For more information on how to dynamically add
* partitions to an event hub, please refer to
* [[https://docs.microsoft.com/en-us/azure/event-hubs/dynamically-add-partitions]].
* If dynamic partition discovery is disabled, the number of partitions is read once and fixed
* at the beginning of the execution. Otherwise, the number of partitions is re-read and
* updated at most once every [[UpdatePartitionCountIntervalMS]] milliseconds.
* Default: [[DefaultDynamicPartitionDiscovery]]
*
* @param b the flag which specifies whether the connector uses dynamic partition discovery
* @return the updated [[EventHubsConf]] instance
*/
def setDynamicPartitionDiscovery(b: Boolean): EventHubsConf = {
set(DynamicPartitionDiscoveryKey, b)
}

/** The dynamic partition discovery flag */
def dynamicPartitionDiscovery: Boolean = {
self.get(DynamicPartitionDiscoveryKey).getOrElse(DefaultDynamicPartitionDiscovery).toBoolean
}
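
// Illustrative usage sketch: the connection string and event hub name below are
// placeholders, not values from this codebase.
// val connStr = ConnectionStringBuilder("<EVENT_HUB_CONNECTION_STRING>")
//   .setEventHubName("<EVENT_HUB_NAME>")
//   .build
// val ehConf = EventHubsConf(connStr).setDynamicPartitionDiscovery(true)
// ehConf.dynamicPartitionDiscovery // returns true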

/** Set the max time that is acceptable for a partition to receive events in a single batch.
* This value is used to identify slow partitions when slowPartitionAdjustment is on.
* Only partitions that tale more than this time to receive thier portion of events in batch are considered
* as potential slow partitrions.
* Only partitions that take more than this time to receive their portion of events in a batch are considered
* as potential slow partitions.
* Default: [[DefaultMaxAcceptableBatchReceiveTime]]
*
* @param d the new maximum acceptable time for a partition to receive events in a single batch
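* For example (hypothetical value, assuming a java.time.Duration argument):
* {{{
* ehConf.setMaxAcceptableBatchReceiveTime(Duration.ofSeconds(30))
* }}}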
@@ -580,13 +606,13 @@ final class EventHubsConf private (private val connectionStr: String)
}

/**
* Set a callback class for AAD authentication. The class should be Serializable and derived
* from org.apache.spark.eventhubs.utils.AadAuthenticationCallback.
* More info: https://docs.microsoft.com/en-us/azure/event-hubs/authorize-access-azure-active-directory
*
* @param callback the callback class which implements org.apache.spark.eventhubs.utils.AadAuthenticationCallback
* @return the updated [[EventHubsConf]] instance
*/
def setAadAuthCallback(callback: AadAuthenticationCallback): EventHubsConf = {
setUseAadAuth(true)
set(AadAuthCallbackKey, callback.getClass.getName)
@@ -657,6 +683,7 @@ object EventHubsConf extends Logging {
val MaxAcceptableBatchReceiveTimeKey = "eventhubs.maxAcceptableBatchReceiveTime"
val UseAadAuthKey = "eventhubs.useAadAuth"
val AadAuthCallbackKey = "eventhubs.aadAuthCallback"
val DynamicPartitionDiscoveryKey = "eventhubs.DynamicPartitionDiscovery"

/** Creates an EventHubsConf */
def apply(connectionString: String) = new EventHubsConf(connectionString)
35 changes: 34 additions & 1 deletion core/src/main/scala/org/apache/spark/eventhubs/client/EventHubsClient.scala
@@ -44,6 +44,7 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf)
with Logging {

import org.apache.spark.eventhubs._
import EventHubsClient._

ehConf.validate

@@ -164,15 +165,45 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf)
*
* @return partition count
*/
override lazy val partitionCount: Int = {
override def partitionCount: Int = {
try {
if (ehConf.dynamicPartitionDiscovery) {
partitionCountDynamic
} else {
partitionCountLazyVal
}
} catch {
case e: Exception => throw e
}
}

lazy val partitionCountLazyVal: Int = {
try {
logDebug(
s"partitionCountLazyVal makes a call to runTimeInfo to read the number of partitions.")
val runtimeInfo = client.getRuntimeInformation.get
runtimeInfo.getPartitionCount
} catch {
case e: Exception => throw e
}
}

def partitionCountDynamic: Int = {
try {
val currentTimeStamp = System.currentTimeMillis()
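// Re-read the partition count only when the cached value has never been set (0) or is
// older than UpdatePartitionCountIntervalMS; otherwise serve it from the cache.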
if ((currentTimeStamp - partitionCountCacheUpdateTimestamp > UpdatePartitionCountIntervalMS) || (partitionCountCache == 0)) {
val runtimeInfo = client.getRuntimeInformation.get
partitionCountCache = runtimeInfo.getPartitionCount
partitionCountCacheUpdateTimestamp = currentTimeStamp
logDebug(
s"partitionCountDynamic made a call to runTimeInfo to read the number of partitions = ${partitionCountCache} at timestamp = ${partitionCountCacheUpdateTimestamp}")
}
partitionCountCache
} catch {
case e: Exception => throw e
}
}

/**
* Cleans up all open connections and links.
*
@@ -318,6 +349,8 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf)
}

private[spark] object EventHubsClient {
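// Cached partition count and the timestamp of the last read; being companion-object
// fields, they are shared by every EventHubsClient instance in the JVM.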
private var partitionCountCache: Int = 0
private var partitionCountCacheUpdateTimestamp: Long = 0

private[spark] def apply(ehConf: EventHubsConf): EventHubsClient =
new EventHubsClient(ehConf)
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/eventhubs/package.scala
@@ -50,11 +50,13 @@ package object eventhubs {
val DefaultPartitionPreferredLocationStrategy = "Hash"
val DefaultUseExclusiveReceiver = "true"
val DefaultSlowPartitionAdjustment = "false"
val DefaultDynamicPartitionDiscovery = "false"
val StartingSequenceNumber = 0L
val DefaultThreadPoolSize = 16
val DefaultEpoch = 0L
val RetryCount = 10
val WaitInterval = 5000
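// Minimum interval between partition-count refreshes when dynamic partition
// discovery is enabled: 300000 ms = 5 minutes.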
val UpdatePartitionCountIntervalMS = 300000

val OffsetAnnotation = "x-opt-offset"
val EnqueuedTimeAnnotation = "x-opt-enqueued-time"
8 changes: 7 additions & 1 deletion core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala
@@ -78,7 +78,7 @@ private[spark] class EventHubsSource private[eventhubs] (sqlContext: SQLContext,
import EventHubsSourceProvider._

private lazy val ehClient = EventHubsSourceProvider.clientFactory(parameters)(ehConf)
private lazy val partitionCount: Int = ehClient.partitionCount
private def partitionCount: Int = ehClient.partitionCount

private val ehConf = EventHubsConf.toConf(parameters)
private val ehName = ehConf.name
@@ -178,6 +178,10 @@ private[spark] class EventHubsSource private[eventhubs] (sqlContext: SQLContext,

val seqNos = metadataLog.get(0) match {
case Some(checkpoint) =>
if (defaultSeqNos.size > checkpoint.partitionToSeqNos.size) {
logInfo(
s"Number of partitions has increased from ${checkpoint.partitionToSeqNos.size} in the latest checkpoint to ${defaultSeqNos.size}.")
}
defaultSeqNos ++ checkpoint.partitionToSeqNos
case None =>
defaultSeqNos
@@ -344,6 +348,8 @@ private[spark] class EventHubsSource private[eventhubs] (sqlContext: SQLContext,
case Some(prevBatchEndOffset) =>
val prevOffsets = EventHubsSourceOffset.getPartitionSeqNos(prevBatchEndOffset)
val startingSeqNos = if (prevOffsets.size < untilSeqNos.size) {
logInfo(
s"Number of partitions has increased from ${prevOffsets.size} to ${untilSeqNos.size}")
val defaultSeqNos = ehClient
.translate(ehConf, partitionCount)
.map {
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsDirectDStream.scala
@@ -55,7 +55,7 @@ private[spark] class EventHubsDirectDStream private[spark] (_ssc: StreamingContext,

import EventHubsDirectDStream._

private lazy val partitionCount: Int = ehClient.partitionCount
private def partitionCount: Int = ehClient.partitionCount
private lazy val ehName = ehConf.name

@transient private var _client: Client = _
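
Taken together, the pieces above are meant to be driven from a streaming job. The following is a minimal end-to-end sketch of enabling dynamic partition discovery in Structured Streaming; the connection string, event hub name, checkpoint path, and the example object itself are placeholders rather than code from this commit.

import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf }
import org.apache.spark.sql.SparkSession

object DynamicPartitionDiscoveryExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("DynamicPartitionDiscoveryExample").getOrCreate()

    // Placeholder connection details for an existing event hub.
    val connectionString = ConnectionStringBuilder("<EVENT_HUB_CONNECTION_STRING>")
      .setEventHubName("<EVENT_HUB_NAME>")
      .build

    // Turn on dynamic partition discovery so the partition count is re-read
    // at most once every UpdatePartitionCountIntervalMS while the query runs.
    val ehConf = EventHubsConf(connectionString)
      .setDynamicPartitionDiscovery(true)

    val events = spark.readStream
      .format("eventhubs")
      .options(ehConf.toMap)
      .load()

    events.writeStream
      .format("console")
      .option("checkpointLocation", "<CHECKPOINT_PATH>") // placeholder path
      .start()
      .awaitTermination()
  }
}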