This repository has been archived by the owner on Oct 11, 2023. It is now read-only.

Backports from 0.9 branch (scala 2.12.1 and code style)
* Upgrade Scala from 2.12.0 to 2.12.1
* Remove dead code in IoTHubPartition
* Code style
dluc committed Mar 15, 2017
1 parent fc2e73a commit 6508964
Showing 26 changed files with 123 additions and 96 deletions.
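
Most of the "code style" hunks below swap Scala's ASCII arrow `=>` for the equivalent Unicode arrow `⇒` in lambdas and pattern matches; the two forms compile identically in Scala 2.12. An illustrative before/after (not copied verbatim from the diff):

```scala
// Before (ASCII arrow)
IoTHub().source()
  .map(m => parse(m.contentAsString).extract[Temperature])

// After (Unicode arrow, same semantics)
IoTHub().source()
  .map(m ⇒ parse(m.contentAsString).extract[Temperature])
```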
1 change: 1 addition & 0 deletions .gitignore
@@ -3,6 +3,7 @@
tools/devices-simulator/credentials.js
*.crt
.ensime
_dev

### MacOS ###
.DS_Store
10 changes: 6 additions & 4 deletions .travis.yml
@@ -2,14 +2,16 @@ jdk: oraclejdk8
language: scala
scala:
- 2.11.8
- 2.12.0
- 2.12.1
cache:
directories:
- "$HOME/.ivy2"
- "$HOME/.sbt"
- "$HOME/.m2"
notifications:
slack:
secure: S6pcmclrj9vaqHOFMrjgYkF6wXrYF6nB5joYY0rqAwsmTLf7crXRVKZ8txlatpxMHc20Rbw8RQDM6tTka9wwBkHZZfErrcPsS84d5MU9siEkIY42/bAQwuYhxkcgilttgFmSwzLodE72giC/VMhIYCSOyOXIxuR0VtBiPD9Inm9QZ35dZDx3P3nbnaOC4fk+BjdbrX1LB8YL9z5Gy/9TqI90w0FV85XMef75EnSgpqeMD/GMB5hIg+arWVnC2S6hZ91PPCcxCTKBYDjwqUac8mFW/sMFT/yrb2c0NE6ZQqa3dlx/XFyC1X6+7DjJli2Y8OU+FPjY1tQC8JxgVFTbddIgCdUM/5be4uHN/KNs/yF7w1g06ZXK4jhJxxpL4zWINlqDrDmLaqhAtPQkc2CqL3g8MCwYxBbxZY4aFyPfZD7YLdQXDzJZNcfXn9RQQh5y+/zexbGc1zZ/XUo5bK3VbElSs+o2ErI+Sze0FaiK8fW+QeitBdGvjMY7YVKi0Zzf5Dxx1wwxiHR1PQ1r0hA8YZQxwwdpa5lWLFlSVu2w+upPtXqfINMeFktQPbOs1JWIvUvLV0A38dS6R/DsM/W1a3OEVbHQ0Z6OV1nffDnGYPLUl5kRDPFuYYugmCpQHW73lqJdiM0O+Ote4eOQniL1rcajtt+V5cn1/JRWzdJ4PH0=
before_install:
- if [ "$TRAVIS_PULL_REQUEST" = "false" ]; then openssl aes-256-cbc -K $encrypted_cbef0ff679f7_key -iv $encrypted_cbef0ff679f7_iv -in devices.json.enc -out src/test/resources/devices.json -d ; fi
notifications:
slack:
rooms:
- secure: VW5Pw7NsQw8Uv9Zncn7TEy6vEoudygJDTdyMBjcfpsbibHVjOeDMMw/PKGrr2Y/W6b26LoIaz7uENynwZ9IXML+edIsJB4Kek7cS8YUQn4sT2UAobo8gSgia5aOVd39O48fUdikzJyscVPhyAN0eLiyUkFdDSfuZTonk1qDJEAeW6lbkepPyczoPtXio+K6jPoU+43eW+qA5ZYup+e5Hgo0TSOCaaxF0gDgD+duBVcYEXKnPnCxGhVi6DE8rzBBmcZenUHWFXoZLWVDILMcbjTDcTbx2UvQQ6EMHhyTMkK2v8D0qw3h5H9RtvGCx6OUbOi6ko8giYuC/tADm5TSksTHJOzTyGVcnw214smNBa4kPEwA0f0ayESRXsxwylI7P39IKMyDgGeDkfZdoPr2uaMl0rB98ubUlbllm7o+QtjUegsG2Zf91wL1wxTTXyDldgtot+5IEHyZcFke8zyJooj1Pjk4vasEBDPtDqiqiLOBc8a8tF6TccNb/dfCK3xh2sLVlVU95hrc6tw/ZJk9gZ0U9dDwPYixPTO8X8530UAo4E9s3epMzkdMahcAWFPGpd8tGO3IqegZps4d2p75aoRodz8JO54HRLHX3Gz+EsNDfgHeVqFcA6+rW+P04R2p8fJD1sWpDbq1xT4nQQ6L4TAt8UObkVET22T0BNaOdwqo=
- secure: evAZ40O+fXFmKAmfqnaLjLhELHTXL/aKOlMfDlStfVM35V+JMCivvQt/V+8ObmmkHBI6y9XoeILZzDD1cfCPjUzPAmHpnL6cT9kavu+OhS8BnyGvOcp1BysHIH1DHJ/JpYAKSXkMR2PoGakUp9kvETO+5s5ZKp909vtZqG4oShyqNz8jO6BYKwJrcU/ehOIRG2uHza2KtpRREFlwXu+Ca20+2oV74X0GvaqyDgVjEwxwLRc4KUg+/NPy4YWusBg/6Q/TV1HIAktlROA7ZOb9+DUshWZkOvd+F6JVaCzQDlSKCDvZpuddwUi+YveYlKqb8s7R+zzhxc6KPsXMrJ79k8E1uiZiEjRV3HTLxgZnKBYoKuglgKS1R1nQSsiJfVFQJn+dbBO0HxT9f3hy8nI3EpfTpy9DAO+wOnHycRBp1TCIPBEOCwIfBo2UvxqVAsBfIC3WmOF4VhlZxr/U+QCwolvWNjkPRrcKh4CzQEj2VIoAfyJ0vKujytB2gnv6Sp0mbbcK1jEJdNShc/wPdIXwn7oTT/2RnCdiKPivsF66wKcDmL7U/+zvVqRsw2wZrsw97n+j9lvj4YYZUVVXxtEQuXZ4sru5FM4a08lnS+cY5BfomvMkAT7UXKeV++sF7gBUq7DGQ4Xs6aLMZAAFSbPSw5mT349e+ipQyG8jX/iBllk=
2 changes: 1 addition & 1 deletion CHECKPOINTING.md
@@ -49,7 +49,7 @@ val start = java.time.Instant.now()
val withCheckpoints = false

IoTHub().source(start, withCheckpoints)
.map(m => jsonParser.readValue(m.contentAsString, classOf[Temperature]))
.map(m ⇒ jsonParser.readValue(m.contentAsString, classOf[Temperature]))
.filter(_.value > 100)
.to(console)
.run()
14 changes: 7 additions & 7 deletions README.md
@@ -20,7 +20,7 @@ the temperature value:

```scala
IoTHub().source()
.map(m => parse(m.contentAsString).extract[Temperature])
.map(m ⇒ parse(m.contentAsString).extract[Temperature])
.filter(_.value > 100)
.to(console)
.run()
@@ -68,7 +68,7 @@ case class KafkaProducer(bootstrapServer: String)(implicit val system: ActorSyst
val kafkaProducer = KafkaProducer(bootstrapServer)

IoTHub().source()
.map(m => parse(m.contentAsString).extract[Temperature])
.map(m ⇒ parse(m.contentAsString).extract[Temperature])
.filter(_.value > 100)
.runWith(kafkaProducer.getSink())
```
@@ -88,7 +88,7 @@ val p1 = 0
val p2 = 3

IoTHub().source(PartitionList(Seq(p1, p2)))
.map(m => parse(m.contentAsString).extract[Temperature])
.map(m ⇒ parse(m.contentAsString).extract[Temperature])
.filter(_.value > 100)
.to(console)
.run()
@@ -103,7 +103,7 @@ It's possible to start the stream from a given date and time too:
val start = java.time.Instant.now()

IoTHub().source(start)
.map(m => parse(m.contentAsString).extract[Temperature])
.map(m ⇒ parse(m.contentAsString).extract[Temperature])
.filter(_.value > 100)
.to(console)
.run()
@@ -180,7 +180,7 @@ val start = java.time.Instant.now()
val withCheckpoints = false

IoTHub().source(start, withCheckpoints)
.map(m => parse(m.contentAsString).extract[Temperature])
.map(m ⇒ parse(m.contentAsString).extract[Temperature])
.filter(_.value > 100)
.to(console)
.run()
@@ -193,7 +193,7 @@ your `build.sbt` file:

```scala
libraryDependencies ++= {
val iothubReactV = "0.8.1"
val iothubReactV = "0.8.0"

Seq(
"com.microsoft.azure.iot" %% "iothub-react" % iothubReactV
@@ -207,7 +207,7 @@ or this dependency in `pom.xml` file if working with Maven:
<dependency>
<groupId>com.microsoft.azure.iot</groupId>
<artifactId>iothub-react_2.12</artifactId>
<version>0.8.1</version>
<version>0.8.0</version>
</dependency>
```

2 changes: 1 addition & 1 deletion build.sbt
@@ -3,7 +3,7 @@
name := "iothub-react"
organization := "com.microsoft.azure.iot"

version := "0.8.1"
version := "0.8.1" // Latest version released to Maven Central: 0.8.0
//version := "0.8.1-DEV.170309a"

scalaVersion := "2.12.1"
12 changes: 12 additions & 0 deletions build.sh
@@ -0,0 +1,12 @@
if [ "$1" == "PR" -o "$1" == "pr" ]; then
echo "Skipping tests requiring encrypted secrets."
export TRAVIS_PULL_REQUEST="true"
fi

travis lint -x && \
sbt +clean && \
sbt +compile && \
sbt +package && \
sbt +test

rm -f *.crt
2 changes: 1 addition & 1 deletion samples-java/build.sbt
@@ -1,3 +1,3 @@
// Copyright (c) Microsoft. All rights reserved.

scalaVersion := "2.12.0"
scalaVersion := "2.12.1"
2 changes: 1 addition & 1 deletion samples-java/pom.xml
@@ -6,7 +6,7 @@

<groupId>com.microsoft.azure.iot</groupId>
<artifactId>iothub-react-demo</artifactId>
<version>0.8.1</version>
<version>0.8.0</version>

<repositories>
<repository>
2 changes: 1 addition & 1 deletion samples-java/project/build.properties
@@ -1 +1 @@
sbt.version = 0.13.12
sbt.version=0.13.13
@@ -11,7 +11,6 @@
*/
public class ReactiveStreamingApp
{

private static ActorSystem system = ActorSystem.create("Demo");

protected final static Materializer streamMaterializer = ActorMaterializer.create(system);
2 changes: 1 addition & 1 deletion samples-scala/build.sbt
@@ -1,3 +1,3 @@
// Copyright (c) Microsoft. All rights reserved.

scalaVersion := "2.12.0"
scalaVersion := "2.12.1"
2 changes: 1 addition & 1 deletion samples-scala/project/build.properties
@@ -1 +1 @@
sbt.version = 0.13.12
sbt.version=0.13.13
@@ -43,7 +43,7 @@ object Demo extends App with Deserialize {
/*
// Run the two workflows in parallel
RunnableGraph.fromGraph(GraphDSL.create() {
implicit b =>
implicit b ⇒
import GraphDSL.Implicits._
val shape = b.add(Broadcast[Temperature](2))
Empty file modified setup-env-vars.sh 100644 → 100755
@@ -105,7 +105,7 @@ private class MessageFromDeviceSource() extends GraphStage[SourceShape[MessageFr
// Define the (sole) output port of this stage
private[this] val out: Outlet[MessageFromDevice] = Outlet("MessageFromDeviceSource")

// Define the shape of this stage => SourceShape with the port defined above
// Define the shape of this stage ⇒ SourceShape with the port defined above
override val shape: SourceShape[MessageFromDevice] = SourceShape(out)

// All state MUST be inside the GraphStageLogic, never inside the enclosing
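
The "all state MUST be inside the GraphStageLogic" comment above reflects the Akka Streams GraphStage contract: each materialization gets its own logic instance, so mutable state must never live in the enclosing stage. A minimal sketch of that pattern (hypothetical example, not from this repository):

```scala
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

// Hypothetical integer counter source: the mutable counter lives inside
// the GraphStageLogic, so every materialization starts from zero.
class CounterSource extends GraphStage[SourceShape[Int]] {

  val out: Outlet[Int]                 = Outlet("CounterSource")
  override val shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private var counter = 0 // state kept here, never in the enclosing class

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          counter += 1
          push(out, counter)
        }
      })
    }
}
```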
@@ -1,3 +1,5 @@
// Copyright (c) Microsoft. All rights reserved.

package com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib

final case class Auth(username: String, password: String)
case class Auth(username: String, password: String)
@@ -11,25 +11,67 @@ import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Try

/** Checkpointing configuration interface
*/
trait ICPConfiguration {
val isEnabled : Boolean
val storageNamespace : String
val checkpointBackendType : String
val checkpointFrequency : FiniteDuration
val checkpointRWTimeout : FiniteDuration
val checkpointCountThreshold : Int
val checkpointTimeThreshold : FiniteDuration
val azureBlobEmulator : Boolean
val azureBlobConnectionString : String
val azureBlobLeaseDuration : FiniteDuration
val cassandraCluster : String

/** Whether checkpointing is enabled
*/
val isEnabled: Boolean

/** Namespace where the table with checkpoint data is stored (e.g. Cassandra keyspace)
*/
val storageNamespace: String

/** Type of storage, the value is not case sensitive
*/
val checkpointBackendType: String

/** How often checkpoint data is written to the storage
*/
val checkpointFrequency: FiniteDuration

/** Checkpointing operations timeout
*/
val checkpointRWTimeout: FiniteDuration

/** How many messages to replay after a restart, for each IoT hub partition
*/
val checkpointCountThreshold: Int

/** Store a position if its value is older than this amount of time, rounded to seconds
*/
val checkpointTimeThreshold: FiniteDuration

/** Whether to use the Azure Storage Emulator when using Azure blob backend
*/
val azureBlobEmulator: Boolean

/** Azure blob connection string
*/
val azureBlobConnectionString: String

/** Azure blob lease duration (between 15s and 60s by Azure docs)
*/
val azureBlobLeaseDuration: FiniteDuration

/** Cassandra cluster address
* TODO: support list
*/
val cassandraCluster: String

/** Cassandra replication factor, value required to open a connection
*/
val cassandraReplicationFactor: Int
val cassandraAuth : Option[Auth]

/** Cassandra authentication credentials
*/
val cassandraAuth: Option[Auth]
}

/** Hold IoT Hub stream checkpointing configuration settings
*/
private[iothubreact] final class CPConfiguration(implicit val conf: Config = ConfigFactory.load) extends ICPConfiguration {
private[iothubreact] class CPConfiguration(implicit conf: Config = ConfigFactory.load) extends ICPConfiguration {

// TODO: Allow to use multiple configurations, e.g. while processing multiple streams
// a client will need a dedicated checkpoint container for each stream
@@ -118,7 +160,7 @@ private[iothubreact] final class CPConfiguration(implicit val conf: Config = Con
// Azure blob connection string
lazy val azureBlobConnectionString: String = getAzureBlobConnectionString

// Azure blob lease duration (15s and 60s by Azure docs)
// Azure blob lease duration (between 15s and 60s by Azure docs)
lazy val azureBlobLeaseDuration: FiniteDuration = getDuration(
confPath + "storage.azureblob.lease",
15 seconds,
@@ -178,4 +220,4 @@ private[iothubreact] final class CPConfiguration(implicit val conf: Config = Con
else
default
}
}
}
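
Since the documented ICPConfiguration trait above is public and the backends take an `implicit config: ICPConfiguration`, a custom implementation can stand in for the Typesafe-Config-based CPConfiguration. A minimal sketch — every value and the import paths are illustrative assumptions, not defaults from this commit:

```scala
import scala.concurrent.duration._
// Package paths assumed from the surrounding diff
import com.microsoft.azure.iot.iothubreact.checkpointing.ICPConfiguration
import com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib.Auth

// Hypothetical hand-written checkpointing configuration
object ExampleCheckpointConfig extends ICPConfiguration {
  val isEnabled                 : Boolean        = true
  val storageNamespace          : String         = "iothub-react-checkpoints"
  val checkpointBackendType     : String         = "AzureBlob"
  val checkpointFrequency       : FiniteDuration = 1.second
  val checkpointRWTimeout       : FiniteDuration = 5.seconds
  val checkpointCountThreshold  : Int            = 1000
  val checkpointTimeThreshold   : FiniteDuration = 60.seconds
  val azureBlobEmulator         : Boolean        = false
  val azureBlobConnectionString : String         = "DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>"
  val azureBlobLeaseDuration    : FiniteDuration = 15.seconds
  val cassandraCluster          : String         = "localhost:9042"
  val cassandraReplicationFactor: Int            = 1
  val cassandraAuth             : Option[Auth]   = None
}
```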
@@ -26,8 +26,8 @@ private[iothubreact] object CheckpointActorSystem {
val actorPath = "CheckpointService" + partition

localRegistry get actorPath match {
case Some(actorRef) => actorRef
case None => {
case Some(actorRef) ⇒ actorRef
case None ⇒ {
val actorRef = actorSystem.actorOf(Props(new CheckpointService(partition)), actorPath)
localRegistry += Tuple2(actorPath, actorRef)
actorRef
@@ -67,7 +67,7 @@ private[iothubreact] class CheckpointService(partition: Int)(implicit config: IC
queuedOffsets = 0
}
catch {
case e: Exception => {
case e: Exception ⇒ {
log.error(e, e.getMessage)
context.become(notReady)
}
@@ -121,7 +121,7 @@ private[iothubreact] class CheckpointService(partition: Int)(implicit config: IC
log.debug(s"Partition=${partition}, checkpoint queue is empty [count ${queuedOffsets}, current offset=${currentOffset}]")
}
} catch {
case e: Exception => log.error(e, e.getMessage)
case e: Exception ⇒ log.error(e, e.getMessage)
} finally {
context.become(ready)
}
@@ -17,7 +17,7 @@ import scala.language.{implicitConversions, postfixOps}

/** Storage logic to write checkpoints to Azure blobs
*/
private[iothubreact] class AzureBlob(implicit val config: ICPConfiguration) extends CheckpointBackend with Logger {
private[iothubreact] class AzureBlob(implicit config: ICPConfiguration) extends CheckpointBackend with Logger {

// Set the account to point either to Azure or the emulator
val account: CloudStorageAccount = if (config.azureBlobEmulator)
@@ -11,7 +11,7 @@ import org.json4s.JsonAST

/** Storage logic to write checkpoints to a Cassandra table
*/
private[iothubreact] class CassandraTable(implicit val config: ICPConfiguration) extends CheckpointBackend with Logger {
private[iothubreact] class CassandraTable(implicit config: ICPConfiguration) extends CheckpointBackend with Logger {

val schema = new CheckpointsTableSchema(checkpointNamespace, "checkpoints")
val connection = Connection(config.cassandraCluster, config.cassandraReplicationFactor, config.cassandraAuth, schema)
@@ -17,10 +17,10 @@ private[iothubreact] object ColumnType extends Enumeration {
* @return Type as string
*/
def toString(columnType: ColumnType): String = columnType match {
case ColumnType.String => "text"
case ColumnType.Timestamp => "timestamp"
case ColumnType.Double => "double"
case ColumnType.Int => "int"
case ColumnType.String ⇒ "text"
case ColumnType.Timestamp ⇒ "timestamp"
case ColumnType.Double ⇒ "double"
case ColumnType.Int ⇒ "int"

case _ ⇒ throw new RuntimeException(s"Missing mapping for Cassandra type ${columnType}")
}
@@ -32,10 +32,10 @@ private[iothubreact] object ColumnType extends Enumeration {
* @return Column type
*/
def fromName(typeAsString: String): ColumnType = typeAsString match {
case "text" => ColumnType.String
case "timestamp" => ColumnType.Timestamp
case "double" => ColumnType.Double
case "int" => ColumnType.Int
case "text" ColumnType.String
case "timestamp" ColumnType.Timestamp
case "double" ColumnType.Double
case "int" ColumnType.Int

case _ ⇒ throw new IllegalArgumentException(s"Unknown Cassandra column type '${typeAsString}'")
}
@@ -16,15 +16,15 @@ private[iothubreact] case class Connection(
auth: Option[Auth],
table: TableSchema) {

private lazy val hostPort = extractHostPort()
private lazy val hostPort = extractHostPort()
private lazy val cluster = {
val builder = Cluster.builder().addContactPoint(hostPort._1).withPort(hostPort._2)
auth map {
creds ⇒ builder.withCredentials(creds.username, creds.password)
} getOrElse (builder) build()
creds ⇒ builder.withCredentials(creds.username, creds.password)
} getOrElse (builder) build()
}

implicit lazy val session = cluster.connect()
implicit lazy val session = cluster.connect()

/** Create the key space if not present
*/
@@ -71,7 +71,7 @@ private[iothubreact] case class Connection(
* @param columns Columns
*/
private[this] def createT(tableName: String, columns: Seq[Column]): Unit = {
val columnsSql = columns.foldLeft("")((b, a) => s"$b\n${a.name} ${ColumnType.toString(a.`type`)},")
val columnsSql = columns.foldLeft("")((b, a) ⇒ s"$b\n${a.name} ${ColumnType.toString(a.`type`)},")
val indexesSql = columns.filter(_.index).map(_.name).mkString("PRIMARY KEY(", ", ", ")")
val createTable = s"CREATE TABLE IF NOT EXISTS ${table.keyspaceCQL}.$tableName($columnsSql $indexesSql)"
session.execute(createTable)
