Skip to content

Commit

Permalink
Updates HttpBridge
Browse files Browse the repository at this point in the history
  • Loading branch information
Sn0w3y committed Jun 16, 2024
1 parent c269d58 commit f4f8885
Show file tree
Hide file tree
Showing 11 changed files with 792 additions and 0 deletions.
7 changes: 7 additions & 0 deletions io.openems.edge.bridge.http/.checkstyle
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>

<fileset-config file-format-version="1.2.0" simple-config="true" sync-formatter="false">
<fileset name="all" enabled="true" check-config-name="OpenEMS" local="false">
<file-match-pattern match-pattern="." include-pattern="true"/>
</fileset>
</fileset-config>
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.openems.edge.bridge.http;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.ServiceScope;

import io.openems.common.utils.ThreadPoolUtils;
import io.openems.edge.bridge.http.api.BridgeHttpExecutor;
import io.openems.edge.bridge.http.time.DelayTimeProvider.Delay;

@Component(scope = ServiceScope.PROTOTYPE)
public class AsyncBridgeHttpExecutor implements BridgeHttpExecutor {

// TODO change to java 21 virtual threads
private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(0);

@Override
public ScheduledFuture<?> schedule(Runnable task, Delay.DurationDelay durationDelay) {
return this.pool.schedule(task, durationDelay.getDuration().toMillis(), TimeUnit.MILLISECONDS);
}

@Override
public void execute(Runnable task) {
this.pool.execute(task);
}

@Override
public boolean isShutdown() {
return this.pool.isShutdown();
}

@Deactivate
private void deactivate() {
ThreadPoolUtils.shutdownAndAwaitTermination(this.pool, 0);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package io.openems.edge.bridge.http;

import static java.util.stream.Collectors.joining;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;

import org.osgi.service.component.annotations.Component;

import io.openems.common.types.HttpStatus;
import io.openems.edge.bridge.http.api.BridgeHttp.Endpoint;
import io.openems.edge.bridge.http.api.EndpointFetcher;
import io.openems.edge.bridge.http.api.HttpError;
import io.openems.edge.bridge.http.api.HttpResponse;

@Component
public class NetworkEndpointFetcher implements EndpointFetcher {

@Override
public HttpResponse<String> fetchEndpoint(final Endpoint endpoint) throws HttpError {
try {
var url = URI.create(endpoint.url()).toURL();
var con = (HttpURLConnection) url.openConnection();
con.setRequestMethod(endpoint.method().name());
con.setConnectTimeout(endpoint.connectTimeout());
con.setReadTimeout(endpoint.readTimeout());

endpoint.properties().forEach(con::setRequestProperty);

if (endpoint.method().isBodyAllowed() && endpoint.body() != null) {
con.setDoOutput(true);
try (var os = con.getOutputStream(); //
var osw = new OutputStreamWriter(os, "UTF-8")) {
osw.write(endpoint.body());
osw.flush();
}
}

final var status = HttpStatus.fromCodeOrCustom(con.getResponseCode(), con.getResponseMessage());

String body;
try (var in = new BufferedReader(new InputStreamReader(con.getInputStream()))) {
// Read HTTP response
body = in.lines().collect(joining(System.lineSeparator()));
} catch (IOException e) {
throw new HttpError.ResponseError(status, null);
}

if (status.isError()) {
throw new HttpError.ResponseError(status, body);
}
return new HttpResponse<>(status, body);
} catch (IOException e) {
throw new HttpError.UnknownError(e);
}
}

@Override
public byte[] fetchEndpointRaw(Endpoint endpoint) throws HttpError {
try {
URL url = new URL(endpoint.url());
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod(endpoint.method().name());
// Set request properties
endpoint.properties().forEach(connection::setRequestProperty);
// Handle request body for POST and PUT methods
if ("POST".equals(endpoint.method().name()) || "PUT".equals(endpoint.method().name())) {
connection.setDoOutput(true);
try (OutputStream os = connection.getOutputStream()) {
byte[] input = endpoint.body().getBytes("utf-8");
os.write(input, 0, input.length);
}
}
// Read response
try (InputStream in = connection.getInputStream()) {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
int nRead;
byte[] data = new byte[1024];
while ((nRead = in.read(data, 0, data.length)) != -1) {
buffer.write(data, 0, nRead);
}
buffer.flush();
return buffer.toByteArray();
} finally {
connection.disconnect();
}
} catch (IOException e) {
throw new HttpError.UnknownError(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.openems.edge.bridge.http.api;

import java.util.concurrent.ScheduledFuture;

import io.openems.edge.bridge.http.time.DelayTimeProvider.Delay;

/**
* Executor to handle tasks created by a {@link BridgeHttp}.
*/
public interface BridgeHttpExecutor {

/**
* Schedules a task to be executed from now plus the given delay.
*
* @param task the task to execute
* @param durationDelay the delay to schedule toe task
* @return a {@link ScheduledFuture}
*/
public ScheduledFuture<?> schedule(Runnable task, Delay.DurationDelay durationDelay);

/**
* Executes the given task.
*
* @param task the task to execute
*/
public void execute(Runnable task);

/**
* Determines if this executor is shutdown.
*
* @return true if this executor is shutdown else false
*/
public boolean isShutdown();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package io.openems.edge.bridge.http.api;

import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ServiceScope;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
import org.osgi.service.event.propertytypes.EventTopics;

import io.openems.edge.common.event.EdgeEventConstants;

@Component(//
scope = ServiceScope.SINGLETON, //
service = { CycleSubscriber.class, EventHandler.class } //
)
@EventTopics({ //
EdgeEventConstants.TOPIC_CYCLE_BEFORE_PROCESS_IMAGE //
})
public class CycleSubscriber implements EventHandler {

private final Set<Consumer<Event>> eventHandler = new HashSet<>();

@Override
public void handleEvent(Event event) {
switch (event.getTopic()) {
case EdgeEventConstants.TOPIC_CYCLE_BEFORE_PROCESS_IMAGE -> {
synchronized (this.eventHandler) {
this.eventHandler.forEach(t -> t.accept(event));
}
}
}
}

/**
* Subscribes to the events of the topics this component is subscribed to.
*
* @param eventHandler the handler to execute on every event
*/
public void subscribe(Consumer<Event> eventHandler) {
synchronized (this.eventHandler) {
this.eventHandler.add(eventHandler);
}
}

/**
* Unsubscribes a event handler.
*
* @param eventHandler the handler to remove
* @return true if the handler was successfully removed; if the handler was not
* found returs false
*/
public boolean unsubscribe(Consumer<Event> eventHandler) {
synchronized (this.eventHandler) {
return this.eventHandler.remove(eventHandler);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.openems.edge.bridge.http.api;

import io.openems.common.exceptions.OpenemsError.OpenemsNamedException;
import io.openems.edge.bridge.http.api.BridgeHttp.Endpoint;

public interface EndpointFetcher {

/**
* Creates a {@link Runnable} to execute a request with the given parameters.
*
* @param endpoint the {@link Endpoint} to fetch
*
* @return the result of the {@link Endpoint}
* @throws OpenemsNamedException on error
*/
public HttpResponse<String> fetchEndpoint(Endpoint endpoint) throws HttpError;

/**
* Executes an HTTP request for the given endpoint and returns the raw response
* body as a byte array.
*
* @param endpoint the {@link Endpoint} to fetch
* @return the raw response body as a byte array
* @throws OpenemsNamedException on error
*/
public byte[] fetchEndpointRaw(Endpoint endpoint) throws HttpError;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.openems.edge.bridge.http.api;

import io.openems.common.types.HttpStatus;

public abstract sealed class HttpError extends Exception {

private static final long serialVersionUID = 6341345161164781738L;

private HttpError(String message) {
super(message);
}

private HttpError(Throwable cause) {
super(cause);
}

public static final class ResponseError extends HttpError {

private static final long serialVersionUID = -5382307294288467972L;

/**
* Creates a {@link HttpError#ResponseError} for a not found error. The
* predefined values are for status "404" and message "Not Found".
*
* @return the error
*/
public static ResponseError notFound() {
return new ResponseError(HttpStatus.NOT_FOUND, null);
}

public final HttpStatus status;
public final String body;

public ResponseError(HttpStatus status, String body) {
super("Http " + status + (body != null ? ", Body=" + body : ""));
this.status = status;
this.body = body;
}

@Override
public String toString() {
return "ResponseError [status=" + this.status + ", body=" + this.body + "]";
}

}

public static final class UnknownError extends HttpError {

private static final long serialVersionUID = 5683236662459434998L;

public UnknownError(Throwable cause) {
super(cause);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.openems.edge.bridge.http.api;

import io.openems.common.types.HttpStatus;

public record HttpResponse<T>(//
HttpStatus status, //
T data //
) {

/**
* Creates a successful response with predefined values status 200 and message
* "OK".
*
* @param <T> the type of the result data
* @param data the data (body) of the response
* @return the created {@link HttpResponse}
*/
public static <T> HttpResponse<T> ok(T data) {
return new HttpResponse<T>(HttpStatus.OK, data);
}

/**
* Creates a new {@link HttpResponse} with the given data set and all other
* fields from the current instance passed to the created object.
*
* @param <O> the type of the new data
* @param newData the new data to set
* @return the new {@link HttpResponse} object
*/
public <O> HttpResponse<O> withData(O newData) {
return new HttpResponse<O>(this.status(), newData);
}

}
Loading

0 comments on commit f4f8885

Please sign in to comment.