Asynchronous library for accessing InfluxDB from Scala.
Add the following to your build.sbt
libraryDependencies += "com.paulgoldbaum" %% "scala-influxdb-client" % "0.5.2"NOTE: Starting with version 0.5.0 JDK 8 is required.
import com.paulgoldbaum.influxdbclient._
import scala.concurrent.ExecutionContext.Implicits.global
val influxdb = InfluxDB.connect("localhost", 8086)And when all done close the client:
influxdb.close()All methods are non-blocking and return a Future; in most cases a Future[QueryResponse] which might be empty if
the action does not return a result. Failing Futures carry a subclass of InfluxDBException
val database = influxdb.selectDatabase("my_database")
database.exists() // => Future[Boolean]
database.create()
database.drop()val point = Point("cpu")
.addTag("host", "my.server")
.addField("1m", 0.3)
.addField("5m", 0.4)
.addField("15m", 0.5)
database.write(point)Additionally, timestamp precision, consistency and custom retention policies can be specified
val point = Point("cpu", System.currentTimeMillis())
database.write(point,
precision = Precision.MILLISECONDS,
consistency = Consistency.ALL,
retentionPolicy = "custom_rp")If no precision parameter is given, InfluxDB assumes timestamps to be in nanoseconds.
If a write fails, it's future will contain a subclass of WriteException. This can be handled through the usual
methods of error handling in Futures, i.e.
database.write(point)
// ...
.recover{ case e: WriteException => ...}Multiple points can be written in one operation by using the bulkWrite operation
val points = List(
Point("test_measurement1").addField("value1", 123),
Point("test_measurement2").addField("value2", 123),
Point("test_measurement3").addField("value3", 123)
)
database.bulkWrite(points, precision = Precision.MILLISECONDS)Given the following data:
name: cpu
---------
time host region value
2015-10-14T18:31:14.744203449Z serverA us_west 0.64
2015-10-14T18:31:19.242472211Z serverA us_west 0.85
2015-10-14T18:31:22.644254309Z serverA us_west 0.43
database.query("SELECT * FROM cpu")This returns a Future[QueryResult]. To access the list of records use
result.series.head.recordswhich we can iterate to access the different fields
result.series.head.records.foreach(record => record("host"))For each record, we can access all it's values at once using the allValues property
result.series.head.records(0).allValuesIf we are only interested in the "value" field of each record
result.series.head.points("value")returns a list of just the value field of each record.
The list of column names can be accessed through
result.series.head.columnsMultiple queries can be sent to the server at the same time using the multiQuery method
database.multiQuery(List("SELECT * FROM cpu LIMIT 5", "SELECT * FROM cpu LIMIT 5 OFFSET 5"))In this case, the result is a Future[List[QueryResult]].
Errors during queries return a QueryException.
influxdb.createUser(username, password, isClusterAdmin)
influxdb.dropUser(username)
influxdb.showUsers()
influxdb.setUserPassword(username, password)
influxdb.grantPrivileges(username, database, privilege)
influxdb.revokePrivileges(username, database, privilege)
influxdb.makeClusterAdmin(username)
influxdb.userIsClusterAdmin(username)database.createRetentionPolicy(name, duration, replication, default)
database.showRetentionPolicies()
database.dropRetentionPolicy(name)
database.alterRetentionPolicy(name, duration, replication, default)NOTE: User and retention policy management primitives return an empty QueryResult or fail with a QueryException in case of an error.
import com.paulgoldbaum.influxdbclient._
val udpClient = InfluxDB.udpConnect("localhost", 8086)
val point = Point("cpu", System.currentTimeMillis())
udpClient.write(point)Points can also be written in bulk
val points = List(
Point("test_measurement1").addField("value1", 123),
Point("test_measurement2").addField("value2", 123),
Point("test_measurement3").addField("value3", 123)
)
udpClient.bulkWrite(points)