From 6f8e76376a573d50583bbdaf84003c987ff7098d Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Sun, 23 Jul 2023 15:38:57 +0200 Subject: [PATCH] Improve monitoring of adapters and pipelines (#1787) --- .../context/IAdapterRuntimeContext.java | 4 +- .../api/monitoring/FixedSizeList.java | 49 +++++++----- ...mentLogger.java => IExtensionsLogger.java} | 17 +++- .../api/monitoring/SpMonitoringManager.java | 33 ++++---- .../api/pe/context/RuntimeContext.java | 4 +- .../connect/AdapterWorkerManagement.java | 17 ++-- .../connect/PullAdapterScheduler.java | 4 +- .../elements/SendToBrokerAdapterSink.java | 7 +- .../context/AdapterContextGenerator.java | 9 ++- .../context/SpAdapterRuntimeContext.java | 12 +-- .../monitoring/ExtensionsLogger.java | 63 +++++++++++++++ .../connect/AdapterWorkerManagementTest.java | 2 +- .../adapters/iolink/IfmAlMqttAdapter.java | 6 +- .../machine/MachineDataSimulator.java | 12 +-- .../protocol/stream/FileReplayAdapter.java | 5 +- .../complex/TopologyValidationProcessor.java | 4 +- .../model/monitoring/SpLogEntry.java | 16 ++-- ...amPipesStatistics.java => SpLogLevel.java} | 6 +- .../SpLogMessage.java} | 79 +++++++++++++------ .../pipeline/ExtensionsLogProvider.java | 6 +- .../streampipes/ps/DataLakeResourceV4.java | 4 +- .../connect/AdapterWorkerResource.java | 10 +-- .../monitoring/MonitoringResource.java | 2 +- .../rest/impl/connect/AdapterResource.java | 6 +- .../rest/impl/connect/GuessResource.java | 8 +- .../connect/RuntimeResolvableResource.java | 8 +- .../rest/impl/pe/DataProcessorResource.java | 4 +- .../rest/impl/pe/DataSinkResource.java | 4 +- .../rest/impl/pe/DataStreamResource.java | 4 +- .../standalone/function/FunctionContext.java | 10 ++- .../function/StreamPipesFunction.java | 4 +- .../routing/StandaloneSpOutputCollector.java | 8 +- .../StandaloneEventProcessorRuntime.java | 4 +- .../runtime/StandaloneEventSinkRuntime.java | 4 +- .../StandalonePipelineElementRuntime.java | 20 ++--- .../SpEventProcessorRuntimeContext.java | 19 ++--- .../context/SpEventSinkRuntimeContext.java | 14 ++-- .../wrapper/context/SpRuntimeContext.java | 34 ++++---- .../DataProcessorContextGenerator.java | 4 +- .../generator/DataSinkContextGenerator.java | 4 +- .../src/lib/model/gen/streampipes-model.ts | 55 ++++++------- .../basic-nav-tabs.component.html | 7 +- .../exception-details-dialog.component.ts | 4 +- .../sp-exception-message.component.ts | 4 +- .../error-message/error-message.component.ts | 4 +- .../event-schema/event-schema.component.ts | 6 +- .../existing-adapters.component.ts | 7 +- .../base-runtime-resolvable-input.ts | 7 +- ...ata-explorer-dashboard-widget.component.ts | 4 +- .../base-data-explorer-widget.directive.ts | 6 +- .../widgets/base/data-explorer-widget-data.ts | 4 +- 51 files changed, 369 insertions(+), 269 deletions(-) rename streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/StreamPipesRuntimeError.java => streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/monitoring/FixedSizeList.java (52%) rename streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/monitoring/{IPipelineElementLogger.java => IExtensionsLogger.java} (72%) create mode 100644 streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ExtensionsLogger.java rename streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/{StreamPipesStatistics.java => SpLogLevel.java} (94%) rename streampipes-model/src/main/java/org/apache/streampipes/model/{StreamPipesErrorMessage.java => monitoring/SpLogMessage.java} (59%) diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/context/IAdapterRuntimeContext.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/context/IAdapterRuntimeContext.java index 19f14b4d96..6b9035b40a 100644 --- a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/context/IAdapterRuntimeContext.java +++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/context/IAdapterRuntimeContext.java @@ -18,10 +18,10 @@ package org.apache.streampipes.extensions.api.connect.context; -import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager; +import org.apache.streampipes.extensions.api.monitoring.IExtensionsLogger; public interface IAdapterRuntimeContext extends IAdapterGuessSchemaContext { - SpMonitoringManager getLogger(); + IExtensionsLogger getLogger(); } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/StreamPipesRuntimeError.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/monitoring/FixedSizeList.java similarity index 52% rename from streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/StreamPipesRuntimeError.java rename to streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/monitoring/FixedSizeList.java index eccf51de91..75f0a9f730 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/StreamPipesRuntimeError.java +++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/monitoring/FixedSizeList.java @@ -16,36 +16,47 @@ * */ -package org.apache.streampipes.model.monitoring; +package org.apache.streampipes.extensions.api.monitoring; -public class StreamPipesRuntimeError { +import java.util.ArrayList; +import java.util.List; - private long timestamp; - private String title; - private String message; +public class FixedSizeList { + private final int maxSize; + private final List list; - private String stackTrace; + public FixedSizeList(int maxSize) { + if (maxSize <= 0) { + throw new IllegalArgumentException("Max size must be greater than zero."); + } + this.maxSize = maxSize; + this.list = new ArrayList<>(); + } - public StreamPipesRuntimeError(long timestamp, String title, String message, String stackTrace) { - this.timestamp = timestamp; - this.title = title; - this.message = message; - this.stackTrace = stackTrace; + public void add(T element) { + list.add(0, element); + if (list.size() > maxSize) { + list.remove(list.size() - 1); + } } - public long getTimestamp() { - return timestamp; + public T get(int index) { + return list.get(index); } - public String getTitle() { - return title; + public int size() { + return list.size(); } - public String getMessage() { - return message; + public List getAllItems() { + return this.list; } - public String getStackTrace() { - return stackTrace; + public void clear() { + this.list.clear(); } } + + + + diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/monitoring/IPipelineElementLogger.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/monitoring/IExtensionsLogger.java similarity index 72% rename from streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/monitoring/IPipelineElementLogger.java rename to streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/monitoring/IExtensionsLogger.java index e9418c41d0..adade12518 100644 --- a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/monitoring/IPipelineElementLogger.java +++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/monitoring/IExtensionsLogger.java @@ -18,7 +18,20 @@ package org.apache.streampipes.extensions.api.monitoring; -import org.slf4j.Logger; +import org.apache.streampipes.model.monitoring.SpLogMessage; -public interface IPipelineElementLogger extends Logger { +public interface IExtensionsLogger { + + void log(SpLogMessage logMessage); + + void error(Exception e); + + void error(String details, + Exception e); + + void info(String title, + String details); + + void warn(String title, + String details); } diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/monitoring/SpMonitoringManager.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/monitoring/SpMonitoringManager.java index 8c15d8e8ad..5f5e6fa6f5 100644 --- a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/monitoring/SpMonitoringManager.java +++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/monitoring/SpMonitoringManager.java @@ -22,7 +22,6 @@ import org.apache.streampipes.model.monitoring.SpLogEntry; import org.apache.streampipes.model.monitoring.SpMetricsEntry; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,7 +30,7 @@ public enum SpMonitoringManager { INSTANCE; - private final Map> logInfos; + private final Map> logInfos; private final Map metricsInfos; SpMonitoringManager() { @@ -42,9 +41,9 @@ public enum SpMonitoringManager { public void addErrorMessage(String resourceId, SpLogEntry errorMessageEntry) { if (!logInfos.containsKey(resourceId)) { - logInfos.put(resourceId, new ArrayList<>()); + logInfos.put(resourceId, new FixedSizeList<>(100)); } - this.logInfos.get(resourceId).add(0, errorMessageEntry); + this.logInfos.get(resourceId).add(errorMessageEntry); } public void increaseInCounter(String resourceId, @@ -87,18 +86,28 @@ public SpMetricsEntry getMetricsEntry(String resourceId, return currentEntry; } - public Map> getAllLogs() { - return logInfos; + public SpEndpointMonitoringInfo getMonitoringInfo() { + var logInfos = makeLogInfos(); + return new SpEndpointMonitoringInfo(logInfos, metricsInfos); } - public Map getAllMetrics() { - return this.metricsInfos; + public void clearAllLogs() { + this.logInfos.forEach((key, value) -> value.clear()); } - public SpEndpointMonitoringInfo getMonitoringInfo() { - return new SpEndpointMonitoringInfo(logInfos, metricsInfos); + private Map> makeLogInfos() { + var logEntries = new HashMap>(); + this.logInfos.forEach((key, value) -> + logEntries.put(key, cloneList(value.getAllItems()))); + + return logEntries; } + private List cloneList(List allItems) { + return allItems.stream().map(SpLogEntry::new).toList(); + } + + private void checkAndPrepareMetrics(String resourceId) { if (!metricsInfos.containsKey(resourceId)) { addMetricsObject(resourceId); @@ -109,8 +118,4 @@ private void addMetricsObject(String resourceId) { this.metricsInfos.put(resourceId, new SpMetricsEntry()); } - - public void clearAllLogs() { - logInfos.forEach((key, value) -> value.clear()); - } } diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/pe/context/RuntimeContext.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/pe/context/RuntimeContext.java index 5c6e1ae716..390d503188 100644 --- a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/pe/context/RuntimeContext.java +++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/pe/context/RuntimeContext.java @@ -19,11 +19,11 @@ import org.apache.streampipes.client.api.IStreamPipesClient; import org.apache.streampipes.extensions.api.config.IConfigExtractor; -import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager; +import org.apache.streampipes.extensions.api.monitoring.IExtensionsLogger; public interface RuntimeContext { - SpMonitoringManager getLogger(); + IExtensionsLogger getLogger(); String getCorrespondingUser(); diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagement.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagement.java index 65a7c8ecf8..958d413f1f 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagement.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagement.java @@ -23,6 +23,7 @@ import org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeContext; import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager; import org.apache.streampipes.extensions.management.connect.adapter.model.EventCollector; +import org.apache.streampipes.extensions.management.context.AdapterContextGenerator; import org.apache.streampipes.extensions.management.init.IDeclarersSingleton; import org.apache.streampipes.extensions.management.init.RunningAdapterInstances; import org.apache.streampipes.model.connect.adapter.AdapterDescription; @@ -41,14 +42,10 @@ public class AdapterWorkerManagement { private final RunningAdapterInstances runningAdapterInstances; private final IDeclarersSingleton declarers; - private final IAdapterRuntimeContext adapterRuntimeContext; - public AdapterWorkerManagement(RunningAdapterInstances runningAdapterInstances, - IDeclarersSingleton declarers, - IAdapterRuntimeContext runtimeContext) { + IDeclarersSingleton declarers) { this.runningAdapterInstances = runningAdapterInstances; this.declarers = declarers; - this.adapterRuntimeContext = runtimeContext; } public Collection getAllRunningAdapterInstances() { @@ -69,8 +66,9 @@ public void invokeAdapter(AdapterDescription adapterDescription) throws AdapterE var registeredParsers = newAdapterInstance.declareConfig().getSupportedParsers(); var extractor = AdapterParameterExtractor.from(adapterDescription, registeredParsers); var eventCollector = EventCollector.from(adapterDescription); + var runtimeContext = makeRuntimeContext(adapterDescription.getElementId()); - newAdapterInstance.onAdapterStarted(extractor, eventCollector, adapterRuntimeContext); + newAdapterInstance.onAdapterStarted(extractor, eventCollector, runtimeContext); } else { var errorMessage = "Adapter with id %s could not be found".formatted(adapterDescription.getAppId()); LOG.error(errorMessage); @@ -88,12 +86,17 @@ public void stopAdapter(AdapterDescription adapterDescription) throws AdapterExc var registeredParsers = adapter.declareConfig().getSupportedParsers(); var extractor = AdapterParameterExtractor.from(adapterDescription, registeredParsers); - adapter.onAdapterStopped(extractor, adapterRuntimeContext); + var runtimeContext = makeRuntimeContext(elementId); + adapter.onAdapterStopped(extractor, runtimeContext); } resetMonitoring(elementId); } + private IAdapterRuntimeContext makeRuntimeContext(String adapterInstanceId) { + return new AdapterContextGenerator().makeRuntimeContext(adapterInstanceId); + } + private void resetMonitoring(String elementId) { SpMonitoringManager.INSTANCE.reset(elementId); } diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java index eeb1aaf62d..de9a1b4146 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java @@ -20,8 +20,8 @@ import org.apache.streampipes.extensions.api.connect.IPullAdapter; import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager; -import org.apache.streampipes.model.StreamPipesErrorMessage; import org.apache.streampipes.model.monitoring.SpLogEntry; +import org.apache.streampipes.model.monitoring.SpLogMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +45,7 @@ public void schedule(IPullAdapter pullAdapter, } catch (ExecutionException | InterruptedException e) { SpMonitoringManager.INSTANCE.addErrorMessage( adapterElementId, - SpLogEntry.from(System.currentTimeMillis(), StreamPipesErrorMessage.from(e))); + SpLogEntry.from(System.currentTimeMillis(), SpLogMessage.from(e))); } catch (TimeoutException e) { LOGGER.warn("Timeout occurred", e); } diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java index 52f864db76..b9cebe2581 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java @@ -24,12 +24,11 @@ import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement; import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager; import org.apache.streampipes.extensions.management.connect.adapter.util.TransportFormatSelector; +import org.apache.streampipes.extensions.management.monitoring.ExtensionsLogger; import org.apache.streampipes.messaging.EventProducer; -import org.apache.streampipes.model.StreamPipesErrorMessage; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.grounding.TransportFormat; import org.apache.streampipes.model.grounding.TransportProtocol; -import org.apache.streampipes.model.monitoring.SpLogEntry; import java.util.Map; @@ -78,9 +77,7 @@ public Map process(Map event) { System.currentTimeMillis()); } } catch (RuntimeException e) { - SpMonitoringManager.INSTANCE.addErrorMessage( - adapterDescription.getElementId(), - SpLogEntry.from(System.currentTimeMillis(), StreamPipesErrorMessage.from(e))); + new ExtensionsLogger(adapterDescription.getElementId()).error(e); } return null; } diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/context/AdapterContextGenerator.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/context/AdapterContextGenerator.java index 84a3aa3468..77e24c234a 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/context/AdapterContextGenerator.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/context/AdapterContextGenerator.java @@ -20,15 +20,18 @@ import org.apache.streampipes.extensions.api.connect.context.IAdapterGuessSchemaContext; import org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeContext; -import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager; +import org.apache.streampipes.extensions.management.monitoring.ExtensionsLogger; import static org.apache.streampipes.extensions.management.util.RuntimeContextUtils.makeConfigExtractor; import static org.apache.streampipes.extensions.management.util.RuntimeContextUtils.makeStreamPipesClient; public class AdapterContextGenerator { - public IAdapterRuntimeContext makeRuntimeContext() { - return new SpAdapterRuntimeContext(SpMonitoringManager.INSTANCE, makeConfigExtractor(), makeStreamPipesClient()); + public IAdapterRuntimeContext makeRuntimeContext(String adapterInstanceId) { + return new SpAdapterRuntimeContext( + new ExtensionsLogger(adapterInstanceId), + makeConfigExtractor(), + makeStreamPipesClient()); } public IAdapterGuessSchemaContext makeGuessSchemaContext() { diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/context/SpAdapterRuntimeContext.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/context/SpAdapterRuntimeContext.java index f0dd95b759..2fc6fd95e6 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/context/SpAdapterRuntimeContext.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/context/SpAdapterRuntimeContext.java @@ -20,7 +20,7 @@ import org.apache.streampipes.client.StreamPipesClient; import org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeContext; -import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager; +import org.apache.streampipes.extensions.api.monitoring.IExtensionsLogger; import org.apache.streampipes.extensions.management.config.ConfigExtractor; import java.io.Serializable; @@ -28,17 +28,17 @@ public class SpAdapterRuntimeContext extends SpAdapterGuessSchemaContext implements IAdapterRuntimeContext, Serializable { - private SpMonitoringManager monitoringManager; + private final IExtensionsLogger extensionsLogger; - public SpAdapterRuntimeContext(SpMonitoringManager monitoringManager, + public SpAdapterRuntimeContext(IExtensionsLogger extensionsLogger, ConfigExtractor configExtractor, StreamPipesClient streamPipesClient) { super(configExtractor, streamPipesClient); - this.monitoringManager = monitoringManager; + this.extensionsLogger = extensionsLogger; } @Override - public SpMonitoringManager getLogger() { - return monitoringManager; + public IExtensionsLogger getLogger() { + return extensionsLogger; } } diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ExtensionsLogger.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ExtensionsLogger.java new file mode 100644 index 0000000000..bfe9b56be8 --- /dev/null +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ExtensionsLogger.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.extensions.management.monitoring; + +import org.apache.streampipes.extensions.api.monitoring.IExtensionsLogger; +import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager; +import org.apache.streampipes.model.monitoring.SpLogEntry; +import org.apache.streampipes.model.monitoring.SpLogMessage; + +public class ExtensionsLogger implements IExtensionsLogger { + + private final String resourceId; + private final SpMonitoringManager monitoringManager; + + public ExtensionsLogger(String resourceId) { + this.resourceId = resourceId; + this.monitoringManager = SpMonitoringManager.INSTANCE; + } + + @Override + public void log(SpLogMessage logMessage) { + monitoringManager.addErrorMessage( + resourceId, + SpLogEntry.from(System.currentTimeMillis(), logMessage)); + } + + @Override + public void error(Exception e) { + log(SpLogMessage.from(e)); + } + + @Override + public void error(String details, Exception e) { + log(SpLogMessage.from(e, details)); + } + + @Override + public void info(String title, String details) { + log(SpLogMessage.info(title, details)); + } + + @Override + public void warn(String title, String details) { + log(SpLogMessage.warn(title, details)); + } + +} diff --git a/streampipes-extensions-management/src/test/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagementTest.java b/streampipes-extensions-management/src/test/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagementTest.java index de8645795a..4df695db52 100644 --- a/streampipes-extensions-management/src/test/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagementTest.java +++ b/streampipes-extensions-management/src/test/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagementTest.java @@ -42,7 +42,7 @@ public void invokeAdapterNotPresent() throws AdapterException { when(declarerSingleton.getAdapter(any())).thenReturn(Optional.empty()); var adapterWorkerManagement = new AdapterWorkerManagement( - null, declarerSingleton, null); + null, declarerSingleton); adapterWorkerManagement.invokeAdapter(adapterDescription); } diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/iolink/IfmAlMqttAdapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/iolink/IfmAlMqttAdapter.java index c5db51d946..e2606996d9 100644 --- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/iolink/IfmAlMqttAdapter.java +++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/iolink/IfmAlMqttAdapter.java @@ -32,9 +32,7 @@ import org.apache.streampipes.extensions.management.connect.adapter.parser.JsonParsers; import org.apache.streampipes.extensions.management.connect.adapter.parser.json.JsonObjectParser; import org.apache.streampipes.model.AdapterType; -import org.apache.streampipes.model.StreamPipesErrorMessage; import org.apache.streampipes.model.connect.guess.GuessSchema; -import org.apache.streampipes.model.monitoring.SpLogEntry; import org.apache.streampipes.pe.shared.config.mqtt.MqttConfig; import org.apache.streampipes.pe.shared.config.mqtt.MqttConnectUtils; import org.apache.streampipes.pe.shared.config.mqtt.MqttConsumer; @@ -127,9 +125,7 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor, } catch (Exception e) { adapterRuntimeContext .getLogger() - .addErrorMessage( - extractor.getAdapterDescription().getElementId(), - SpLogEntry.from(System.currentTimeMillis(), StreamPipesErrorMessage.from(e))); + .error(e); LOG.error("Could not parse event", e); } }); diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/simulator/machine/MachineDataSimulator.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/simulator/machine/MachineDataSimulator.java index 020dfb3db8..32b941a21c 100644 --- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/simulator/machine/MachineDataSimulator.java +++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/simulator/machine/MachineDataSimulator.java @@ -19,7 +19,6 @@ import org.apache.streampipes.commons.exceptions.connect.AdapterException; import org.apache.streampipes.extensions.api.connect.IEventCollector; -import org.apache.streampipes.extensions.management.connect.adapter.model.pipeline.AdapterPipeline; import java.util.HashMap; import java.util.Map; @@ -33,20 +32,15 @@ public class MachineDataSimulator implements Runnable { private Boolean running; - public MachineDataSimulator(IEventCollector collector, Integer waitTimeMs, String selectedSimulatorOption) { + public MachineDataSimulator(IEventCollector collector, + Integer waitTimeMs, + String selectedSimulatorOption) { this.collector = collector; this.waitTimeMs = waitTimeMs; this.selectedSimulatorOption = selectedSimulatorOption; this.running = true; } - @Deprecated - public MachineDataSimulator(AdapterPipeline adapterPipeline, Integer waitTimeMs, String selectedSimulatorOption) { - this.waitTimeMs = waitTimeMs; - this.selectedSimulatorOption = selectedSimulatorOption; - this.running = true; - } - @Override public void run() { this.running = true; diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapter.java index c58254cbe8..f9ad62a2c9 100644 --- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapter.java +++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapter.java @@ -31,9 +31,7 @@ import org.apache.streampipes.extensions.management.connect.adapter.parser.JsonParsers; import org.apache.streampipes.extensions.management.connect.adapter.parser.xml.XmlParser; import org.apache.streampipes.model.AdapterType; -import org.apache.streampipes.model.StreamPipesErrorMessage; import org.apache.streampipes.model.connect.guess.GuessSchema; -import org.apache.streampipes.model.monitoring.SpLogEntry; import org.apache.streampipes.sdk.StaticProperties; import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder; import org.apache.streampipes.sdk.helpers.Alternatives; @@ -199,8 +197,7 @@ private void parseFile(IAdapterParameterExtractor extractor, } catch (AdapterException e) { adapterRuntimeContext .getLogger() - .addErrorMessage(extractor.getAdapterDescription().getElementId(), - SpLogEntry.from(System.currentTimeMillis(), StreamPipesErrorMessage.from(e))); + .error(e); } } diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/validation/complex/TopologyValidationProcessor.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/validation/complex/TopologyValidationProcessor.java index f0910c893c..03a2b33828 100644 --- a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/validation/complex/TopologyValidationProcessor.java +++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/validation/complex/TopologyValidationProcessor.java @@ -23,9 +23,9 @@ import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext; import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector; import org.apache.streampipes.model.DataProcessorType; -import org.apache.streampipes.model.StreamPipesErrorMessage; import org.apache.streampipes.model.graph.DataProcessorDescription; import org.apache.streampipes.model.monitoring.SpLogEntry; +import org.apache.streampipes.model.monitoring.SpLogMessage; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.model.schema.PropertyScope; import org.apache.streampipes.processors.geo.jvm.jts.exceptions.SpJtsGeoemtryException; @@ -120,7 +120,7 @@ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeEx if (isLogOutput) { SpMonitoringManager.INSTANCE.addErrorMessage(params.getGraph().getElementId(), SpLogEntry.from(System.currentTimeMillis(), - StreamPipesErrorMessage.from(new SpJtsGeoemtryException( + SpLogMessage.from(new SpJtsGeoemtryException( validator.getValidationError().toString())))); } } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/SpLogEntry.java b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/SpLogEntry.java index 75fb385f6c..959289ef3f 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/SpLogEntry.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/SpLogEntry.java @@ -18,27 +18,31 @@ package org.apache.streampipes.model.monitoring; -import org.apache.streampipes.model.StreamPipesErrorMessage; import org.apache.streampipes.model.shared.annotation.TsModel; @TsModel public class SpLogEntry { private long timestamp; - private StreamPipesErrorMessage errorMessage; + private SpLogMessage errorMessage; public SpLogEntry() { } + public SpLogEntry(SpLogEntry other) { + this.timestamp = other.getTimestamp(); + this.errorMessage = new SpLogMessage(other.getErrorMessage()); + } + private SpLogEntry(long timestamp, - StreamPipesErrorMessage errorMessage) { + SpLogMessage errorMessage) { this.timestamp = timestamp; this.errorMessage = errorMessage; } public static SpLogEntry from(long timestamp, - StreamPipesErrorMessage errorMessage) { + SpLogMessage errorMessage) { return new SpLogEntry(timestamp, errorMessage); } @@ -50,11 +54,11 @@ public void setTimestamp(long timestamp) { this.timestamp = timestamp; } - public StreamPipesErrorMessage getErrorMessage() { + public SpLogMessage getErrorMessage() { return errorMessage; } - public void setErrorMessage(StreamPipesErrorMessage errorMessage) { + public void setErrorMessage(SpLogMessage errorMessage) { this.errorMessage = errorMessage; } } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/StreamPipesStatistics.java b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/SpLogLevel.java similarity index 94% rename from streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/StreamPipesStatistics.java rename to streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/SpLogLevel.java index f67f91d6e3..ece9018a3a 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/StreamPipesStatistics.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/SpLogLevel.java @@ -18,5 +18,9 @@ package org.apache.streampipes.model.monitoring; -public class StreamPipesStatistics { +public enum SpLogLevel { + + INFO, + WARN, + ERROR } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/StreamPipesErrorMessage.java b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/SpLogMessage.java similarity index 59% rename from streampipes-model/src/main/java/org/apache/streampipes/model/StreamPipesErrorMessage.java rename to streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/SpLogMessage.java index a52ee29e3b..e1123a26cc 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/StreamPipesErrorMessage.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/SpLogMessage.java @@ -16,39 +16,80 @@ * */ -package org.apache.streampipes.model; +package org.apache.streampipes.model.monitoring; import org.apache.streampipes.model.shared.annotation.TsModel; import org.apache.commons.lang3.exception.ExceptionUtils; @TsModel -public class StreamPipesErrorMessage { +public class SpLogMessage { - private String level; + private SpLogLevel level; private String title; private String detail; private String cause; private String fullStackTrace; - public StreamPipesErrorMessage() { + public static SpLogMessage from(Exception exception) { + return from(exception, ""); + } + + public static SpLogMessage from(Exception exception, + String detail) { + String cause = exception.getCause() != null ? exception.getCause().getMessage() : exception.getMessage(); + return new SpLogMessage( + SpLogLevel.ERROR, + exception.getMessage(), + detail, + ExceptionUtils.getStackTrace(exception), + cause); + } + + public static SpLogMessage info(String title, + String details) { + return new SpLogMessage( + SpLogLevel.INFO, + title, + details + ); + } + + public static SpLogMessage warn(String title, + String details) { + return new SpLogMessage( + SpLogLevel.WARN, + title, + details + ); + } + + public SpLogMessage() { } - public StreamPipesErrorMessage(String level, - String title, - String detail) { + public SpLogMessage(SpLogMessage other) { + this.level = other.getLevel(); + this.detail = other.getDetail(); + this.title = other.getTitle(); + this.cause = other.getCause(); + this.fullStackTrace = other.getFullStackTrace(); + } + + public SpLogMessage(SpLogLevel level, + String title, + String detail) { this.level = level; this.title = title; this.detail = detail; } - public StreamPipesErrorMessage(String level, - String title, - String detail, - String fullStackTrace, - String cause) { + public SpLogMessage(SpLogLevel level, + String title, + String detail, + String fullStackTrace, + String cause) { this.level = level; this.title = title; this.detail = detail; @@ -56,21 +97,11 @@ public StreamPipesErrorMessage(String level, this.cause = cause; } - public static StreamPipesErrorMessage from(Exception exception) { - String cause = exception.getCause() != null ? exception.getCause().getMessage() : exception.getMessage(); - return new StreamPipesErrorMessage( - "error", - exception.getMessage(), - "", - ExceptionUtils.getStackTrace(exception), - cause); - } - - public String getLevel() { + public SpLogLevel getLevel() { return level; } - public void setLevel(String level) { + public void setLevel(SpLogLevel level) { this.level = level; } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsLogProvider.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsLogProvider.java index 013b55a6e2..4517e30b13 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsLogProvider.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsLogProvider.java @@ -53,7 +53,11 @@ public void addMonitoringInfos(SpEndpointMonitoringInfo monitoringInfo) { infos.addAll(0, value); if (infos.size() > MAX_ITEMS) { - infos.subList(MAX_ITEMS, infos.size()).clear(); + int numElementsToRemove = infos.size() - MAX_ITEMS; + + for (int i = 0; i < numElementsToRemove; i++) { + infos.remove(infos.size() - 1); + } } }); } diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java index b449e2dcf5..a0d798c370 100644 --- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java +++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java @@ -22,10 +22,10 @@ import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement; import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams; import org.apache.streampipes.dataexplorer.query.writer.OutputFormat; -import org.apache.streampipes.model.StreamPipesErrorMessage; import org.apache.streampipes.model.datalake.DataLakeMeasure; import org.apache.streampipes.model.datalake.DataSeries; import org.apache.streampipes.model.datalake.SpQueryResult; +import org.apache.streampipes.model.monitoring.SpLogMessage; import org.apache.streampipes.rest.core.base.impl.AbstractRestResource; import io.swagger.v3.oas.annotations.Operation; @@ -241,7 +241,7 @@ public Response getData( this.dataLakeManagement.getData(sanitizedParams, isIgnoreMissingValues(missingValueBehaviour)); return ok(result); } catch (RuntimeException e) { - return badRequest(StreamPipesErrorMessage.from(e)); + return badRequest(SpLogMessage.from(e)); } } } diff --git a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/connect/AdapterWorkerResource.java b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/connect/AdapterWorkerResource.java index d25ea63844..2877fc456e 100644 --- a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/connect/AdapterWorkerResource.java +++ b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/connect/AdapterWorkerResource.java @@ -20,12 +20,11 @@ import org.apache.streampipes.commons.exceptions.connect.AdapterException; import org.apache.streampipes.extensions.management.connect.AdapterWorkerManagement; -import org.apache.streampipes.extensions.management.context.AdapterContextGenerator; import org.apache.streampipes.extensions.management.init.DeclarersSingleton; import org.apache.streampipes.extensions.management.init.RunningAdapterInstances; -import org.apache.streampipes.model.StreamPipesErrorMessage; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.message.Notifications; +import org.apache.streampipes.model.monitoring.SpLogMessage; import org.apache.streampipes.rest.shared.annotation.JacksonSerialized; import org.apache.streampipes.rest.shared.impl.AbstractSharedRestInterface; @@ -51,8 +50,7 @@ public class AdapterWorkerResource extends AbstractSharedRestInterface { public AdapterWorkerResource() { adapterManagement = new AdapterWorkerManagement( RunningAdapterInstances.INSTANCE, - DeclarersSingleton.getInstance(), - new AdapterContextGenerator().makeRuntimeContext() + DeclarersSingleton.getInstance() ); } @@ -83,7 +81,7 @@ public Response invokeAdapter(AdapterDescription adapterStreamDescription) { return ok(Notifications.success(responseMessage)); } catch (AdapterException e) { logger.error("Error while starting adapter with id " + adapterStreamDescription.getUri(), e); - return serverError(StreamPipesErrorMessage.from(e)); + return serverError(SpLogMessage.from(e)); } } @@ -107,7 +105,7 @@ public Response stopAdapter(AdapterDescription adapterStreamDescription) { return ok(Notifications.success(responseMessage)); } catch (AdapterException e) { logger.error("Error while stopping adapter with id " + adapterStreamDescription.getElementId(), e); - return serverError(StreamPipesErrorMessage.from(e)); + return serverError(SpLogMessage.from(e)); } } diff --git a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/monitoring/MonitoringResource.java b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/monitoring/MonitoringResource.java index 0c2ee0aa47..8cfbea34d4 100644 --- a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/monitoring/MonitoringResource.java +++ b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/monitoring/MonitoringResource.java @@ -37,7 +37,7 @@ public Response getMonitoringInfos() { try { return ok(SpMonitoringManager.INSTANCE.getMonitoringInfo()); } finally { - //SpLogManager.INSTANCE.clearAllLogs(); + SpMonitoringManager.INSTANCE.clearAllLogs(); } } } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java index 775e5c8858..7de6ed08c6 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java @@ -20,9 +20,9 @@ import org.apache.streampipes.commons.exceptions.connect.AdapterException; import org.apache.streampipes.connect.management.management.AdapterMasterManagement; -import org.apache.streampipes.model.StreamPipesErrorMessage; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.message.Notifications; +import org.apache.streampipes.model.monitoring.SpLogMessage; import org.apache.streampipes.rest.security.AuthConstants; import org.apache.streampipes.rest.shared.annotation.JacksonSerialized; import org.apache.streampipes.storage.management.StorageDispatcher; @@ -121,7 +121,7 @@ public Response stopAdapter(@PathParam("id") String adapterId) { return ok(Notifications.success("Adapter started")); } catch (AdapterException e) { LOG.error("Could not stop adapter with id " + adapterId, e); - return serverError(StreamPipesErrorMessage.from(e)); + return serverError(SpLogMessage.from(e)); } } @@ -136,7 +136,7 @@ public Response startAdapter(@PathParam("id") String adapterId) { return ok(Notifications.success("Adapter stopped")); } catch (AdapterException e) { LOG.error("Could not start adapter with id " + adapterId, e); - return serverError(StreamPipesErrorMessage.from(e)); + return serverError(SpLogMessage.from(e)); } } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/GuessResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/GuessResource.java index ea00214c1c..50623e231d 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/GuessResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/GuessResource.java @@ -22,10 +22,10 @@ import org.apache.streampipes.commons.exceptions.connect.ParseException; import org.apache.streampipes.connect.management.management.GuessManagement; import org.apache.streampipes.extensions.api.connect.exception.WorkerAdapterException; -import org.apache.streampipes.model.StreamPipesErrorMessage; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.connect.guess.AdapterEventPreview; import org.apache.streampipes.model.connect.guess.GuessSchema; +import org.apache.streampipes.model.monitoring.SpLogMessage; import org.apache.streampipes.rest.shared.annotation.JacksonSerialized; import com.fasterxml.jackson.core.JsonProcessingException; @@ -63,12 +63,12 @@ public Response guessSchema(AdapterDescription adapterDescription) { return ok(result); } catch (ParseException e) { LOG.error("Error while parsing events: ", e); - return badRequest(StreamPipesErrorMessage.from(e)); + return badRequest(SpLogMessage.from(e)); } catch (WorkerAdapterException e) { - return serverError(StreamPipesErrorMessage.from(e)); + return serverError(SpLogMessage.from(e)); } catch (NoServiceEndpointsAvailableException | IOException e) { LOG.error(e.getMessage()); - return serverError(StreamPipesErrorMessage.from(e)); + return serverError(SpLogMessage.from(e)); } } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java index e22a4d4fa2..c8e5386b34 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java @@ -24,7 +24,7 @@ import org.apache.streampipes.connect.management.management.WorkerAdministrationManagement; import org.apache.streampipes.connect.management.management.WorkerRestClient; import org.apache.streampipes.connect.management.management.WorkerUrlProvider; -import org.apache.streampipes.model.StreamPipesErrorMessage; +import org.apache.streampipes.model.monitoring.SpLogMessage; import org.apache.streampipes.model.runtime.RuntimeOptionsRequest; import org.apache.streampipes.model.runtime.RuntimeOptionsResponse; import org.apache.streampipes.rest.shared.annotation.JacksonSerialized; @@ -68,13 +68,13 @@ public Response fetchConfigurations(@PathParam("id") String appId, return ok(result); } catch (AdapterException e) { LOG.error("Adapter exception occurred", e); - return serverError(StreamPipesErrorMessage.from(e)); + return serverError(SpLogMessage.from(e)); } catch (NoServiceEndpointsAvailableException e) { LOG.error("Could not find service endpoint for {} while fetching configuration", appId); - return serverError(StreamPipesErrorMessage.from(e)); + return serverError(SpLogMessage.from(e)); } catch (SpConfigurationException e) { LOG.error("Tried to fetch a runtime configuration with insufficient settings"); - return badRequest(StreamPipesErrorMessage.from(e)); + return badRequest(SpLogMessage.from(e)); } } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataProcessorResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataProcessorResource.java index edbdfa6428..6f52faeadd 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataProcessorResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataProcessorResource.java @@ -18,10 +18,10 @@ package org.apache.streampipes.rest.impl.pe; -import org.apache.streampipes.model.StreamPipesErrorMessage; import org.apache.streampipes.model.graph.DataProcessorDescription; import org.apache.streampipes.model.graph.DataProcessorInvocation; import org.apache.streampipes.model.message.NotificationType; +import org.apache.streampipes.model.monitoring.SpLogMessage; import org.apache.streampipes.resource.management.DataProcessorResourceManager; import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource; import org.apache.streampipes.rest.security.AuthConstants; @@ -83,7 +83,7 @@ public Response getElement(@PathParam("elementId") String elementId) { try { return ok(getDataProcessorResourceManager().findAsInvocation(elementId)); } catch (IllegalArgumentException e) { - return notFound(StreamPipesErrorMessage.from(e)); + return notFound(SpLogMessage.from(e)); } } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataSinkResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataSinkResource.java index 55c62f5ab7..884b6b7aae 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataSinkResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataSinkResource.java @@ -18,10 +18,10 @@ package org.apache.streampipes.rest.impl.pe; -import org.apache.streampipes.model.StreamPipesErrorMessage; import org.apache.streampipes.model.graph.DataSinkDescription; import org.apache.streampipes.model.graph.DataSinkInvocation; import org.apache.streampipes.model.message.NotificationType; +import org.apache.streampipes.model.monitoring.SpLogMessage; import org.apache.streampipes.resource.management.DataSinkResourceManager; import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource; import org.apache.streampipes.rest.security.AuthConstants; @@ -84,7 +84,7 @@ public Response getElement(@PathParam("elementId") String elementId) { try { return ok(getDataSinkResourceManager().findAsInvocation(elementId)); } catch (IllegalArgumentException e) { - return notFound(StreamPipesErrorMessage.from(e)); + return notFound(SpLogMessage.from(e)); } } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataStreamResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataStreamResource.java index 9727f7ba9f..5c582daced 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataStreamResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/pe/DataStreamResource.java @@ -19,8 +19,8 @@ package org.apache.streampipes.rest.impl.pe; import org.apache.streampipes.model.SpDataStream; -import org.apache.streampipes.model.StreamPipesErrorMessage; import org.apache.streampipes.model.message.NotificationType; +import org.apache.streampipes.model.monitoring.SpLogMessage; import org.apache.streampipes.resource.management.DataStreamResourceManager; import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource; import org.apache.streampipes.rest.security.AuthConstants; @@ -85,7 +85,7 @@ public Response getElement(@PathParam("elementId") String elementId) { try { return ok(getDataStreamResourceManager().findAsInvocation(elementId)); } catch (IllegalArgumentException e) { - return notFound(StreamPipesErrorMessage.from(e)); + return notFound(SpLogMessage.from(e)); } } diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContext.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContext.java index ed1f714fd4..ff4326971e 100644 --- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContext.java +++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContext.java @@ -19,11 +19,11 @@ package org.apache.streampipes.wrapper.standalone.function; import org.apache.streampipes.client.StreamPipesClient; -import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager; +import org.apache.streampipes.extensions.api.monitoring.IExtensionsLogger; import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector; import org.apache.streampipes.extensions.management.config.ConfigExtractor; +import org.apache.streampipes.extensions.management.monitoring.ExtensionsLogger; import org.apache.streampipes.model.SpDataStream; -import org.apache.streampipes.model.monitoring.SpLogEntry; import org.apache.streampipes.model.schema.EventSchema; import java.util.Collection; @@ -40,6 +40,7 @@ public class FunctionContext { private ConfigExtractor config; private Map outputCollectors; + private IExtensionsLogger extensionsLogger; public FunctionContext() { this.streams = new HashMap<>(); @@ -56,6 +57,7 @@ public FunctionContext(String functionId, this.functionId = functionId; this.outputCollectors = outputCollectors; this.client = client; + this.extensionsLogger = new ExtensionsLogger(functionId); } public Collection getStreams() { @@ -78,8 +80,8 @@ public String getFunctionId() { return functionId; } - public void log(SpLogEntry logEntry) { - SpMonitoringManager.INSTANCE.addErrorMessage(functionId, logEntry); + public IExtensionsLogger getLogger() { + return extensionsLogger; } public Map getOutputCollectors() { diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java index f18ca9f9e9..b4a09e20f9 100644 --- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java +++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java @@ -29,9 +29,9 @@ import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector; import org.apache.streampipes.extensions.management.util.GroundingDebugUtils; import org.apache.streampipes.model.SpDataStream; -import org.apache.streampipes.model.StreamPipesErrorMessage; import org.apache.streampipes.model.constants.PropertySelectorConstants; import org.apache.streampipes.model.monitoring.SpLogEntry; +import org.apache.streampipes.model.monitoring.SpLogMessage; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.model.runtime.EventFactory; import org.apache.streampipes.model.runtime.SchemaInfo; @@ -136,7 +136,7 @@ private void addError(RuntimeException e) { var functionId = this.getFunctionConfig().getFunctionId(); SpMonitoringManager.INSTANCE.addErrorMessage( functionId.getId(), - SpLogEntry.from(System.currentTimeMillis(), StreamPipesErrorMessage.from(e))); + SpLogEntry.from(System.currentTimeMillis(), SpLogMessage.from(e))); } private void initializeProducers() { diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpOutputCollector.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpOutputCollector.java index c76df7cc6a..671de27820 100644 --- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpOutputCollector.java +++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpOutputCollector.java @@ -21,12 +21,11 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException; import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager; import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector; +import org.apache.streampipes.extensions.management.monitoring.ExtensionsLogger; import org.apache.streampipes.messaging.EventProducer; import org.apache.streampipes.messaging.InternalEventProcessor; -import org.apache.streampipes.model.StreamPipesErrorMessage; import org.apache.streampipes.model.grounding.TransportFormat; import org.apache.streampipes.model.grounding.TransportProtocol; -import org.apache.streampipes.model.monitoring.SpLogEntry; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.model.runtime.EventConverter; import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager; @@ -44,6 +43,7 @@ public class StandaloneSpOutputCollector extends private final EventProducer producer; private final String resourceId; + private final ExtensionsLogger extensionsLogger; public StandaloneSpOutputCollector(T protocol, TransportFormat format, @@ -51,6 +51,7 @@ public StandaloneSpOutputCollector(T protocol, super(protocol, format); this.producer = protocolDefinition.getProducer(protocol); this.resourceId = resourceId; + this.extensionsLogger = new ExtensionsLogger(resourceId); } public void collect(Event event) { @@ -59,8 +60,7 @@ public void collect(Event event) { producer.publish(dataFormatDefinition.fromMap(outEvent)); SpMonitoringManager.INSTANCE.increaseOutCounter(resourceId, System.currentTimeMillis()); } catch (SpRuntimeException e) { - var logEntry = SpLogEntry.from(System.currentTimeMillis(), StreamPipesErrorMessage.from(e)); - SpMonitoringManager.INSTANCE.addErrorMessage(resourceId, logEntry); + extensionsLogger.error(e); LOG.error("Could not publish event", e); } } diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java index c3e250fe76..c2b70c5401 100644 --- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java +++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java @@ -70,13 +70,13 @@ public SpOutputCollector getOutputCollector() throws SpRuntimeException { @Override public void process(Map rawEvent, String sourceInfo) { try { - runtimeContext.getLogger().increaseInCounter(instanceId, sourceInfo, System.currentTimeMillis()); + monitoringManager.increaseInCounter(instanceId, sourceInfo, System.currentTimeMillis()); var event = this.internalRuntimeParameters.makeEvent(runtimeParameters, rawEvent, sourceInfo); pipelineElement .onEvent(event, outputCollector); } catch (RuntimeException e) { LOG.error("RuntimeException while processing event in {}", pipelineElement.getClass().getCanonicalName(), e); - addLogEntry(runtimeContext.getLogger(), instanceId, e); + addLogEntry(e); } } diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java index 27be8ee8ae..7c7f407c14 100644 --- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java +++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java @@ -51,11 +51,11 @@ public StandaloneEventSinkRuntime() { @Override public void process(Map rawEvent, String sourceInfo) { try { - runtimeContext.getLogger().increaseInCounter(instanceId, sourceInfo, System.currentTimeMillis()); + monitoringManager.increaseInCounter(instanceId, sourceInfo, System.currentTimeMillis()); pipelineElement.onEvent(internalRuntimeParameters.makeEvent(runtimeParameters, rawEvent, sourceInfo)); } catch (RuntimeException e) { LOG.error("RuntimeException while processing event in {}", pipelineElement.getClass().getCanonicalName(), e); - addLogEntry(runtimeContext.getLogger(), instanceId, e); + addLogEntry(e); } } diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java index 8005ca4d5e..d35317cda5 100644 --- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java +++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandalonePipelineElementRuntime.java @@ -31,9 +31,7 @@ import org.apache.streampipes.extensions.api.pe.routing.RawDataProcessor; import org.apache.streampipes.extensions.api.pe.routing.SpInputCollector; import org.apache.streampipes.model.SpDataStream; -import org.apache.streampipes.model.StreamPipesErrorMessage; import org.apache.streampipes.model.base.InvocableStreamPipesEntity; -import org.apache.streampipes.model.monitoring.SpLogEntry; import org.apache.streampipes.wrapper.params.InternalRuntimeParameters; import org.apache.streampipes.wrapper.runtime.PipelineElementRuntime; import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager; @@ -58,10 +56,13 @@ public abstract class StandalonePipelineElementRuntime< protected PeT pipelineElement; protected IInternalRuntimeParameters internalRuntimeParameters; + protected final SpMonitoringManager monitoringManager; + public StandalonePipelineElementRuntime(IContextGenerator contextGenerator, IParameterGenerator parameterGenerator) { super(contextGenerator, parameterGenerator); this.internalRuntimeParameters = new InternalRuntimeParameters(); + this.monitoringManager = SpMonitoringManager.INSTANCE; } @Override @@ -80,13 +81,12 @@ public void startRuntime(IvT pipelineElementInvocation, @Override public void stopRuntime() { this.inputCollectors.forEach(is -> is.unregisterConsumer(instanceId)); - resetCounter(runtimeContext.getLogger(), instanceId); + resetCounter(instanceId); afterStop(); } - protected void resetCounter(SpMonitoringManager manager, - String resourceId) throws SpRuntimeException { - manager.resetCounter(resourceId); + protected void resetCounter(String resourceId) throws SpRuntimeException { + monitoringManager.resetCounter(resourceId); } protected List getInputCollectors(List inputStreams) throws SpRuntimeException { @@ -99,12 +99,8 @@ protected List getInputCollectors(List inputStre return inputCollectors; } - protected void addLogEntry(SpMonitoringManager manager, - String elementId, - RuntimeException e) { - manager.addErrorMessage( - elementId, - SpLogEntry.from(System.currentTimeMillis(), StreamPipesErrorMessage.from(e))); + protected void addLogEntry(RuntimeException e) { + runtimeContext.getLogger().error(e); } protected void connectInputCollectors() { diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/SpEventProcessorRuntimeContext.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/SpEventProcessorRuntimeContext.java index 6ad5ba0db8..05fca370af 100644 --- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/SpEventProcessorRuntimeContext.java +++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/SpEventProcessorRuntimeContext.java @@ -17,10 +17,10 @@ */ package org.apache.streampipes.wrapper.context; -import org.apache.streampipes.client.StreamPipesClient; -import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager; +import org.apache.streampipes.client.api.IStreamPipesClient; +import org.apache.streampipes.extensions.api.config.IConfigExtractor; +import org.apache.streampipes.extensions.api.monitoring.IExtensionsLogger; import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext; -import org.apache.streampipes.extensions.management.config.ConfigExtractor; import java.io.Serializable; @@ -29,14 +29,9 @@ public class SpEventProcessorRuntimeContext extends SpRuntimeContext implements public SpEventProcessorRuntimeContext(String correspondingUser, - ConfigExtractor configExtractor, - StreamPipesClient streamPipesClient, - SpMonitoringManager logManager) { - super(correspondingUser, configExtractor, streamPipesClient, logManager); + IConfigExtractor configExtractor, + IStreamPipesClient streamPipesClient, + IExtensionsLogger extensionsLogger) { + super(correspondingUser, configExtractor, streamPipesClient, extensionsLogger); } - - public SpEventProcessorRuntimeContext() { - super(); - } - } diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/SpEventSinkRuntimeContext.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/SpEventSinkRuntimeContext.java index 6ef795a7e4..6bfc03d03f 100644 --- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/SpEventSinkRuntimeContext.java +++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/SpEventSinkRuntimeContext.java @@ -17,10 +17,10 @@ */ package org.apache.streampipes.wrapper.context; -import org.apache.streampipes.client.StreamPipesClient; -import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager; +import org.apache.streampipes.client.api.IStreamPipesClient; +import org.apache.streampipes.extensions.api.config.IConfigExtractor; +import org.apache.streampipes.extensions.api.monitoring.IExtensionsLogger; import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext; -import org.apache.streampipes.extensions.management.config.ConfigExtractor; import java.io.Serializable; @@ -28,9 +28,9 @@ public class SpEventSinkRuntimeContext extends SpRuntimeContext implements EventSinkRuntimeContext, Serializable { public SpEventSinkRuntimeContext(String correspondingUser, - ConfigExtractor configExtractor, - StreamPipesClient streamPipesClient, - SpMonitoringManager logManager) { - super(correspondingUser, configExtractor, streamPipesClient, logManager); + IConfigExtractor configExtractor, + IStreamPipesClient streamPipesClient, + IExtensionsLogger extensionsLogger) { + super(correspondingUser, configExtractor, streamPipesClient, extensionsLogger); } } diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/SpRuntimeContext.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/SpRuntimeContext.java index 081f7034ee..aa4a73e76b 100644 --- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/SpRuntimeContext.java +++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/SpRuntimeContext.java @@ -17,36 +17,32 @@ */ package org.apache.streampipes.wrapper.context; -import org.apache.streampipes.client.StreamPipesClient; -import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager; +import org.apache.streampipes.client.api.IStreamPipesClient; +import org.apache.streampipes.extensions.api.config.IConfigExtractor; +import org.apache.streampipes.extensions.api.monitoring.IExtensionsLogger; import org.apache.streampipes.extensions.api.pe.context.RuntimeContext; -import org.apache.streampipes.extensions.management.config.ConfigExtractor; public class SpRuntimeContext implements RuntimeContext { - private String correspondingUser; - private ConfigExtractor configExtractor; - private StreamPipesClient streamPipesClient; - private SpMonitoringManager spLogManager; + private final String correspondingUser; + private final IConfigExtractor configExtractor; + private final IStreamPipesClient streamPipesClient; + private final IExtensionsLogger extensionsLogger; public SpRuntimeContext(String correspondingUser, - ConfigExtractor configExtractor, - StreamPipesClient streamPipesClient, - SpMonitoringManager spLogManager) { + IConfigExtractor configExtractor, + IStreamPipesClient streamPipesClient, + IExtensionsLogger extensionsLogger) { this.correspondingUser = correspondingUser; this.configExtractor = configExtractor; this.streamPipesClient = streamPipesClient; - this.spLogManager = spLogManager; - } - - public SpRuntimeContext() { - + this.extensionsLogger = extensionsLogger; } @Override - public SpMonitoringManager getLogger() { - return spLogManager; + public IExtensionsLogger getLogger() { + return extensionsLogger; } @Override @@ -55,12 +51,12 @@ public String getCorrespondingUser() { } @Override - public ConfigExtractor getConfigStore() { + public IConfigExtractor getConfigStore() { return this.configExtractor; } @Override - public StreamPipesClient getStreamPipesClient() { + public IStreamPipesClient getStreamPipesClient() { return streamPipesClient; } } diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/generator/DataProcessorContextGenerator.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/generator/DataProcessorContextGenerator.java index 60e04e5493..2783e3e278 100644 --- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/generator/DataProcessorContextGenerator.java +++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/generator/DataProcessorContextGenerator.java @@ -18,9 +18,9 @@ package org.apache.streampipes.wrapper.context.generator; -import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager; import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext; import org.apache.streampipes.extensions.api.pe.context.IContextGenerator; +import org.apache.streampipes.extensions.management.monitoring.ExtensionsLogger; import org.apache.streampipes.extensions.management.util.RuntimeContextUtils; import org.apache.streampipes.model.graph.DataProcessorInvocation; import org.apache.streampipes.wrapper.context.SpEventProcessorRuntimeContext; @@ -34,6 +34,6 @@ public EventProcessorRuntimeContext makeContext(DataProcessorInvocation invocati invocation.getCorrespondingUser(), RuntimeContextUtils.makeConfigExtractor(), RuntimeContextUtils.makeStreamPipesClient(), - SpMonitoringManager.INSTANCE); + new ExtensionsLogger(invocation.getElementId())); } } diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/generator/DataSinkContextGenerator.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/generator/DataSinkContextGenerator.java index 28166cd610..c2413a9302 100644 --- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/generator/DataSinkContextGenerator.java +++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/context/generator/DataSinkContextGenerator.java @@ -18,9 +18,9 @@ package org.apache.streampipes.wrapper.context.generator; -import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager; import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext; import org.apache.streampipes.extensions.api.pe.context.IContextGenerator; +import org.apache.streampipes.extensions.management.monitoring.ExtensionsLogger; import org.apache.streampipes.extensions.management.util.RuntimeContextUtils; import org.apache.streampipes.model.graph.DataSinkInvocation; import org.apache.streampipes.wrapper.context.SpEventSinkRuntimeContext; @@ -33,6 +33,6 @@ public EventSinkRuntimeContext makeContext(DataSinkInvocation invocation) { invocation.getCorrespondingUser(), RuntimeContextUtils.makeConfigExtractor(), RuntimeContextUtils.makeStreamPipesClient(), - SpMonitoringManager.INSTANCE); + new ExtensionsLogger(invocation.getElementId())); } } diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts index 2698ec6f5d..ca4deba8b4 100644 --- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts +++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts @@ -20,7 +20,7 @@ /* tslint:disable */ /* eslint-disable */ // @ts-nocheck -// Generated using typescript-generator version 3.1.1185 on 2023-07-19 22:10:18. +// Generated using typescript-generator version 3.1.1185 on 2023-07-23 10:14:46. export class NamedStreamPipesEntity { '@class': @@ -3325,7 +3325,7 @@ export class SpDataStreamContainer { } export class SpLogEntry { - errorMessage: StreamPipesErrorMessage; + errorMessage: SpLogMessage; timestamp: number; static fromData(data: SpLogEntry, target?: SpLogEntry): SpLogEntry { @@ -3333,14 +3333,33 @@ export class SpLogEntry { return data; } const instance = target || new SpLogEntry(); - instance.errorMessage = StreamPipesErrorMessage.fromData( - data.errorMessage, - ); + instance.errorMessage = SpLogMessage.fromData(data.errorMessage); instance.timestamp = data.timestamp; return instance; } } +export class SpLogMessage { + cause: string; + detail: string; + fullStackTrace: string; + level: SpLogLevel; + title: string; + + static fromData(data: SpLogMessage, target?: SpLogMessage): SpLogMessage { + if (!data) { + return data; + } + const instance = target || new SpLogMessage(); + instance.cause = data.cause; + instance.detail = data.detail; + instance.fullStackTrace = data.fullStackTrace; + instance.level = data.level; + instance.title = data.title; + return instance; + } +} + export class SpMetricsEntry { lastTimestamp: number; messagesIn: { [index: string]: MessageCounter }; @@ -3521,30 +3540,6 @@ export class StreamPipesApplicationPackage { } } -export class StreamPipesErrorMessage { - cause: string; - detail: string; - fullStackTrace: string; - level: string; - title: string; - - static fromData( - data: StreamPipesErrorMessage, - target?: StreamPipesErrorMessage, - ): StreamPipesErrorMessage { - if (!data) { - return data; - } - const instance = target || new StreamPipesErrorMessage(); - instance.cause = data.cause; - instance.detail = data.detail; - instance.fullStackTrace = data.fullStackTrace; - instance.level = data.level; - instance.title = data.title; - return instance; - } -} - export class SuccessMessage extends Message { static fromData( data: SuccessMessage, @@ -3852,6 +3847,8 @@ export type SelectionStaticPropertyUnion = | AnyStaticProperty | OneOfStaticProperty; +export type SpLogLevel = 'INFO' | 'WARN' | 'ERROR'; + export type SpQueryStatus = 'OK' | 'TOO_MUCH_DATA'; export type StaticPropertyType = diff --git a/ui/projects/streampipes/shared-ui/src/lib/components/basic-nav-tabs/basic-nav-tabs.component.html b/ui/projects/streampipes/shared-ui/src/lib/components/basic-nav-tabs/basic-nav-tabs.component.html index 4759f541bf..abe940f2aa 100644 --- a/ui/projects/streampipes/shared-ui/src/lib/components/basic-nav-tabs/basic-nav-tabs.component.html +++ b/ui/projects/streampipes/shared-ui/src/lib/components/basic-nav-tabs/basic-nav-tabs.component.html @@ -36,12 +36,7 @@ arrow_back -