Permalink
Browse files

Scheduler, ProcessChain & HTTP streaming

  • Loading branch information...
hendrikebbers committed Apr 26, 2018
1 parent dcf0418 commit 618421cd990d512c6b8601dd39189706f1daffe7
Showing with 995 additions and 57 deletions.
  1. +15 −0 ...le/projector-sample-client/src/main/java/com/canoo/projection/sample/client/ProjectionClient.java
  2. +15 −0 ...le/projector-sample-server/src/main/java/com/canoo/projection/sample/server/ProjectionServer.java
  3. +15 −0 ...mple-server/src/main/java/com/canoo/projection/sample/server/controller/SimpleFormController.java
  4. +15 −0 ...or-sample/projector-sample-server/src/main/java/com/canoo/projection/sample/server/data/User.java
  5. +15 −0 ...le/projector-sample-server/src/main/java/com/canoo/projection/sample/server/data/UserService.java
  6. +193 −0 ...-platform-client/src/main/java/com/canoo/dp/impl/platform/client/concurrent/ProcessChainImpl.java
  7. +32 −0 ...tform-client/src/main/java/com/canoo/dp/impl/platform/client/concurrent/ProcessChainProvider.java
  8. +40 −0 ...latform-client/src/main/java/com/canoo/dp/impl/platform/client/concurrent/ProcessDescription.java
  9. +56 −0 ...hin-platform-client/src/main/java/com/canoo/dp/impl/platform/client/concurrent/ScheduledTask.java
  10. +28 −0 ...rm-client/src/main/java/com/canoo/dp/impl/platform/client/concurrent/ScheduledTaskComparator.java
  11. +117 −0 ...hin-platform-client/src/main/java/com/canoo/dp/impl/platform/client/concurrent/SchedulerImpl.java
  12. +37 −0 ...platform-client/src/main/java/com/canoo/dp/impl/platform/client/concurrent/SchedulerProvider.java
  13. +50 −0 ...in-platform-client/src/main/java/com/canoo/dp/impl/platform/client/concurrent/TaskResultImpl.java
  14. +23 −0 ...olphin-platform-client/src/main/java/com/canoo/dp/impl/platform/client/concurrent/ThreadType.java
  15. +24 −4 ...form-client/src/main/java/com/canoo/dp/impl/platform/client/http/HttpCallResponseBuilderImpl.java
  16. +6 −7 ...olphin-platform-client/src/main/java/com/canoo/dp/impl/platform/client/http/HttpResponseImpl.java
  17. +52 −0 ...form/dolphin-platform-client/src/main/java/com/canoo/platform/client/concurrent/ProcessChain.java
  18. +3 −1 ...latform-client/src/main/resources/META-INF/services/com.canoo.platform.client.spi.ServiceProvider
  19. +5 −7 ...dolphin-platform-client/src/test/java/com/canoo/dp/impl/platform/client/http/HttpClientTests.java
  20. +2 −0 platform/dolphin-platform-core/src/main/java/com/canoo/dp/impl/platform/core/PlatformConstants.java
  21. +15 −7 platform/dolphin-platform-core/src/main/java/com/canoo/dp/impl/platform/core/PlatformVersion.java
  22. +1 −1 ...atform-core/src/main/java/com/canoo/dp/impl/platform/core/SimpleDolphinPlatformThreadFactory.java
  23. +50 −0 ...dolphin-platform-core/src/main/java/com/canoo/dp/impl/platform/core/concurrent/SimpleTrigger.java
  24. +79 −26 ...orm/dolphin-platform-core/src/main/java/com/canoo/dp/impl/platform/core/http/ConnectionUtils.java
  25. +9 −0 ...olphin-platform-core/src/main/java/com/canoo/dp/impl/platform/core/http/HttpClientConnection.java
  26. +2 −0 ...dolphin-platform-core/src/main/java/com/canoo/dp/impl/platform/core/http/HttpHeaderConstants.java
  27. +1 −1 ...in-platform-core/src/main/java/com/canoo/platform/core/{ → concurrent}/PlatformThreadFactory.java
  28. +24 −0 platform/dolphin-platform-core/src/main/java/com/canoo/platform/core/concurrent/Scheduler.java
  29. +27 −0 platform/dolphin-platform-core/src/main/java/com/canoo/platform/core/concurrent/TaskResult.java
  30. +37 −0 platform/dolphin-platform-core/src/main/java/com/canoo/platform/core/concurrent/Trigger.java
  31. +4 −0 ...orm/dolphin-platform-core/src/main/java/com/canoo/platform/core/http/HttpCallResponseBuilder.java
  32. +1 −1 platform/dolphin-platform-core/src/main/java/com/canoo/platform/core/http/HttpResponse.java
  33. +1 −1 ...m/dolphin-platform-server/src/main/java/com/canoo/dp/impl/server/bootstrap/PlatformBootstrap.java
  34. +1 −1 ...in-platform-server/src/main/java/com/canoo/dp/impl/server/bootstrap/ServerCoreComponentsImpl.java
@@ -1,3 +1,18 @@
/*
* Copyright 2015-2018 Canoo Engineering AG.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.canoo.projection.sample.client;
import com.canoo.dp.impl.platform.projector.client.base.ClientActionSupport;
@@ -1,3 +1,18 @@
/*
* Copyright 2015-2018 Canoo Engineering AG.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.canoo.projection.sample.server;
import com.canoo.platform.remoting.server.spring.EnableRemoting;
@@ -1,3 +1,18 @@
/*
* Copyright 2015-2018 Canoo Engineering AG.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.canoo.projection.sample.server.controller;
import com.canoo.dp.impl.platform.projector.action.DefaultServerActionBean;
@@ -1,3 +1,18 @@
/*
* Copyright 2015-2018 Canoo Engineering AG.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.canoo.projection.sample.server.data;
import com.canoo.dp.impl.platform.data.EntityWithId;
@@ -1,3 +1,18 @@
/*
* Copyright 2015-2018 Canoo Engineering AG.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.canoo.projection.sample.server.data;
import com.canoo.dp.impl.platform.data.CrudService;
@@ -0,0 +1,193 @@
/*
* Copyright 2015-2018 Canoo Engineering AG.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.canoo.dp.impl.platform.client.concurrent;
import com.canoo.dp.impl.platform.core.Assert;
import com.canoo.platform.client.ClientConfiguration;
import com.canoo.platform.client.concurrent.ProcessChain;
import javafx.concurrent.Task;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
public class ProcessChainImpl<T> implements ProcessChain<T> {
private final List<ProcessDescription<?, ?>> processes;
private final Executor backgroundExecutor;
private final Executor uiExecutor;
private final Consumer<Throwable> exceptionConsumer;
private final Runnable finalRunnable;
private ProcessChainImpl(final Executor backgroundExecutor, final Executor uiExecutor, final List<ProcessDescription<?, ?>> processes, final Consumer<Throwable> exceptionConsumer, final Runnable finalRunnable) {
this.backgroundExecutor = Assert.requireNonNull(backgroundExecutor, "backgroundExecutor");
this.uiExecutor = Assert.requireNonNull(uiExecutor, "uiExecutor");
Assert.requireNonNull(processes, "processes");
this.processes = new ArrayList<>(processes);
this.exceptionConsumer = exceptionConsumer;
this.finalRunnable = finalRunnable;
}
public ProcessChainImpl(final ClientConfiguration clientConfiguration) {
Assert.requireNonNull(clientConfiguration, "clientConfiguration");
this.backgroundExecutor = clientConfiguration.getBackgroundExecutor();
this.uiExecutor = clientConfiguration.getUiExecutor();
this.processes = new ArrayList<>();
this.exceptionConsumer = null;
this.finalRunnable = null;
}
@Override
public <V> ProcessChain<V> addUiFunction(final Function<T, V> function) {
return addProcessDescription(new ProcessDescription<T, V>(function, ThreadType.PLATFORM));
}
@Override
public <V> ProcessChain<V> addBackgroundFunction(final Function<T, V> function) {
return addProcessDescription(new ProcessDescription<T, V>(function, ThreadType.EXECUTOR));
}
@Override
public ProcessChain<Void> addUiRunnable(final Runnable runnable) {
return addRunnable(runnable, ThreadType.PLATFORM);
}
@Override
public ProcessChain<Void> addBackgroundRunnable(final Runnable runnable) {
return addRunnable(runnable, ThreadType.EXECUTOR);
}
@Override
public ProcessChain<Void> addUiConsumer(final Consumer<T> consumer) {
return addConsumer(consumer, ThreadType.PLATFORM);
}
@Override
public ProcessChain<Void> addBackgroundConsumer(final Consumer<T> consumer) {
return addConsumer(consumer, ThreadType.EXECUTOR);
}
@Override
public <V> ProcessChain<V> addUiSupplier(final Supplier<V> supplier) {
return addSupplier(supplier, ThreadType.PLATFORM);
}
@Override
public <V> ProcessChain<V> addBackgroundSupplier(final Supplier<V> supplier) {
return addSupplier(supplier, ThreadType.EXECUTOR);
}
private ProcessChain<Void> addRunnable(final Runnable runnable, final ThreadType type) {
Assert.requireNonNull(runnable, "runnable");
return addProcessDescription(new ProcessDescription<T, Void>(e -> {
runnable.run();
return null;
}, type));
}
private ProcessChain<Void> addConsumer(final Consumer<T> consumer, final ThreadType type) {
Assert.requireNonNull(consumer, "consumer");
return addProcessDescription(new ProcessDescription<T, Void>(e -> {
consumer.accept(e);
return null;
}, type));
}
private <V> ProcessChain<V> addSupplier(final Supplier<V> supplier, final ThreadType type) {
Assert.requireNonNull(supplier, "supplier");
return addProcessDescription(new ProcessDescription<T, V>(e -> supplier.get(), type));
}
private <V> ProcessChain<V> addProcessDescription(final ProcessDescription<T, V> processDescription) {
Assert.requireNonNull(processDescription, "processDescription");
processes.add(processDescription);
return new ProcessChainImpl<V>(backgroundExecutor, uiExecutor, processes, exceptionConsumer, finalRunnable);
}
@Override
public ProcessChain<T> onException(final Consumer<Throwable> exceptionConsumer) {
return new ProcessChainImpl<T>(backgroundExecutor, uiExecutor, processes, exceptionConsumer, finalRunnable);
}
@Override
public ProcessChain<T> withUiFinal(final Runnable finalRunnable) {
return new ProcessChainImpl<T>(backgroundExecutor, uiExecutor, processes, exceptionConsumer, finalRunnable);
}
private <U, V> V execute(final U inputParameter, final ProcessDescription<U, V> processDescription) throws InterruptedException, ExecutionException {
Assert.requireNonNull(processDescription, "processDescription");
if (Objects.equals(processDescription.getThreadType(), ThreadType.EXECUTOR)) {
return processDescription.getFunction().apply(inputParameter);
} else {
final CompletableFuture<V> futureResult = new CompletableFuture<>();
uiExecutor.execute(() -> {
final V result = processDescription.getFunction().apply(inputParameter);
futureResult.complete(result);
});
return futureResult.get();
}
}
@Override
public Task<T> run() {
final Task<T> task = new Task<T>() {
@Override
protected T call() throws Exception {
try {
Object lastResult = null;
for (final ProcessDescription<?, ?> processDescription : processes) {
lastResult = execute(lastResult, (ProcessDescription<Object, ?>) processDescription);
}
return (T) lastResult;
} catch (final Exception e) {
if (exceptionConsumer != null) {
final CompletableFuture<Void> futureResult = new CompletableFuture<>();
uiExecutor.execute(() -> {
exceptionConsumer.accept(e);
futureResult.complete(null);
});
futureResult.get();
}
throw e;
} finally {
if (finalRunnable != null) {
final CompletableFuture<Void> futureResult = new CompletableFuture<>();
uiExecutor.execute(() -> {
finalRunnable.run();
futureResult.complete(null);
});
futureResult.get();
}
}
}
};
backgroundExecutor.execute(task);
return task;
}
}
@@ -0,0 +1,32 @@
/*
* Copyright 2015-2018 Canoo Engineering AG.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.canoo.dp.impl.platform.client.concurrent;
import com.canoo.dp.impl.platform.client.AbstractServiceProvider;
import com.canoo.platform.client.ClientConfiguration;
import com.canoo.platform.client.concurrent.ProcessChain;
public class ProcessChainProvider extends AbstractServiceProvider<ProcessChain> {
public ProcessChainProvider() {
super(ProcessChain.class);
}
@Override
protected ProcessChain createService(final ClientConfiguration configuration) {
return new ProcessChainImpl(configuration);
}
}
@@ -0,0 +1,40 @@
/*
* Copyright 2015-2018 Canoo Engineering AG.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.canoo.dp.impl.platform.client.concurrent;
import com.canoo.dp.impl.platform.core.Assert;
import java.util.function.Function;
public class ProcessDescription<V, T> {
private final Function<V, T> function;
private final ThreadType threadType;
public ProcessDescription(final Function<V, T> function, final ThreadType threadType) {
this.function = Assert.requireNonNull(function, "function");
this.threadType = Assert.requireNonNull(threadType, "threadType");
}
public Function<V, T> getFunction() {
return function;
}
public ThreadType getThreadType() {
return threadType;
}
}
Oops, something went wrong.

0 comments on commit 618421c

Please sign in to comment.