Skip to content

Commit

Permalink
fix(plc4j/connection-cache): Implemented Writing for the connection c…
Browse files Browse the repository at this point in the history
…ache
  • Loading branch information
chrisdutz committed Feb 4, 2022
1 parent 3dfdc12 commit 3f92555
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,7 @@
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.exceptions.PlcUnsupportedOperationException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.apache.plc4x.java.api.messages.*;
import org.apache.plc4x.java.api.metadata.PlcConnectionMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -72,7 +68,7 @@ public boolean isConnected() {
}
}

private CompletableFuture<? extends PlcReadResponse> wrapWithTimeout(CompletableFuture<? extends PlcReadResponse> future, long timeoutMillis) {
private CompletableFuture<? extends PlcReadResponse> wrapReadWithTimeout(CompletableFuture<? extends PlcReadResponse> future, long timeoutMillis) {
//schedule watcher
final CompletableFuture<PlcReadResponse> responseFuture = new CompletableFuture<>();
schedulerExecutor.schedule(() -> {
Expand Down Expand Up @@ -100,6 +96,34 @@ public Object apply(PlcReadResponse plcReadResponse, Throwable throwable) {
return responseFuture;
}

private CompletableFuture<? extends PlcWriteResponse> wrapWriteWithTimeout(CompletableFuture<? extends PlcWriteResponse> future, long timeoutMillis) {
//schedule watcher
final CompletableFuture<PlcWriteResponse> responseFuture = new CompletableFuture<>();
schedulerExecutor.schedule(() -> {
if (!future.isDone()) {
logger.debug("Timing out the PLC request!");
future.cancel(true);
responseFuture.completeExceptionally(new TimeoutException("Response did not finish in Time!"));
} else {
logger.trace("Unnecessary to cancel the request!");
}
}, timeoutMillis, TimeUnit.MILLISECONDS);
future.handle(new BiFunction<PlcWriteResponse, Throwable, Object>() {
@Override
public Object apply(PlcWriteResponse plcWriteResponse, Throwable throwable) {
if (plcWriteResponse != null) {
logger.debug("Request finsihed successfull!");
responseFuture.complete(plcWriteResponse);
} else {
logger.debug("Request failed", throwable);
responseFuture.completeExceptionally(throwable);
}
return null;
}
});
return responseFuture;
}

/**
* Executes the Request.
*/
Expand All @@ -110,7 +134,7 @@ public CompletableFuture<? extends PlcReadResponse> execute(PlcReadRequest reque
}
try {
logger.trace("Executing Request {}", request);
final CompletableFuture<? extends PlcReadResponse> responseFuture = wrapWithTimeout(request.execute(), 5_000);
final CompletableFuture<? extends PlcReadResponse> responseFuture = wrapReadWithTimeout(request.execute(), 5_000);
// final CompletableFuture<? extends PlcReadResponse> responseFuture = request.execute();
// The following code handles the case, that a read fails (which is handled async and thus not really connected
// to the connection, yet
Expand All @@ -128,11 +152,40 @@ public PlcReadResponse apply(PlcReadResponse plcReadResponse, Throwable throwabl
});
return handledResponseFuture;
} catch (Exception e) {
return closeConnectionExceptionally(e);
return (CompletableFuture<? extends PlcReadResponse>) closeConnectionExceptionally(e);
}
}

public CompletableFuture<? extends PlcWriteResponse> execute(PlcWriteRequest request) {
logger.trace("Trying to executing Request {}", request);
if (closed) {
throw new IllegalStateException("Trying to execute a Request on a closed Connection!");
}
try {
logger.trace("Executing Request {}", request);
final CompletableFuture<? extends PlcWriteResponse> responseFuture = wrapWriteWithTimeout(request.execute(), 5_000);
// final CompletableFuture<? extends PlcReadResponse> responseFuture = request.execute();
// The following code handles the case, that a read fails (which is handled async and thus not really connected
// to the connection, yet
// Thus, we register our own listener who gets the notification and reports the connection as broken
final CompletableFuture<PlcWriteResponse> handledResponseFuture = responseFuture.handleAsync(new BiFunction<PlcWriteResponse, Throwable, PlcWriteResponse>() {
@Override
public PlcWriteResponse apply(PlcWriteResponse plcWriteResponse, Throwable throwable) {
if (throwable != null) {
// Do something here...
logger.warn("Request finished with exception. Reporting Connection as Broken", throwable);
closeConnectionExceptionally(null);
}
return plcWriteResponse;
}
});
return handledResponseFuture;
} catch (Exception e) {
return (CompletableFuture<? extends PlcWriteResponse>) closeConnectionExceptionally(e);
}
}

private CompletableFuture<? extends PlcReadResponse> closeConnectionExceptionally(Exception e) {
private CompletableFuture<? extends PlcResponse> closeConnectionExceptionally(Exception e) {
// First, close this connection and allow no further operations on it!
this.closed = true;
// Return the Connection as invalid
Expand Down Expand Up @@ -184,7 +237,10 @@ public PlcReadRequest.Builder readRequestBuilder() {

@Override
public PlcWriteRequest.Builder writeRequestBuilder() {
return null;
if (closed) {
throw new IllegalStateException("Trying to build a Request on a closed Connection!");
}
return new CachedWriteRequestBuilder(this, this.getActiveConnection().writeRequestBuilder());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.plc4x.java.utils.connectionpool2;

import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.apache.plc4x.java.api.messages.PlcWriteResponse;
import org.apache.plc4x.java.api.model.PlcField;
import org.apache.plc4x.java.api.value.PlcValue;

import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CachedWriteRequest implements PlcWriteRequest {

private final CachedPlcConnection parent;
private final PlcWriteRequest innerRequest;

public CachedWriteRequest(CachedPlcConnection parent, PlcWriteRequest innerRequest) {
this.parent = parent;
this.innerRequest = innerRequest;
}

@Override
public CompletableFuture<? extends PlcWriteResponse> execute() {
// Only allowed if connection is still active
return parent.execute(innerRequest);
}

@Override
public int getNumberOfFields() {
return innerRequest.getNumberOfFields();
}

@Override
public LinkedHashSet<String> getFieldNames() {
return innerRequest.getFieldNames();
}

@Override
public PlcField getField(String s) {
return innerRequest.getField(s);
}

@Override
public List<PlcField> getFields() {
return innerRequest.getFields();
}

@Override
public int getNumberOfValues(String name) {
return innerRequest.getNumberOfValues(name);
}

@Override
public PlcValue getPlcValue(String name) {
return innerRequest.getPlcValue(name);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.plc4x.java.utils.connectionpool2;

import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.apache.plc4x.java.api.model.PlcField;

public class CachedWriteRequestBuilder implements PlcWriteRequest.Builder {

private final CachedPlcConnection parent;
private final PlcWriteRequest.Builder builder;

public CachedWriteRequestBuilder(CachedPlcConnection parent, PlcWriteRequest.Builder builder) {
this.parent = parent;
this.builder = builder;
}

@Override
public PlcWriteRequest.Builder addItem(String name, String fieldQuery, Object... values) {
builder.addItem(name, fieldQuery, values);
return this;
}

@Override
public PlcWriteRequest build() {
return new CachedWriteRequest(parent, builder.build());
}

}

0 comments on commit 3f92555

Please sign in to comment.