title | keywords | tags | permalink | |
---|---|---|---|---|
Client SDK Java |
|
client-sdk-java.html |
A client SDK for Java in order to interact with digital twins provided by an Eclipse Ditto backend.
- Digital twin management: CRUD (create, read, update, delete) of Ditto things
- Change notifications: consume notifications whenever a "watched" digital twin is modified
- Send/receive messages to/from devices connected via a digital twin
- Use the live channel in order to react on commands directed to devices targeting their "live" state
The Ditto Java client interacts with an Eclipse Ditto backend via Ditto's WebSocket sending and receiving messages in Ditto Protocol.
Maven coordinates:
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-client</artifactId>
<version>${ditto-client.version}</version>
</dependency>
To configure your Ditto client instance, use the org.eclipse.ditto.client.configuration
package in order to
- create instances of
AuthenticationProvider
andMessagingProvider
- create a
DisconnectedDittoClient
instance - obtain a
DittoClient
instance asynchronously by calling.connect()
For example:
ProxyConfiguration proxyConfiguration =
ProxyConfiguration.newBuilder()
.proxyHost("localhost")
.proxyPort(3128)
.build();
AuthenticationProvider authenticationProvider =
AuthenticationProviders.clientCredentials(ClientCredentialsAuthenticationConfiguration.newBuilder()
.clientId("my-oauth-client-id")
.clientSecret("my-oauth-client-secret")
.scopes("offline_access email")
.tokenEndpoint("https://my-oauth-provider/oauth/token")
// optionally configure a proxy server
.proxyConfiguration(proxyConfiguration)
.build());
MessagingProvider messagingProvider =
MessagingProviders.webSocket(WebSocketMessagingConfiguration.newBuilder()
.endpoint("wss://ditto.eclipseprojects.io")
// optionally configure a proxy server or a truststore containing the trusted CAs for SSL connection establishment
.proxyConfiguration(proxyConfiguration)
.trustStoreConfiguration(TrustStoreConfiguration.newBuilder()
.location(TRUSTSTORE_LOCATION)
.password(TRUSTSTORE_PASSWORD)
.build())
.build(), authenticationProvider);
DisconnectedDittoClient disconnectedDittoClient = DittoClients.newInstance(messagingProvider);
disconnectedDittoClient.connect()
.thenAccept(this::startUsingDittoClient)
.exceptionally(error -> disconnectedDittoClient.destroy());
client.twin().create("org.eclipse.ditto:new-thing").handle((createdThing, throwable) -> {
if (createdThing != null) {
System.out.println("Created new thing: " + createdThing);
} else {
System.out.println("Thing could not be created due to: " + throwable.getMessage());
}
return client.twin().forId(thingId).putAttribute("first-updated-at", OffsetDateTime.now().toString());
}).toCompletableFuture().get(); // this will block the thread! work asynchronously whenever possible!
In order to subscribe for events emitted by Ditto after a twin was modified, start the
consumption on the twin
channel:
client.twin().startConsumption().toCompletableFuture().get(); // this will block the thread! work asynchronously whenever possible!
System.out.println("Subscribed for Twin events");
client.twin().registerForThingChanges("my-changes", change -> {
if (change.getAction() == ChangeAction.CREATED) {
System.out.println("An existing Thing was modified: " + change.getThing());
// perform custom actions ..
}
});
There is also the possibility here to apply server side filtering of which events will get delivered to the client:
client.twin().startConsumption(
Options.Consumption.filter("gt(features/temperature/properties/value,23.0)")
).toCompletableFuture().get(); // this will block the thread! work asynchronously whenever possible!
System.out.println("Subscribed for Twin events");
client.twin().registerForFeaturePropertyChanges("my-feature-changes", "temperature", "value", change -> {
// perform custom actions ..
});
In order to use enrichment in the Ditto Java client, the startConsumption()
call can be
enhanced with the additional extra fields:
client.twin().startConsumption(
Options.Consumption.extraFields(JsonFieldSelector.newInstance("attributes/location"))
).toCompletableFuture().get(); // this will block the thread! work asynchronously whenever possible!
client.twin().registerForThingChanges("my-enriched-changes", change -> {
Optional<JsonObject> extra = change.getExtra();
// perform custom actions, making use of the 'extra' data ..
});
In combination with a filter
, the extra fields may also be used as part of such a filter:
client.twin().startConsumption(
Options.Consumption.extraFields(JsonFieldSelector.newInstance("attributes/location")),
Options.Consumption.filter("eq(attributes/location,\"kitchen\")")
).toCompletableFuture().get(); // this will block the thread! work asynchronously whenever possible!
// register the callbacks...
Register for receiving messages with the subject hello.world
on any thing:
client.live().startConsumption().toCompletableFuture().get(); // this will block the thread! work asynchronously whenever possible!
System.out.println("Subscribed for live messages/commands/events");
client.live().registerForMessage("globalMessageHandler", "hello.world", message -> {
System.out.println("Received Message with subject " + message.getSubject());
message.reply()
.statusCode(HttpStatusCode.IM_A_TEAPOT)
.payload("Hello, I'm just a Teapot!")
.send();
});
Send a message with the subject hello.world
to the thing with ID org.eclipse.ditto:new-thing
:
client.live().forId("org.eclipse.ditto:new-thing")
.message()
.from()
.subject("hello.world")
.payload("I am a Teapot")
.send(String.class, (response, throwable) ->
System.out.println("Got response: " + response.getPayload().orElse(null))
);
Read a policy:
Policy retrievedPolicy = client.policies().retrieve(PolicyId.of("org.eclipse.ditto:new-policy"))
.toCompletableFuture().get(); // this will block the thread! work asynchronously whenever possible!
Create a policy:
Policy newPolicy = Policy.newBuilder(PolicyId.of("org.eclipse.ditto:new-policy"))
.forLabel("DEFAULT")
.setSubject(Subject.newInstance(SubjectIssuer.newInstance("nginx"), "ditto"))
.setGrantedPermissions(PoliciesResourceType.policyResource("/"), "READ", "WRITE")
.setGrantedPermissions(PoliciesResourceType.thingResource("/"), "READ", "WRITE")
.build();
client.policies().create(newPolicy)
.toCompletableFuture().get(); // this will block the thread! work asynchronously whenever possible!
Updating and deleting policies is also possible via the Java client API, please follow the API and the JavaDoc.
Search for things using the Java 8 java.util.Stream
API:
client.twin().search()
.stream(queryBuilder -> queryBuilder.namespace("org.eclipse.ditto")
.filter("eq(attributes/location,'kitchen')") // apply RQL expression here
.options(builder -> builder.sort(s -> s.desc("thingId")).size(1))
)
.forEach(foundThing -> System.out.println("Found thing: " + foundThing));
Use an RQL query in order to filter for the searched things.
Search for things using the reactive streams org.reactivestreams.Publisher
API:
Publisher<List<Thing>> publisher = client.twin().search()
.publisher(queryBuilder -> queryBuilder.namespace("org.eclipse.ditto")
.filter("eq(attributes/location,'kitchen')") // apply RQL expression here
.options(builder -> builder.sort(s -> s.desc("thingId")).size(1))
);
// integrate the publisher in the reactive streams library of your choice, e.g. Pekko streams:
org.apache.pekko.stream.javadsl.Source<Thing, NotUsed> things = org.apache.pekko.stream.javadsl.Source.fromPublisher(publisher)
.flatMapConcat(Source::from);
// .. proceed working with the Pekko Source ..
Requesting acknowledgements is possible in the Ditto Java client in the following way:
DittoHeaders dittoHeaders = DittoHeaders.newBuilder()
.acknowledgementRequest(
AcknowledgementRequest.of(DittoAcknowledgementLabel.PERSISTED),
AcknowledgementRequest.of(AcknowledgementLabel.of("my-custom-ack"))
)
.timeout("5s")
.build();
client.twin().forId(ThingId.of("org.eclipse.ditto:my-thing"))
.putAttribute("counter", 42, Options.dittoHeaders(dittoHeaders))
.whenComplete((aVoid, throwable) -> {
if (throwable instanceof AcknowledgementsFailedException) {
Acknowledgements acknowledgements = ((AcknowledgementsFailedException) throwable).getAcknowledgements();
System.out.println("Acknowledgements could not be fulfilled: " + acknowledgements);
}
});
Issuing requested acknowledgements can be done like this
whenever a Change
callback is invoked with a change notification:
client.twin().registerForThingChanges("REG1", change -> {
change.handleAcknowledgementRequest(AcknowledgementLabel.of("my-custom-ack"), ackHandle ->
ackHandle.acknowledge(HttpStatusCode.NOT_FOUND, JsonObject.newBuilder()
.set("error-detail", "Could not be found")
.build()
)
);
});
For further examples on how to use the Ditto client, please have a look at the class DittoClientUsageExamples which is configured to connect to the Ditto sandbox.