Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Merge pull request #18 from granduke/eventhubsdk1.0
Browse files Browse the repository at this point in the history
Upgrade azure-eventhubs SDK to v1.0.0
  • Loading branch information
varunpuranik committed Mar 19, 2018
2 parents e8889f8 + a9ca1db commit b4bab2c
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 28 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.

val iotHubKafkaConnectVersion = "0.6.2"
val iotHubKafkaConnectVersion = "0.7.0"

name := "kafka-connect-iothub"
organization := "com.microsoft.azure.iot"
Expand All @@ -13,7 +13,7 @@ scalacOptions ++= Seq("-deprecation", "-explaintypes", "-unchecked", "-feature")
libraryDependencies ++= {

val kafkaVersion = "0.10.2.1"
val azureEventHubSDKVersion = "0.14.0"
val azureEventHubSDKVersion = "1.0.0"
val scalaLoggingVersion = "3.5.0"
val logbackClassicVersion = "1.1.7"
val scalaTestVersion = "3.0.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
package com.microsoft.azure.iot.kafka.connect.source

import java.time.{Duration, Instant}
import java.util.concurrent.Executors

import com.microsoft.azure.eventhubs.{EventHubClient, PartitionReceiver}
import com.microsoft.azure.eventhubs.{EventHubClient, EventPosition, PartitionReceiver}

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
Expand All @@ -14,16 +15,19 @@ class EventHubReceiver(val connectionString: String, val receiverConsumerGroup:

private[this] var isClosing = false

private val eventHubClient = EventHubClient.createFromConnectionStringSync(connectionString)
private val executorService = Executors.newSingleThreadExecutor()
private val eventHubClient = EventHubClient.createSync(connectionString, executorService)
if (eventHubClient == null) {
throw new IllegalArgumentException("Unable to create EventHubClient from the input parameters.")
}

private val eventHubReceiver: PartitionReceiver = if (startTime.isDefined) {
eventHubClient.createReceiverSync(receiverConsumerGroup, partition.toString, startTime.get)
} else {
eventHubClient.createReceiverSync(receiverConsumerGroup, partition.toString, offset.get)
private val eventPosition = if (startTime.isDefined) {
EventPosition.fromEnqueuedTime(startTime.get)
} else {
EventPosition.fromOffset(offset.get)
}
private val eventHubReceiver: PartitionReceiver = eventHubClient.createReceiverSync(
receiverConsumerGroup, partition.toString, eventPosition)
if (this.eventHubReceiver == null) {
throw new IllegalArgumentException("Unable to create PartitionReceiver from the input parameters.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,6 @@ object IotHubSourceConfig {
def getConfig(configValues: Map[String, String]): IotHubSourceConfig = {
new IotHubSourceConfig(configDef, configValues)
}

def getEventHubCompatibleNamespace(eventHubCompatibleEndpoint: String): String = {
eventHubCompatibleEndpoint.replaceFirst(".*://", "").replaceFirst("\\..*", "")
}
}

class IotHubSourceConfig(configDef: ConfigDef, configValues: Map[String, String])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

package com.microsoft.azure.iot.kafka.connect.source

import java.net.URI
import java.util

import com.microsoft.azure.eventhubs.PartitionReceiver
import com.microsoft.azure.servicebus.ConnectionStringBuilder
import com.microsoft.azure.eventhubs.ConnectionStringBuilder
import com.microsoft.azure.eventhubs.impl.ClientConstants
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.common.config.{ConfigDef, ConfigException}
import org.apache.kafka.connect.connector.Task
Expand Down Expand Up @@ -44,7 +45,7 @@ class IotHubSourceConnector extends SourceConnector with LazyLogging with JsonSe
offsets(partition)
}
else {
PartitionReceiver.START_OF_STREAM
ClientConstants.START_OF_STREAM
}
partitionOffsetsMap += (partition.toString -> partitionOffset)
partition = partition + maxTasks
Expand Down Expand Up @@ -80,14 +81,11 @@ class IotHubSourceConnector extends SourceConnector with LazyLogging with JsonSe
}

val iotHubSourceConfig = iotHubSourceConfigOption.get
val eventHubCompatibleNamespace = IotHubSourceConfig.getEventHubCompatibleNamespace(
iotHubSourceConfig.getString(IotHubSourceConfig.EventHubCompatibleEndpoint))
val iotHubConnectionString = new ConnectionStringBuilder(
eventHubCompatibleNamespace,
iotHubSourceConfig.getString(IotHubSourceConfig.EventHubCompatibleName),
iotHubSourceConfig.getString(IotHubSourceConfig.IotHubAccessKeyName),
iotHubSourceConfig.getString(IotHubSourceConfig.IotHubAccessKeyValue)).toString

val iotHubConnectionString = new ConnectionStringBuilder()
.setEndpoint(new URI(iotHubSourceConfig.getString(IotHubSourceConfig.EventHubCompatibleEndpoint)))
.setEventHubName(iotHubSourceConfig.getString(IotHubSourceConfig.EventHubCompatibleName))
.setSasKeyName(iotHubSourceConfig.getString(IotHubSourceConfig.IotHubAccessKeyName))
.setSasKey(iotHubSourceConfig.getString(IotHubSourceConfig.IotHubAccessKeyValue)).toString
this.props = Map[String, String](
IotHubSourceConfig.EventHubCompatibleConnectionString -> iotHubConnectionString,
IotHubSourceConfig.IotHubOffset -> iotHubSourceConfig.getString(IotHubSourceConfig.IotHubOffset),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package com.microsoft.azure.iot.kafka.connect.source
import java.time.Instant
import java.util.Date

import com.microsoft.azure.servicebus.amqp.AmqpConstants
import com.microsoft.azure.eventhubs.impl.AmqpConstants
import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct}

import scala.collection.JavaConverters._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,22 @@ class IotHubSourceConnectorTest extends FlatSpec with GivenWhenThen with JsonSer
connector.start(inputProperties)
}
}

"IotHubSourceConnector" should "create a valid connection string" in {
Given("Valid set of input properties")
val inputProperties = TestConfig.sourceConnectorTestProps
val connector = new IotHubSourceConnector

When("Start and TaskConfig are called in right order")
connector.start(inputProperties)
val taskConfig = connector.taskConfigs(1).get(0)

Then("The TaskConfig has the expected connectionString")
val expected = String.format("Endpoint=%s;EntityPath=%s;SharedAccessKeyName=%s;SharedAccessKey=%s",
inputProperties.get(IotHubSourceConfig.EventHubCompatibleEndpoint),
inputProperties.get(IotHubSourceConfig.EventHubCompatibleName),
inputProperties.get(IotHubSourceConfig.IotHubAccessKeyName),
inputProperties.get(IotHubSourceConfig.IotHubAccessKeyValue))
assert(taskConfig.get(IotHubSourceConfig.EventHubCompatibleConnectionString) == expected)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ package com.microsoft.azure.iot.kafka.connect.source
import java.text.SimpleDateFormat
import java.time.Instant

import com.microsoft.azure.eventhubs.impl.AmqpConstants
import com.microsoft.azure.iot.kafka.connect.source.testhelpers.DeviceTemperature
import com.microsoft.azure.servicebus.amqp.AmqpConstants
import org.apache.kafka.connect.data.Struct
import org.json4s.jackson.Serialization._
import org.scalatest.{FlatSpec, GivenWhenThen}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ package com.microsoft.azure.iot.kafka.connect.source.testhelpers
import java.text.SimpleDateFormat
import java.time.{Duration, Instant}

import com.microsoft.azure.eventhubs.impl.AmqpConstants
import com.microsoft.azure.iot.kafka.connect.source.{DataReceiver, IotMessage, JsonSerialization}
import com.microsoft.azure.servicebus.amqp.AmqpConstants
import org.json4s.jackson.Serialization.write

import scala.collection.mutable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

package com.microsoft.azure.iot.kafka.connect.source.testhelpers

import java.net.URI
import java.util

import com.microsoft.azure.eventhubs.ConnectionStringBuilder
import com.microsoft.azure.iot.kafka.connect.source.IotHubSourceConfig
import com.microsoft.azure.servicebus.ConnectionStringBuilder
import com.typesafe.config.ConfigFactory

object TestConfig {
Expand Down Expand Up @@ -88,5 +89,9 @@ object TestConfig {
lazy private val iotHubKeyName = iotHubConfig.getString("keyName")
lazy private val iotHubKeyValue = iotHubConfig.getString("key")
lazy private val iotHubPartitions = iotHubConfig.getInt("partitions")
lazy private val connStr = new ConnectionStringBuilder(iotHubEndpoint, iotHubName, iotHubKeyName, iotHubKeyValue)
lazy private val connStr = new ConnectionStringBuilder()
.setEndpoint(new URI(iotHubEndpoint))
.setEventHubName(iotHubName)
.setSasKeyName(iotHubKeyName)
.setSasKey(iotHubKeyValue)
}

0 comments on commit b4bab2c

Please sign in to comment.