Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API Refactoring: add execute operation to requests, extract SPI package #27

Merged
merged 18 commits into from Oct 18, 2018
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import com.microsoft.azure.sdk.iot.device.IotHubClientProtocol;
import com.microsoft.azure.sdk.iot.device.Message;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.connection.PlcConnection;
import org.apache.plc4x.java.api.connection.PlcReader;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.slf4j.Logger;
Expand Down Expand Up @@ -62,15 +61,13 @@ public static void main(String[] args) throws Exception {
DeviceClient client = new DeviceClient(iotConnectionString, IotHubClientProtocol.MQTT);
client.open();

// Get a reader instance.
PlcReader plcReader = plcConnection.getReader().orElseThrow(IllegalStateException::new);

// Prepare a read request.
PlcReadRequest request = plcReader.readRequestBuilder().addItem(FIELD_NAME, addressString).build();
PlcReadRequest request = plcConnection.readRequestBuilder().get().addItem(FIELD_NAME, addressString).build();

while (!Thread.currentThread().isInterrupted()) {
// Simulate telemetry.
PlcReadResponse<?> response = plcReader.read(request).get();
PlcReadResponse response = request.execute().get();
response.getAllLongs(FIELD_NAME)
.forEach(longValue -> {
String result = Long.toBinaryString(longValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ Licensed to the Apache Software Foundation (ASF) under one
*/
package org.apache.plc4x.java.examples.dummydriver;

import org.apache.plc4x.java.api.PlcDriver;
import org.apache.plc4x.java.spi.PlcDriver;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.api.connection.PlcConnection;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.examples.dummydriver.connection.DummyConnection;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,17 @@ Licensed to the Apache Software Foundation (ASF) under one
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import org.apache.plc4x.java.api.connection.PlcReader;
import org.apache.plc4x.java.api.connection.PlcWriter;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.apache.plc4x.java.api.messages.PlcWriteResponse;
import org.apache.plc4x.java.base.messages.PlcReader;
import org.apache.plc4x.java.base.messages.PlcWriter;
import org.apache.plc4x.java.api.messages.*;
import org.apache.plc4x.java.base.connection.AbstractPlcConnection;
import org.apache.plc4x.java.base.connection.TcpSocketChannelFactory;
import org.apache.plc4x.java.base.messages.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class DummyConnection extends AbstractPlcConnection implements PlcReader, PlcWriter {
Expand All @@ -58,13 +56,29 @@ protected void initChannel(Channel channel) {
}

@Override
public PlcReadRequest.Builder readRequestBuilder() {
public Optional<PlcReadRequest.Builder> readRequestBuilder() {
// TODO: Implement this ...
return null;
return Optional.empty();
}

@Override
public CompletableFuture<PlcReadResponse<?>> read(PlcReadRequest readRequest) {
public Optional<PlcWriteRequest.Builder> writeRequestBuilder() {
// TODO: Implement this ...
return Optional.empty();
}

@Override
public Optional<PlcSubscriptionRequest.Builder> subscriptionRequestBuilder() {
return Optional.empty();
}

@Override
public Optional<PlcUnsubscriptionRequest.Builder> unsubscriptionRequestBuilder() {
return Optional.empty();
}

@Override
public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
CompletableFuture<InternalPlcReadResponse> readFuture = new CompletableFuture<>();
PlcRequestContainer<InternalPlcReadRequest, InternalPlcReadResponse> container =
new PlcRequestContainer<>((InternalPlcReadRequest) readRequest, readFuture);
Expand All @@ -74,13 +88,7 @@ public CompletableFuture<PlcReadResponse<?>> read(PlcReadRequest readRequest) {
}

@Override
public PlcWriteRequest.Builder writeRequestBuilder() {
// TODO: Implement this ...
return null;
}

@Override
public CompletableFuture<PlcWriteResponse<?>> write(PlcWriteRequest writeRequest) {
public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
CompletableFuture<InternalPlcWriteResponse> writeFuture = new CompletableFuture<>();
PlcRequestContainer<InternalPlcWriteRequest, InternalPlcWriteResponse> container =
new PlcRequestContainer<>((InternalPlcWriteRequest) writeRequest, writeFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.connection.PlcConnection;
import org.apache.plc4x.java.api.connection.PlcReader;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.eclipse.paho.client.mqttv3.*;
Expand Down Expand Up @@ -233,13 +232,11 @@ public static void main(String[] args) throws Exception {
try (PlcConnection plcConnection = new PlcDriverManager().getConnection("s7://10.10.64.20/1/1")) {
logger.info("Connected");

PlcReader plcReader = plcConnection.getReader().orElseThrow(IllegalAccessError::new);

PlcReadRequest readRequest = plcReader.readRequestBuilder().addItem("outputs", "OUTPUTS/0").build();
PlcReadRequest readRequest = plcConnection.readRequestBuilder().get().addItem("outputs", "OUTPUTS/0").build();

while (!Thread.currentThread().isInterrupted()) {

PlcReadResponse<?> plcReadResponse = plcReader.read(readRequest).get();
PlcReadResponse plcReadResponse = readRequest.execute().get();

// Refresh the connection credentials before the JWT expires.
// [START iot_mqtt_jwt_refresh]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@ Licensed to the Apache Software Foundation (ASF) under one
package org.apache.plc4x.java.examples.helloplc4x;

import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.connection.PlcConnection;
import org.apache.plc4x.java.api.connection.PlcReader;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class HelloPlc4x {
Expand All @@ -49,34 +47,36 @@ public static void main(String[] args) {
// Establish a connection to the plc using the url provided as first argument
try (PlcConnection plcConnection = new PlcDriverManager().getConnection(args[0])) {

Optional<PlcReader> reader = plcConnection.getReader();

// Check if this connection support reading of data.
if (reader.isPresent()) {
PlcReader plcReader = reader.get();
if (plcConnection.readRequestBuilder().isPresent()) {

// Create a new read request:
// - Give the single item requested the alias name "value"
PlcReadRequest.Builder builder = plcReader.readRequestBuilder();
PlcReadRequest.Builder syncBuilder = plcConnection.readRequestBuilder().get();
for (int i = 1; i < args.length; i++) {
builder.addItem("value-" + i, args[i]);
syncBuilder.addItem("value-" + i, args[i]);
}
PlcReadRequest plcReadRequest = builder.build();
PlcReadRequest syncPlcReadRequest = syncBuilder.build();

//////////////////////////////////////////////////////////
// Read synchronously ...
// NOTICE: the ".get()" immediately lets this thread pause till
// the response is processed and available.
System.out.println("\nSynchronous request ...");
PlcReadResponse<?> syncResponse = plcReader.read(plcReadRequest).get();
PlcReadResponse syncResponse = syncPlcReadRequest.execute().get();
// Simply iterating over the field names returned in the response.
printResponse(syncResponse);

//////////////////////////////////////////////////////////
// Read asynchronously ...
// Register a callback executed as soon as a response arives.
PlcReadRequest.Builder asyncBuilder = plcConnection.readRequestBuilder().get();
for (int i = 1; i < args.length; i++) {
asyncBuilder.addItem("value-" + i, args[i]);
}
PlcReadRequest asyncPlcReadRequest = asyncBuilder.build();
System.out.println("\n\nAsynchronous request ...");
CompletableFuture<PlcReadResponse<?>> asyncResponse = plcReader.read(plcReadRequest);
CompletableFuture<? extends PlcReadResponse> asyncResponse = asyncPlcReadRequest.execute();
asyncResponse.whenComplete((readResponse, throwable) -> {
if (readResponse != null) {
printResponse(syncResponse);
Expand All @@ -94,7 +94,7 @@ public static void main(String[] args) {
}
}

private static void printResponse(PlcReadResponse<?> syncResponse) {
private static void printResponse(PlcReadResponse syncResponse) {
for (String fieldName : syncResponse.getFieldNames()) {
if(syncResponse.getResponseCode(fieldName) == PlcResponseCode.OK) {
int numValues = syncResponse.getNumberOfValues(fieldName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ private void run() throws PlcException {
PlcReadRequest readRequest = builder.build();

// Create a supplier that is able to read the batch we just created.
Supplier<PlcReadResponse<?>> plcSupplier = PlcFunctions.batchSupplier(plcAdapter, readRequest);
Supplier<PlcReadResponse> plcSupplier = PlcFunctions.batchSupplier(plcAdapter, readRequest);

// Start polling our plc source in the given interval.
TStream<PlcReadResponse<?>> source = top.poll(plcSupplier, config.getPollingInterval(), TimeUnit.MILLISECONDS);
TStream<PlcReadResponse> source = top.poll(plcSupplier, config.getPollingInterval(), TimeUnit.MILLISECONDS);

// Convert the byte into a string.
TStream<String> jsonSource = source.map(value -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ Licensed to the Apache Software Foundation (ASF) under one
*/
package org.apache.plc4x.java.examples.plclogger;

import java.util.concurrent.TimeUnit;

import org.apache.edgent.function.Supplier;
import org.apache.edgent.providers.direct.DirectProvider;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.Topology;
import org.apache.plc4x.edgent.PlcConnectionAdapter;
import org.apache.plc4x.edgent.PlcFunctions;

import java.util.concurrent.TimeUnit;

public class PlcLogger {

public static void main(String[] args) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,9 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.camel.support.LoggingExceptionHandler;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorConverterHelper;
import org.apache.plc4x.java.api.connection.PlcConnection;
import org.apache.plc4x.java.api.connection.PlcSubscriber;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcException;
import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
import org.apache.plc4x.java.api.messages.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -52,7 +48,6 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util
private Class<?> dataType;
private PlcSubscriptionResponse subscriptionResponse;


public Plc4XConsumer(Plc4XEndpoint endpoint, Processor processor) throws PlcException {
this.endpoint = endpoint;
this.dataType = endpoint.getDataType();
Expand Down Expand Up @@ -83,19 +78,19 @@ public void setExceptionHandler(ExceptionHandler exceptionHandler) {

@Override
protected void doStart() throws InterruptedException, ExecutionException, PlcException {
PlcSubscriber plcSubscriber = plcConnection.getSubscriber().orElseThrow(
() -> new PlcException("Connection doesn't support subscriptions."));
// TODO: Is it correct to only support one field?
PlcSubscriptionRequest request = plcSubscriber.subscriptionRequestBuilder()
PlcSubscriptionRequest request = plcConnection.subscriptionRequestBuilder().get()
.addCyclicField("default", fieldQuery, Duration.of(3, ChronoUnit.SECONDS)).build();
plcSubscriber.register(request, this);
subscriptionResponse = request.execute().get();
// TODO: we need to return the plcSubscriptionResponse here too as we need this to unsubscribe...
// TODO: figure out what to do with this
// plcSubscriber.register(this, plcSubscriptionResponse.getSubscriptionHandles());
}

@Override
protected void doStop() throws InterruptedException, ExecutionException, TimeoutException, PlcException {
PlcSubscriber plcSubscriber = plcConnection.getSubscriber().orElseThrow(
() -> new PlcException("Connection doesn't support subscriptions."));
CompletableFuture<PlcUnsubscriptionResponse> unsubscriptionFuture = plcSubscriber.unsubscribe(builder -> builder.addHandles(subscriptionResponse.getSubscriptionHandles()));
PlcUnsubscriptionRequest request = plcConnection.unsubscriptionRequestBuilder().get().addHandles(subscriptionResponse.getSubscriptionHandles()).build();
CompletableFuture<? extends PlcUnsubscriptionResponse> unsubscriptionFuture = request.execute();
PlcUnsubscriptionResponse unsubscriptionResponse = unsubscriptionFuture.get(5, TimeUnit.SECONDS);
// TODO: Handle the response ...
try {
Expand All @@ -105,10 +100,6 @@ protected void doStop() throws InterruptedException, ExecutionException, Timeout
}
}

private PlcSubscriber getSubscriber() {
return plcConnection.getSubscriber().orElseThrow(() -> new RuntimeException("No subscriber available"));
}

@Override
public void accept(PlcSubscriptionEvent plcSubscriptionEvent) {
LOGGER.debug("Received {}", plcSubscriptionEvent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.support.LoggingExceptionHandler;
import org.apache.camel.support.ServiceSupport;
import org.apache.plc4x.java.api.connection.PlcConnection;
import org.apache.plc4x.java.api.connection.PlcReader;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
Expand All @@ -44,8 +43,7 @@ public class Plc4XPollingConsumer extends ServiceSupport implements PollingConsu
private Plc4XEndpoint endpoint;
private ExceptionHandler exceptionHandler;
private PlcConnection plcConnection;
private PlcReader plcReader;
private PlcReadRequest readRequest;
private PlcReadRequest.Builder requestBuilder;
private Class dataType;

public Plc4XPollingConsumer(Plc4XEndpoint endpoint) throws PlcException {
Expand All @@ -54,8 +52,7 @@ public Plc4XPollingConsumer(Plc4XEndpoint endpoint) throws PlcException {
this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
String plc4xURI = endpoint.getEndpointUri().replaceFirst("plc4x:/?/?", "");
this.plcConnection = endpoint.getPlcDriverManager().getConnection(plc4xURI);
this.plcReader = plcConnection.getReader().orElseThrow(() -> new PlcException("This connection doesn't support reading."));
readRequest = plcReader.readRequestBuilder().addItem("default", endpoint.getAddress()).build();
this.requestBuilder = plcConnection.readRequestBuilder().orElseThrow(() -> new PlcException("This connection doesn't support reading."));
}

@Override
Expand All @@ -79,7 +76,7 @@ public void setExceptionHandler(ExceptionHandler exceptionHandler) {
@Override
public Exchange receive() {
Exchange exchange = endpoint.createExchange();
CompletableFuture<? extends PlcReadResponse> read = plcReader.read(readRequest);
CompletableFuture<? extends PlcReadResponse> read = createReadRequest().execute();
try {
PlcReadResponse plcReadResponse = read.get();
exchange.getIn().setBody(unwrapIfSingle(plcReadResponse.getAllObjects("default")));
Expand All @@ -97,7 +94,7 @@ public Exchange receiveNoWait() {
@Override
public Exchange receive(long timeout) {
Exchange exchange = endpoint.createExchange();
CompletableFuture<? extends PlcReadResponse> read = plcReader.read(readRequest);
CompletableFuture<? extends PlcReadResponse> read = createReadRequest().execute();
try {
PlcReadResponse plcReadResponse = read.get(timeout, TimeUnit.MILLISECONDS);
exchange.getIn().setBody(unwrapIfSingle(plcReadResponse.getAllObjects("default")));
Expand All @@ -121,6 +118,10 @@ protected void doStop() {
}
}

private PlcReadRequest createReadRequest() {
return requestBuilder.addItem("default", endpoint.getAddress()).build();
}

private Object unwrapIfSingle(Collection collection) {
if (collection.isEmpty()) {
return null;
Expand Down