Skip to content

Commit

Permalink
Improve monitoring of adapters and pipelines (#1787)
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikriemer committed Jul 23, 2023
1 parent 329f56c commit 6f8e763
Show file tree
Hide file tree
Showing 51 changed files with 369 additions and 269 deletions.
Expand Up @@ -18,10 +18,10 @@

package org.apache.streampipes.extensions.api.connect.context;

import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager;
import org.apache.streampipes.extensions.api.monitoring.IExtensionsLogger;

public interface IAdapterRuntimeContext extends IAdapterGuessSchemaContext {

SpMonitoringManager getLogger();
IExtensionsLogger getLogger();

}
Expand Up @@ -16,36 +16,47 @@
*
*/

package org.apache.streampipes.model.monitoring;
package org.apache.streampipes.extensions.api.monitoring;

public class StreamPipesRuntimeError {
import java.util.ArrayList;
import java.util.List;

private long timestamp;
private String title;
private String message;
public class FixedSizeList<T> {
private final int maxSize;
private final List<T> list;

private String stackTrace;
public FixedSizeList(int maxSize) {
if (maxSize <= 0) {
throw new IllegalArgumentException("Max size must be greater than zero.");
}
this.maxSize = maxSize;
this.list = new ArrayList<>();
}

public StreamPipesRuntimeError(long timestamp, String title, String message, String stackTrace) {
this.timestamp = timestamp;
this.title = title;
this.message = message;
this.stackTrace = stackTrace;
public void add(T element) {
list.add(0, element);
if (list.size() > maxSize) {
list.remove(list.size() - 1);
}
}

public long getTimestamp() {
return timestamp;
public T get(int index) {
return list.get(index);
}

public String getTitle() {
return title;
public int size() {
return list.size();
}

public String getMessage() {
return message;
public List<T> getAllItems() {
return this.list;
}

public String getStackTrace() {
return stackTrace;
public void clear() {
this.list.clear();
}
}




Expand Up @@ -18,7 +18,20 @@

package org.apache.streampipes.extensions.api.monitoring;

import org.slf4j.Logger;
import org.apache.streampipes.model.monitoring.SpLogMessage;

public interface IPipelineElementLogger extends Logger {
public interface IExtensionsLogger {

void log(SpLogMessage logMessage);

void error(Exception e);

void error(String details,
Exception e);

void info(String title,
String details);

void warn(String title,
String details);
}
Expand Up @@ -22,7 +22,6 @@
import org.apache.streampipes.model.monitoring.SpLogEntry;
import org.apache.streampipes.model.monitoring.SpMetricsEntry;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -31,7 +30,7 @@ public enum SpMonitoringManager {

INSTANCE;

private final Map<String, List<SpLogEntry>> logInfos;
private final Map<String, FixedSizeList<SpLogEntry>> logInfos;
private final Map<String, SpMetricsEntry> metricsInfos;

SpMonitoringManager() {
Expand All @@ -42,9 +41,9 @@ public enum SpMonitoringManager {
public void addErrorMessage(String resourceId,
SpLogEntry errorMessageEntry) {
if (!logInfos.containsKey(resourceId)) {
logInfos.put(resourceId, new ArrayList<>());
logInfos.put(resourceId, new FixedSizeList<>(100));
}
this.logInfos.get(resourceId).add(0, errorMessageEntry);
this.logInfos.get(resourceId).add(errorMessageEntry);
}

public void increaseInCounter(String resourceId,
Expand Down Expand Up @@ -87,18 +86,28 @@ public SpMetricsEntry getMetricsEntry(String resourceId,
return currentEntry;
}

public Map<String, List<SpLogEntry>> getAllLogs() {
return logInfos;
public SpEndpointMonitoringInfo getMonitoringInfo() {
var logInfos = makeLogInfos();
return new SpEndpointMonitoringInfo(logInfos, metricsInfos);
}

public Map<String, SpMetricsEntry> getAllMetrics() {
return this.metricsInfos;
public void clearAllLogs() {
this.logInfos.forEach((key, value) -> value.clear());
}

public SpEndpointMonitoringInfo getMonitoringInfo() {
return new SpEndpointMonitoringInfo(logInfos, metricsInfos);
private Map<String, List<SpLogEntry>> makeLogInfos() {
var logEntries = new HashMap<String, List<SpLogEntry>>();
this.logInfos.forEach((key, value) ->
logEntries.put(key, cloneList(value.getAllItems())));

return logEntries;
}

private List<SpLogEntry> cloneList(List<SpLogEntry> allItems) {
return allItems.stream().map(SpLogEntry::new).toList();
}


private void checkAndPrepareMetrics(String resourceId) {
if (!metricsInfos.containsKey(resourceId)) {
addMetricsObject(resourceId);
Expand All @@ -109,8 +118,4 @@ private void addMetricsObject(String resourceId) {
this.metricsInfos.put(resourceId, new SpMetricsEntry());
}


public void clearAllLogs() {
logInfos.forEach((key, value) -> value.clear());
}
}
Expand Up @@ -19,11 +19,11 @@

import org.apache.streampipes.client.api.IStreamPipesClient;
import org.apache.streampipes.extensions.api.config.IConfigExtractor;
import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager;
import org.apache.streampipes.extensions.api.monitoring.IExtensionsLogger;

public interface RuntimeContext {

SpMonitoringManager getLogger();
IExtensionsLogger getLogger();

String getCorrespondingUser();

Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeContext;
import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager;
import org.apache.streampipes.extensions.management.connect.adapter.model.EventCollector;
import org.apache.streampipes.extensions.management.context.AdapterContextGenerator;
import org.apache.streampipes.extensions.management.init.IDeclarersSingleton;
import org.apache.streampipes.extensions.management.init.RunningAdapterInstances;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
Expand All @@ -41,14 +42,10 @@ public class AdapterWorkerManagement {
private final RunningAdapterInstances runningAdapterInstances;
private final IDeclarersSingleton declarers;

private final IAdapterRuntimeContext adapterRuntimeContext;

public AdapterWorkerManagement(RunningAdapterInstances runningAdapterInstances,
IDeclarersSingleton declarers,
IAdapterRuntimeContext runtimeContext) {
IDeclarersSingleton declarers) {
this.runningAdapterInstances = runningAdapterInstances;
this.declarers = declarers;
this.adapterRuntimeContext = runtimeContext;
}

public Collection<AdapterDescription> getAllRunningAdapterInstances() {
Expand All @@ -69,8 +66,9 @@ public void invokeAdapter(AdapterDescription adapterDescription) throws AdapterE
var registeredParsers = newAdapterInstance.declareConfig().getSupportedParsers();
var extractor = AdapterParameterExtractor.from(adapterDescription, registeredParsers);
var eventCollector = EventCollector.from(adapterDescription);
var runtimeContext = makeRuntimeContext(adapterDescription.getElementId());

newAdapterInstance.onAdapterStarted(extractor, eventCollector, adapterRuntimeContext);
newAdapterInstance.onAdapterStarted(extractor, eventCollector, runtimeContext);
} else {
var errorMessage = "Adapter with id %s could not be found".formatted(adapterDescription.getAppId());
LOG.error(errorMessage);
Expand All @@ -88,12 +86,17 @@ public void stopAdapter(AdapterDescription adapterDescription) throws AdapterExc

var registeredParsers = adapter.declareConfig().getSupportedParsers();
var extractor = AdapterParameterExtractor.from(adapterDescription, registeredParsers);
adapter.onAdapterStopped(extractor, adapterRuntimeContext);
var runtimeContext = makeRuntimeContext(elementId);
adapter.onAdapterStopped(extractor, runtimeContext);
}

resetMonitoring(elementId);
}

private IAdapterRuntimeContext makeRuntimeContext(String adapterInstanceId) {
return new AdapterContextGenerator().makeRuntimeContext(adapterInstanceId);
}

private void resetMonitoring(String elementId) {
SpMonitoringManager.INSTANCE.reset(elementId);
}
Expand Down
Expand Up @@ -20,8 +20,8 @@

import org.apache.streampipes.extensions.api.connect.IPullAdapter;
import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager;
import org.apache.streampipes.model.StreamPipesErrorMessage;
import org.apache.streampipes.model.monitoring.SpLogEntry;
import org.apache.streampipes.model.monitoring.SpLogMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,7 +45,7 @@ public void schedule(IPullAdapter pullAdapter,
} catch (ExecutionException | InterruptedException e) {
SpMonitoringManager.INSTANCE.addErrorMessage(
adapterElementId,
SpLogEntry.from(System.currentTimeMillis(), StreamPipesErrorMessage.from(e)));
SpLogEntry.from(System.currentTimeMillis(), SpLogMessage.from(e)));
} catch (TimeoutException e) {
LOGGER.warn("Timeout occurred", e);
}
Expand Down
Expand Up @@ -24,12 +24,11 @@
import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement;
import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager;
import org.apache.streampipes.extensions.management.connect.adapter.util.TransportFormatSelector;
import org.apache.streampipes.extensions.management.monitoring.ExtensionsLogger;
import org.apache.streampipes.messaging.EventProducer;
import org.apache.streampipes.model.StreamPipesErrorMessage;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.grounding.TransportFormat;
import org.apache.streampipes.model.grounding.TransportProtocol;
import org.apache.streampipes.model.monitoring.SpLogEntry;

import java.util.Map;

Expand Down Expand Up @@ -78,9 +77,7 @@ public Map<String, Object> process(Map<String, Object> event) {
System.currentTimeMillis());
}
} catch (RuntimeException e) {
SpMonitoringManager.INSTANCE.addErrorMessage(
adapterDescription.getElementId(),
SpLogEntry.from(System.currentTimeMillis(), StreamPipesErrorMessage.from(e)));
new ExtensionsLogger(adapterDescription.getElementId()).error(e);
}
return null;
}
Expand Down
Expand Up @@ -20,15 +20,18 @@

import org.apache.streampipes.extensions.api.connect.context.IAdapterGuessSchemaContext;
import org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeContext;
import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager;
import org.apache.streampipes.extensions.management.monitoring.ExtensionsLogger;

import static org.apache.streampipes.extensions.management.util.RuntimeContextUtils.makeConfigExtractor;
import static org.apache.streampipes.extensions.management.util.RuntimeContextUtils.makeStreamPipesClient;

public class AdapterContextGenerator {

public IAdapterRuntimeContext makeRuntimeContext() {
return new SpAdapterRuntimeContext(SpMonitoringManager.INSTANCE, makeConfigExtractor(), makeStreamPipesClient());
public IAdapterRuntimeContext makeRuntimeContext(String adapterInstanceId) {
return new SpAdapterRuntimeContext(
new ExtensionsLogger(adapterInstanceId),
makeConfigExtractor(),
makeStreamPipesClient());
}

public IAdapterGuessSchemaContext makeGuessSchemaContext() {
Expand Down
Expand Up @@ -20,25 +20,25 @@

import org.apache.streampipes.client.StreamPipesClient;
import org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeContext;
import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager;
import org.apache.streampipes.extensions.api.monitoring.IExtensionsLogger;
import org.apache.streampipes.extensions.management.config.ConfigExtractor;

import java.io.Serializable;

public class SpAdapterRuntimeContext extends SpAdapterGuessSchemaContext
implements IAdapterRuntimeContext, Serializable {

private SpMonitoringManager monitoringManager;
private final IExtensionsLogger extensionsLogger;

public SpAdapterRuntimeContext(SpMonitoringManager monitoringManager,
public SpAdapterRuntimeContext(IExtensionsLogger extensionsLogger,
ConfigExtractor configExtractor,
StreamPipesClient streamPipesClient) {
super(configExtractor, streamPipesClient);
this.monitoringManager = monitoringManager;
this.extensionsLogger = extensionsLogger;
}

@Override
public SpMonitoringManager getLogger() {
return monitoringManager;
public IExtensionsLogger getLogger() {
return extensionsLogger;
}
}

0 comments on commit 6f8e763

Please sign in to comment.