diff --git a/.gitignore b/.gitignore index 206dbdf..4ed59c9 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ tools/devices-simulator/credentials.js *.crt .ensime +_dev ### MacOS ### .DS_Store diff --git a/.travis.yml b/.travis.yml index 54c0623..e3a3ed3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,14 +2,16 @@ jdk: oraclejdk8 language: scala scala: - 2.11.8 - - 2.12.0 + - 2.12.1 cache: directories: - "$HOME/.ivy2" - "$HOME/.sbt" - "$HOME/.m2" -notifications: - slack: - secure: S6pcmclrj9vaqHOFMrjgYkF6wXrYF6nB5joYY0rqAwsmTLf7crXRVKZ8txlatpxMHc20Rbw8RQDM6tTka9wwBkHZZfErrcPsS84d5MU9siEkIY42/bAQwuYhxkcgilttgFmSwzLodE72giC/VMhIYCSOyOXIxuR0VtBiPD9Inm9QZ35dZDx3P3nbnaOC4fk+BjdbrX1LB8YL9z5Gy/9TqI90w0FV85XMef75EnSgpqeMD/GMB5hIg+arWVnC2S6hZ91PPCcxCTKBYDjwqUac8mFW/sMFT/yrb2c0NE6ZQqa3dlx/XFyC1X6+7DjJli2Y8OU+FPjY1tQC8JxgVFTbddIgCdUM/5be4uHN/KNs/yF7w1g06ZXK4jhJxxpL4zWINlqDrDmLaqhAtPQkc2CqL3g8MCwYxBbxZY4aFyPfZD7YLdQXDzJZNcfXn9RQQh5y+/zexbGc1zZ/XUo5bK3VbElSs+o2ErI+Sze0FaiK8fW+QeitBdGvjMY7YVKi0Zzf5Dxx1wwxiHR1PQ1r0hA8YZQxwwdpa5lWLFlSVu2w+upPtXqfINMeFktQPbOs1JWIvUvLV0A38dS6R/DsM/W1a3OEVbHQ0Z6OV1nffDnGYPLUl5kRDPFuYYugmCpQHW73lqJdiM0O+Ote4eOQniL1rcajtt+V5cn1/JRWzdJ4PH0= before_install: - if [ "$TRAVIS_PULL_REQUEST" = "false" ]; then openssl aes-256-cbc -K $encrypted_cbef0ff679f7_key -iv $encrypted_cbef0ff679f7_iv -in devices.json.enc -out src/test/resources/devices.json -d ; fi +notifications: + slack: + rooms: + - secure: VW5Pw7NsQw8Uv9Zncn7TEy6vEoudygJDTdyMBjcfpsbibHVjOeDMMw/PKGrr2Y/W6b26LoIaz7uENynwZ9IXML+edIsJB4Kek7cS8YUQn4sT2UAobo8gSgia5aOVd39O48fUdikzJyscVPhyAN0eLiyUkFdDSfuZTonk1qDJEAeW6lbkepPyczoPtXio+K6jPoU+43eW+qA5ZYup+e5Hgo0TSOCaaxF0gDgD+duBVcYEXKnPnCxGhVi6DE8rzBBmcZenUHWFXoZLWVDILMcbjTDcTbx2UvQQ6EMHhyTMkK2v8D0qw3h5H9RtvGCx6OUbOi6ko8giYuC/tADm5TSksTHJOzTyGVcnw214smNBa4kPEwA0f0ayESRXsxwylI7P39IKMyDgGeDkfZdoPr2uaMl0rB98ubUlbllm7o+QtjUegsG2Zf91wL1wxTTXyDldgtot+5IEHyZcFke8zyJooj1Pjk4vasEBDPtDqiqiLOBc8a8tF6TccNb/dfCK3xh2sLVlVU95hrc6tw/ZJk9gZ0U9dDwPYixPTO8X8530UAo4E9s3epMzkdMahcAWFPGpd8tGO3IqegZps4d2p75aoRodz8JO54HRLHX3Gz+EsNDfgHeVqFcA6+rW+P04R2p8fJD1sWpDbq1xT4nQQ6L4TAt8UObkVET22T0BNaOdwqo= + - secure: evAZ40O+fXFmKAmfqnaLjLhELHTXL/aKOlMfDlStfVM35V+JMCivvQt/V+8ObmmkHBI6y9XoeILZzDD1cfCPjUzPAmHpnL6cT9kavu+OhS8BnyGvOcp1BysHIH1DHJ/JpYAKSXkMR2PoGakUp9kvETO+5s5ZKp909vtZqG4oShyqNz8jO6BYKwJrcU/ehOIRG2uHza2KtpRREFlwXu+Ca20+2oV74X0GvaqyDgVjEwxwLRc4KUg+/NPy4YWusBg/6Q/TV1HIAktlROA7ZOb9+DUshWZkOvd+F6JVaCzQDlSKCDvZpuddwUi+YveYlKqb8s7R+zzhxc6KPsXMrJ79k8E1uiZiEjRV3HTLxgZnKBYoKuglgKS1R1nQSsiJfVFQJn+dbBO0HxT9f3hy8nI3EpfTpy9DAO+wOnHycRBp1TCIPBEOCwIfBo2UvxqVAsBfIC3WmOF4VhlZxr/U+QCwolvWNjkPRrcKh4CzQEj2VIoAfyJ0vKujytB2gnv6Sp0mbbcK1jEJdNShc/wPdIXwn7oTT/2RnCdiKPivsF66wKcDmL7U/+zvVqRsw2wZrsw97n+j9lvj4YYZUVVXxtEQuXZ4sru5FM4a08lnS+cY5BfomvMkAT7UXKeV++sF7gBUq7DGQ4Xs6aLMZAAFSbPSw5mT349e+ipQyG8jX/iBllk= diff --git a/CHECKPOINTING.md b/CHECKPOINTING.md index ac9f267..9083df5 100644 --- a/CHECKPOINTING.md +++ b/CHECKPOINTING.md @@ -49,7 +49,7 @@ val start = java.time.Instant.now() val withCheckpoints = false IoTHub().source(start, withCheckpoints) - .map(m => jsonParser.readValue(m.contentAsString, classOf[Temperature])) + .map(m ⇒ jsonParser.readValue(m.contentAsString, classOf[Temperature])) .filter(_.value > 100) .to(console) .run() diff --git a/README.md b/README.md index f14f56b..dc422b1 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ the temperature value: ```scala IoTHub().source() - .map(m => parse(m.contentAsString).extract[Temperature]) + .map(m ⇒ parse(m.contentAsString).extract[Temperature]) .filter(_.value > 100) .to(console) .run() @@ -68,7 +68,7 @@ case class KafkaProducer(bootstrapServer: String)(implicit val system: ActorSyst val kafkaProducer = KafkaProducer(bootstrapServer) IoTHub().source() - .map(m => parse(m.contentAsString).extract[Temperature]) + .map(m ⇒ parse(m.contentAsString).extract[Temperature]) .filter(_.value > 100) .runWith(kafkaProducer.getSink()) ``` @@ -88,7 +88,7 @@ val p1 = 0 val p2 = 3 IoTHub().source(PartitionList(Seq(p1, p2))) - .map(m => parse(m.contentAsString).extract[Temperature]) + .map(m ⇒ parse(m.contentAsString).extract[Temperature]) .filter(_.value > 100) .to(console) .run() @@ -103,7 +103,7 @@ It's possible to start the stream from a given date and time too: val start = java.time.Instant.now() IoTHub().source(start) - .map(m => parse(m.contentAsString).extract[Temperature]) + .map(m ⇒ parse(m.contentAsString).extract[Temperature]) .filter(_.value > 100) .to(console) .run() @@ -180,7 +180,7 @@ val start = java.time.Instant.now() val withCheckpoints = false IoTHub().source(start, withCheckpoints) - .map(m => parse(m.contentAsString).extract[Temperature]) + .map(m ⇒ parse(m.contentAsString).extract[Temperature]) .filter(_.value > 100) .to(console) .run() @@ -193,7 +193,7 @@ your `build.sbt` file: ```scala libraryDependencies ++= { - val iothubReactV = "0.8.1" + val iothubReactV = "0.8.0" Seq( "com.microsoft.azure.iot" %% "iothub-react" % iothubReactV @@ -207,7 +207,7 @@ or this dependency in `pom.xml` file if working with Maven: com.microsoft.azure.iot iothub-react_2.12 - 0.8.1 + 0.8.0 ``` diff --git a/build.sbt b/build.sbt index c4fa7ad..363ec00 100644 --- a/build.sbt +++ b/build.sbt @@ -3,7 +3,7 @@ name := "iothub-react" organization := "com.microsoft.azure.iot" -version := "0.8.1" +version := "0.8.1" // Latest version released to Maven Central: 0.8.0 //version := "0.8.1-DEV.170309a" scalaVersion := "2.12.1" diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..651187a --- /dev/null +++ b/build.sh @@ -0,0 +1,12 @@ +if [ "$1" == "PR" -o "$1" == "pr" ]; then + echo "Skipping tests requiring encrypted secrets." + export TRAVIS_PULL_REQUEST="true" +fi + +travis lint -x && \ +sbt +clean && \ +sbt +compile && \ +sbt +package && \ +sbt +test + +rm -f *.crt diff --git a/samples-java/build.sbt b/samples-java/build.sbt index 9c9ddf2..c0e60b6 100644 --- a/samples-java/build.sbt +++ b/samples-java/build.sbt @@ -1,3 +1,3 @@ // Copyright (c) Microsoft. All rights reserved. -scalaVersion := "2.12.0" +scalaVersion := "2.12.1" diff --git a/samples-java/pom.xml b/samples-java/pom.xml index 89baee7..678e2e5 100644 --- a/samples-java/pom.xml +++ b/samples-java/pom.xml @@ -6,7 +6,7 @@ com.microsoft.azure.iot iothub-react-demo - 0.8.1 + 0.8.0 diff --git a/samples-java/project/build.properties b/samples-java/project/build.properties index 13d3ee7..27e88aa 100644 --- a/samples-java/project/build.properties +++ b/samples-java/project/build.properties @@ -1 +1 @@ -sbt.version = 0.13.12 \ No newline at end of file +sbt.version=0.13.13 diff --git a/samples-java/src/main/java/SendMessageToDevice/ReactiveStreamingApp.java b/samples-java/src/main/java/SendMessageToDevice/ReactiveStreamingApp.java index aaa2f27..0adeccc 100644 --- a/samples-java/src/main/java/SendMessageToDevice/ReactiveStreamingApp.java +++ b/samples-java/src/main/java/SendMessageToDevice/ReactiveStreamingApp.java @@ -11,7 +11,6 @@ */ public class ReactiveStreamingApp { - private static ActorSystem system = ActorSystem.create("Demo"); protected final static Materializer streamMaterializer = ActorMaterializer.create(system); diff --git a/samples-scala/build.sbt b/samples-scala/build.sbt index 9c9ddf2..c0e60b6 100644 --- a/samples-scala/build.sbt +++ b/samples-scala/build.sbt @@ -1,3 +1,3 @@ // Copyright (c) Microsoft. All rights reserved. -scalaVersion := "2.12.0" +scalaVersion := "2.12.1" diff --git a/samples-scala/project/build.properties b/samples-scala/project/build.properties index 13d3ee7..27e88aa 100644 --- a/samples-scala/project/build.properties +++ b/samples-scala/project/build.properties @@ -1 +1 @@ -sbt.version = 0.13.12 \ No newline at end of file +sbt.version=0.13.13 diff --git a/samples-scala/src/main/scala/F_SendMessageToDevice/Demo.scala b/samples-scala/src/main/scala/F_SendMessageToDevice/Demo.scala index e512383..06be8d0 100644 --- a/samples-scala/src/main/scala/F_SendMessageToDevice/Demo.scala +++ b/samples-scala/src/main/scala/F_SendMessageToDevice/Demo.scala @@ -43,7 +43,7 @@ object Demo extends App with Deserialize { /* // Run the two workflows in parallel RunnableGraph.fromGraph(GraphDSL.create() { - implicit b => + implicit b ⇒ import GraphDSL.Implicits._ val shape = b.add(Broadcast[Temperature](2)) diff --git a/setup-env-vars.sh b/setup-env-vars.sh old mode 100644 new mode 100755 diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/MessageFromDeviceSource.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/MessageFromDeviceSource.scala index 4ba6045..a6f8c1c 100644 --- a/src/main/scala/com/microsoft/azure/iot/iothubreact/MessageFromDeviceSource.scala +++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/MessageFromDeviceSource.scala @@ -105,7 +105,7 @@ private class MessageFromDeviceSource() extends GraphStage[SourceShape[MessageFr // Define the (sole) output port of this stage private[this] val out: Outlet[MessageFromDevice] = Outlet("MessageFromDeviceSource") - // Define the shape of this stage => SourceShape with the port defined above + // Define the shape of this stage ⇒ SourceShape with the port defined above override val shape: SourceShape[MessageFromDevice] = SourceShape(out) // All state MUST be inside the GraphStageLogic, never inside the enclosing diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Backends/cassandra/lib/Auth.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Backends/cassandra/lib/Auth.scala index fd198a5..7a34688 100644 --- a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Backends/cassandra/lib/Auth.scala +++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Backends/cassandra/lib/Auth.scala @@ -1,3 +1,5 @@ +// Copyright (c) Microsoft. All rights reserved. + package com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib -final case class Auth(username: String, password: String) \ No newline at end of file +case class Auth(username: String, password: String) diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Configuration.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CPConfiguration.scala similarity index 78% rename from src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Configuration.scala rename to src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CPConfiguration.scala index c6e9f6e..d49c59d 100644 --- a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Configuration.scala +++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CPConfiguration.scala @@ -11,25 +11,67 @@ import scala.concurrent.duration._ import scala.language.postfixOps import scala.util.Try +/** Checkpointing configuration interface + */ trait ICPConfiguration { - val isEnabled : Boolean - val storageNamespace : String - val checkpointBackendType : String - val checkpointFrequency : FiniteDuration - val checkpointRWTimeout : FiniteDuration - val checkpointCountThreshold : Int - val checkpointTimeThreshold : FiniteDuration - val azureBlobEmulator : Boolean - val azureBlobConnectionString : String - val azureBlobLeaseDuration : FiniteDuration - val cassandraCluster : String + + /** Whether checkpointing is enabled + */ + val isEnabled: Boolean + + /** Namespace where the table with checkpoint data is stored (e.g. Cassandra keyspace) + */ + val storageNamespace: String + + /** Type of storage, the value is not case sensitive + */ + val checkpointBackendType: String + + /** How often checkpoint data is written to the storage + */ + val checkpointFrequency: FiniteDuration + + /** Checkpointing operations timeout + */ + val checkpointRWTimeout: FiniteDuration + + /** How many messages to replay after a restart, for each IoT hub partition + */ + val checkpointCountThreshold: Int + + /** Store a position if its value is older than this amount of time, rounded to seconds + */ + val checkpointTimeThreshold: FiniteDuration + + /** Whether to use the Azure Storage Emulator when using Azure blob backend + */ + val azureBlobEmulator: Boolean + + /** Azure blob connection string + */ + val azureBlobConnectionString: String + + /** Azure blob lease duration (between 15s and 60s by Azure docs) + */ + val azureBlobLeaseDuration: FiniteDuration + + /** Cassandra cluster address + * TODO: support list + */ + val cassandraCluster: String + + /** Cassandra replication factor, value required to open a connection + */ val cassandraReplicationFactor: Int - val cassandraAuth : Option[Auth] + + /** Cassandra authentication credentials + */ + val cassandraAuth: Option[Auth] } /** Hold IoT Hub stream checkpointing configuration settings */ -private[iothubreact] final class CPConfiguration(implicit val conf: Config = ConfigFactory.load) extends ICPConfiguration { +private[iothubreact] class CPConfiguration(implicit conf: Config = ConfigFactory.load) extends ICPConfiguration { // TODO: Allow to use multiple configurations, e.g. while processing multiple streams // a client will need a dedicated checkpoint container for each stream @@ -118,7 +160,7 @@ private[iothubreact] final class CPConfiguration(implicit val conf: Config = Con // Azure blob connection string lazy val azureBlobConnectionString: String = getAzureBlobConnectionString - // Azure blob lease duration (15s and 60s by Azure docs) + // Azure blob lease duration (between 15s and 60s by Azure docs) lazy val azureBlobLeaseDuration: FiniteDuration = getDuration( confPath + "storage.azureblob.lease", 15 seconds, @@ -178,4 +220,4 @@ private[iothubreact] final class CPConfiguration(implicit val conf: Config = Con else default } -} \ No newline at end of file +} diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointActorSystem.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointActorSystem.scala index 3b1326b..9ad2fb3 100644 --- a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointActorSystem.scala +++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointActorSystem.scala @@ -26,8 +26,8 @@ private[iothubreact] object CheckpointActorSystem { val actorPath = "CheckpointService" + partition localRegistry get actorPath match { - case Some(actorRef) => actorRef - case None => { + case Some(actorRef) ⇒ actorRef + case None ⇒ { val actorRef = actorSystem.actorOf(Props(new CheckpointService(partition)), actorPath) localRegistry += Tuple2(actorPath, actorRef) actorRef diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointService.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointService.scala index b77864a..8d70dd6 100644 --- a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointService.scala +++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointService.scala @@ -67,7 +67,7 @@ private[iothubreact] class CheckpointService(partition: Int)(implicit config: IC queuedOffsets = 0 } catch { - case e: Exception => { + case e: Exception ⇒ { log.error(e, e.getMessage) context.become(notReady) } @@ -121,7 +121,7 @@ private[iothubreact] class CheckpointService(partition: Int)(implicit config: IC log.debug(s"Partition=${partition}, checkpoint queue is empty [count ${queuedOffsets}, current offset=${currentOffset}]") } } catch { - case e: Exception => log.error(e, e.getMessage) + case e: Exception ⇒ log.error(e, e.getMessage) } finally { context.become(ready) } diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/AzureBlob.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/AzureBlob.scala index 5716ad5..82a2958 100644 --- a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/AzureBlob.scala +++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/AzureBlob.scala @@ -17,7 +17,7 @@ import scala.language.{implicitConversions, postfixOps} /** Storage logic to write checkpoints to Azure blobs */ -private[iothubreact] class AzureBlob(implicit val config: ICPConfiguration) extends CheckpointBackend with Logger { +private[iothubreact] class AzureBlob(implicit config: ICPConfiguration) extends CheckpointBackend with Logger { // Set the account to point either to Azure or the emulator val account: CloudStorageAccount = if (config.azureBlobEmulator) diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CassandraTable.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CassandraTable.scala index 6185d38..1d41c66 100644 --- a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CassandraTable.scala +++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CassandraTable.scala @@ -11,7 +11,7 @@ import org.json4s.JsonAST /** Storage logic to write checkpoints to a Cassandra table */ -private[iothubreact] class CassandraTable(implicit val config: ICPConfiguration) extends CheckpointBackend with Logger { +private[iothubreact] class CassandraTable(implicit config: ICPConfiguration) extends CheckpointBackend with Logger { val schema = new CheckpointsTableSchema(checkpointNamespace, "checkpoints") val connection = Connection(config.cassandraCluster, config.cassandraReplicationFactor, config.cassandraAuth, schema) diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/lib/Column.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/lib/Column.scala index 0e3949f..849ccb1 100644 --- a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/lib/Column.scala +++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/lib/Column.scala @@ -17,10 +17,10 @@ private[iothubreact] object ColumnType extends Enumeration { * @return Type as string */ def toString(columnType: ColumnType): String = columnType match { - case ColumnType.String => "text" - case ColumnType.Timestamp => "timestamp" - case ColumnType.Double => "double" - case ColumnType.Int => "int" + case ColumnType.String ⇒ "text" + case ColumnType.Timestamp ⇒ "timestamp" + case ColumnType.Double ⇒ "double" + case ColumnType.Int ⇒ "int" case _ ⇒ throw new RuntimeException(s"Missing mapping for Cassandra type ${columnType}") } @@ -32,10 +32,10 @@ private[iothubreact] object ColumnType extends Enumeration { * @return Column type */ def fromName(typeAsString: String): ColumnType = typeAsString match { - case "text" => ColumnType.String - case "timestamp" => ColumnType.Timestamp - case "double" => ColumnType.Double - case "int" => ColumnType.Int + case "text" ⇒ ColumnType.String + case "timestamp" ⇒ ColumnType.Timestamp + case "double" ⇒ ColumnType.Double + case "int" ⇒ ColumnType.Int case _ ⇒ throw new IllegalArgumentException(s"Unknown Cassandra column type '${typeAsString}'") } diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/lib/Connection.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/lib/Connection.scala index 2a247dc..66c7ef0 100644 --- a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/lib/Connection.scala +++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/lib/Connection.scala @@ -16,15 +16,15 @@ private[iothubreact] case class Connection( auth: Option[Auth], table: TableSchema) { - private lazy val hostPort = extractHostPort() + private lazy val hostPort = extractHostPort() private lazy val cluster = { val builder = Cluster.builder().addContactPoint(hostPort._1).withPort(hostPort._2) auth map { - creds ⇒ builder.withCredentials(creds.username, creds.password) - } getOrElse (builder) build() + creds ⇒ builder.withCredentials(creds.username, creds.password) + } getOrElse (builder) build() } - implicit lazy val session = cluster.connect() + implicit lazy val session = cluster.connect() /** Create the key space if not present */ @@ -71,7 +71,7 @@ private[iothubreact] case class Connection( * @param columns Columns */ private[this] def createT(tableName: String, columns: Seq[Column]): Unit = { - val columnsSql = columns.foldLeft("")((b, a) => s"$b\n${a.name} ${ColumnType.toString(a.`type`)},") + val columnsSql = columns.foldLeft("")((b, a) ⇒ s"$b\n${a.name} ${ColumnType.toString(a.`type`)},") val indexesSql = columns.filter(_.index).map(_.name).mkString("PRIMARY KEY(", ", ", ")") val createTable = s"CREATE TABLE IF NOT EXISTS ${table.keyspaceCQL}.$tableName($columnsSql $indexesSql)" session.execute(createTable) diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHubPartition.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHubPartition.scala index ce0e3b4..1ac7423 100644 --- a/src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHubPartition.scala +++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHubPartition.scala @@ -63,33 +63,6 @@ private[iothubreact] case class IoTHubPartition(val partition: Int)(implicit cpc withCheckpoints = withCheckpoints && cpconfig.isEnabled) } - /** Stream returning all the messages from the given offset. Checkpoints are NOT saved but ARE loaded at startup. - * - * @param startTime Starting position expressed in time - * @return A source of IoT messages - */ - def sourceWithSavedCheckpoint(startTime: Instant): Source[MessageFromDevice, NotUsed] = { - getSource( - withTimeOffset = true, - startTime = startTime, - withCheckpoints = false, - startFromSavedCheckpoint = true) - } - - /** Stream returning all the messages from the given offset. Checkpoints are NOT saved but ARE loaded at startup. - * - * @param offset Starting position, offset of the first message - * @return A source of IoT messages - */ - def sourceWithSavedCheckpoint(offset: String): Source[MessageFromDevice, NotUsed] = { - getSource( - withTimeOffset = true, - offset = offset, - withCheckpoints = false, - startFromSavedCheckpoint = true) - } - - /** Create a stream returning all the messages for the defined partition, from the given start * point, optionally with checkpointing * @@ -148,7 +121,7 @@ private[iothubreact] case class IoTHubPartition(val partition: Int)(implicit cpc Await.result(future, rwTimeout.duration) } } catch { - case e: java.util.concurrent.TimeoutException => { + case e: java.util.concurrent.TimeoutException ⇒ { log.error(e, "Timeout while retrieving the offset from the storage") throw e } diff --git a/src/test/scala/api/API.scala b/src/test/scala/api/API.scala index b5ca37b..70c554f 100644 --- a/src/test/scala/api/API.scala +++ b/src/test/scala/api/API.scala @@ -1,23 +1,17 @@ // Copyright (c) Microsoft. All rights reserved. -// Namespace chosen to avoid access to internal classes +// NOTE: Namespace chosen to avoid access to internal classes package api -import java.util.UUID +// NOTE: No global imports to make easier detecting breaking changes -import com.microsoft.azure.iot.iothubreact.checkpointing.ICPConfiguration -import org.mockito.Mockito.when -import org.scalatest.mockito.MockitoSugar - -// No global imports to make easier detecting breaking changes - -class APIIsBackwardCompatible extends org.scalatest.FeatureSpec with MockitoSugar { +class APIIsBackwardCompatible extends org.scalatest.FeatureSpec with org.scalatest.mockito.MockitoSugar { info("As a developer using Azure IoT hub React") info("I want to be able to upgrade to new minor versions without changing my code") info("So I can benefit from improvements without excessive development costs") - implicit val cpconfig = mock[ICPConfiguration] + implicit val cpconfig = mock[com.microsoft.azure.iot.iothubreact.checkpointing.ICPConfiguration] feature("Version 0.x is backward compatible") { @@ -166,8 +160,8 @@ class APIIsBackwardCompatible extends org.scalatest.FeatureSpec with MockitoSuga val backend: CustomBackend = new CustomBackend() - val anyname = UUID.randomUUID.toString - when(cpconfig.storageNamespace).thenReturn(anyname) + val anyname = java.util.UUID.randomUUID.toString + org.mockito.Mockito.when(cpconfig.storageNamespace).thenReturn(anyname) assert(backend.checkpointNamespace == anyname) } diff --git a/src/test/scala/com/microsoft/azure/iot/iothubreact/checkpointing/ConfigurationTest.scala b/src/test/scala/com/microsoft/azure/iot/iothubreact/checkpointing/ConfigurationTest.scala index 091481b..4ebe382 100644 --- a/src/test/scala/com/microsoft/azure/iot/iothubreact/checkpointing/ConfigurationTest.scala +++ b/src/test/scala/com/microsoft/azure/iot/iothubreact/checkpointing/ConfigurationTest.scala @@ -1,3 +1,5 @@ +// Copyright (c) Microsoft. All rights reserved. + package com.microsoft.azure.iot.iothubreact.checkpointing import com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib.Auth