Skip to content

Commit

Permalink
fixed a problem that caused the KB to be overloaded with requests
Browse files Browse the repository at this point in the history
  • Loading branch information
mmiglier committed Mar 27, 2015
1 parent 8a3ad50 commit 1beab1b
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 93 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ for further details.

## Change List

v0.3.3:
* fixed a problem that caused the KB to be overloaded with requests
* updated to [knowledge-base-api 2.3.1](https://github.com/deib-polimi/modaclouds-knowledge-base-api/releases/tag/v2.3.1)
* updated to [qos-models 2.4.1](https://github.com/deib-polimi/modaclouds-qos-models/releases/tag/v2.4.1)

v0.3.1:
* bug fix

Expand Down
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>it.polimi.modaclouds.monitoring</groupId>
<artifactId>data-collector-factory</artifactId>
<version>0.3.2</version>
<version>0.3.3</version>
<name>MODAClouds Data Collector Factory</name>
<inceptionYear>2014</inceptionYear>

Expand Down Expand Up @@ -102,7 +102,7 @@
<dependency>
<groupId>it.polimi.modaclouds.monitoring.knowledge-base-api</groupId>
<artifactId>knowledge-base-api</artifactId>
<version>2.2.1</version>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand All @@ -119,7 +119,7 @@
<dependency>
<groupId>it.polimi.modaclouds.qos-models</groupId>
<artifactId>qos-models</artifactId>
<version>2.2.1</version>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.jena</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import it.polimi.modaclouds.monitoring.dcfactory.wrappers.DDAConnector;
import it.polimi.modaclouds.monitoring.dcfactory.wrappers.KBConnector;
import it.polimi.modaclouds.qos_models.monitoring_ontology.MOVocabulary;
import it.polimi.modaclouds.qos_models.monitoring_ontology.Resource;

import java.util.HashMap;
Expand All @@ -40,20 +39,15 @@ public abstract class DataCollectorFactory {
private final Logger logger = LoggerFactory
.getLogger(DataCollectorFactory.class);

private DDAConnector dda;
protected DDAConnector dda;
private ScheduledExecutorService executorService = Executors
.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> kbSyncExecutorHandler;
private Map<String, Set<DCConfig>> dCsConfigByMetric;
private Set<DCConfig> allDCsConfigs;
private KBConnector kb;
// private int kbSyncPeriod;
protected KBConnector kb;
private boolean isSyncingWithKB = false;

// private Set<String> monitoredResourcesIds;
// private Set<String> monitoredMetrics;

// private Set<DataCollector> installedDataCollectors;

/**
* This method will be called whenever synchronization with KB ends. The
Expand All @@ -62,9 +56,9 @@ public abstract class DataCollectorFactory {
*/
protected abstract void syncedWithKB();

public DataCollectorFactory(DDAConnector dda, KBConnector kb) {
this.dda = dda;
this.kb = kb;
public DataCollectorFactory(String ddaUrl, String kbUrl) {
this.dda = new DDAConnector(ddaUrl);
this.kb = new KBConnector(kbUrl);
dCsConfigByMetric = new HashMap<String, Set<DCConfig>>();
allDCsConfigs = new HashSet<DCConfig>();
}
Expand Down Expand Up @@ -142,17 +136,11 @@ private void syncWithKB() {
syncedWithKB();
}

public boolean monitoringRequired(String resourceId, DCConfig dcConfig) {
if (dcConfig.getMonitoredResourcesIds().contains(resourceId))
public boolean monitoringRequired(Resource resource, DCConfig dcConfig) {
if (dcConfig.getMonitoredResourcesIds().contains(resource.getId()))
return true;
if (!dcConfig.getMonitoredResourcesIds().isEmpty())
return false;
Resource resource = kb.getResourceById(resourceId);
if (resource == null) {
logger.error("There is no resource with {} {} on the KB",
MOVocabulary.resourceIdParameterName, resourceId);
return false;
}
for (String type : dcConfig.getMonitoredResourcesTypes()) {
if (type.equalsIgnoreCase(resource.getType()))
return true;
Expand All @@ -166,7 +154,7 @@ public boolean monitoringRequired(String resourceId, DCConfig dcConfig) {
return false;
}

public Set<DCConfig> getConfiguration(String resourceId,
public Set<DCConfig> getConfiguration(Resource resource,
String monitoredMetric) {
Set<DCConfig> selectedConfig = new HashSet<DCConfig>();
Set<DCConfig> allDCsConfigs;
Expand All @@ -176,8 +164,8 @@ public Set<DCConfig> getConfiguration(String resourceId,
allDCsConfigs = this.allDCsConfigs;
if (allDCsConfigs != null) {
for (DCConfig dcConfig : allDCsConfigs) {
if (resourceId == null
|| monitoringRequired(resourceId, dcConfig)) {
if (resource == null
|| monitoringRequired(resource, dcConfig)) {
selectedConfig.add(dcConfig);
}
}
Expand All @@ -193,9 +181,9 @@ public Set<DCConfig> getConfiguration(String resourceId,
* @param monitoredResourceId
*/
public void sendSyncMonitoringDatum(String value, String metric,
String monitoredResourceId) {
Resource resource) {
dda.sendSyncMonitoringDatum(value, metric.toLowerCase(),
monitoredResourceId);
resource);
}

/**
Expand All @@ -206,9 +194,9 @@ public void sendSyncMonitoringDatum(String value, String metric,
* @param monitoredResourceId
*/
public void sendSyncMonitoringData(List<String> values, String metric,
String monitoredResourceId) {
Resource resource) {
dda.sendSyncMonitoringData(values, metric.toLowerCase(),
monitoredResourceId);
resource);
}

/**
Expand All @@ -219,8 +207,8 @@ public void sendSyncMonitoringData(List<String> values, String metric,
* @param monitoredResourceId
*/
public void sendAsyncMonitoringDatum(String value, String metric,
String monitoredResourceId) {
Resource resource) {
dda.sendAsyncMonitoringDatum(value, metric.toLowerCase(),
monitoredResourceId);
resource);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import it.polimi.deib.csparql_rest_api.RSP_services_csparql_API;
import it.polimi.deib.csparql_rest_api.exception.ServerErrorException;
import it.polimi.deib.csparql_rest_api.exception.StreamErrorException;
import it.polimi.modaclouds.qos_models.monitoring_ontology.Resource;

import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -58,26 +59,29 @@ public DDAConnector(String ddaURL) {
}

public void sendSyncMonitoringData(List<String> values, String metric,
String monitoredResourceId) {
send(values, metric, monitoredResourceId);
Resource resource) {
send(values, metric, resource);
}

public void sendSyncMonitoringDatum(String value, String metric,
String monitoredResourceId) {
Resource resource) {
logger.info("Sending datum synchronously: {} {} {}", resource.getId(), metric,
value);
send(Arrays.asList(new String[] { value }), metric.toLowerCase(),
monitoredResourceId);
resource);
}

/**
* Data is buffered and sent all together after a predefined delay (e.g. 1
* second)
* Data is buffered and sent all together after a delay of 1 second
*
* @param value
* @param metric
* @param monitoredResourceId
*/
public synchronized void sendAsyncMonitoringDatum(String value,
String metric, String monitoredResourceId) {
String metric, Resource resource) {
logger.info(
"Sending datum asynchronously: {} {} {}", resource.getId(), metric, value);
metric = metric.toLowerCase();
String monDatumInstanceURI = DDAOntology.MonitoringDatum + "#"
+ UUID.randomUUID().toString();
Expand All @@ -87,8 +91,7 @@ public synchronized void sendAsyncMonitoringDatum(String value,
model = ModelFactory.createDefaultModel();
modelByMetric.put(metric, model);
}
addDatumToModel(model, monDatumInstanceURI, value, metric,
monitoredResourceId);
addDatumToModel(model, monDatumInstanceURI, value, metric, resource);
if (!timerRunning) {
timerRunning = true;
timer.schedule(new TimerTask() {
Expand All @@ -112,17 +115,18 @@ private synchronized void timeIsUp() {
private void send(Model model, String metric) {
String streamURI = getStreamURI(metric);
try {
csparql_api.feedStream(streamURI, model);
logger.info("Monitoring data sent to {} on stream {}", ddaURL,
logger.debug("Sending monitoring datum to {} on stream {}", ddaURL,
streamURI);
csparql_api.feedStream(streamURI, model);
} catch (ServerErrorException | StreamErrorException e) {
logger.error("Error while sending monitoring datum to {}", ddaURL,
e);
logger.error(
"Error while sending monitoring datum to {}: {}",
ddaURL, e.getMessage());
}
}

private void addDatumToModel(Model m, String datumUri, String value,
String metric, String monitoredResourceId) {
String metric, Resource resource) {
m.createResource(datumUri)
.addProperty(RDF.type, DDAOntology.MonitoringDatum)
.addProperty(DDAOntology.metric,
Expand All @@ -131,13 +135,13 @@ private void addDatumToModel(Model m, String datumUri, String value,
m.createTypedLiteral(value, XSDDatatype.XSDdouble))
.addProperty(
DDAOntology.resourceId,
m.createTypedLiteral(monitoredResourceId,
m.createTypedLiteral(resource.getId(),
XSDDatatype.XSDstring));
}

private void send(List<String> values, String metric,
String monitoredResourceId) {
Model m = createModel(values, metric, monitoredResourceId);
Resource resource) {
Model m = createModel(values, metric, resource);
send(m, metric);
}

Expand All @@ -147,13 +151,13 @@ public static String getStreamURI(String metric) {
}

private Model createModel(List<String> values, String metric,
String monitoredResourceId) {
Resource resource) {
Model m = ModelFactory.createDefaultModel();
for (String value : values) {
String monDatumInstanceURI = DDAOntology.MonitoringDatum + "#"
+ UUID.randomUUID().toString();
addDatumToModel(m, monDatumInstanceURI, value, metric,
monitoredResourceId);
resource);
}
return m;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import it.polimi.modaclouds.qos_models.monitoring_ontology.MOVocabulary;
import it.polimi.modaclouds.qos_models.monitoring_ontology.Resource;

import java.util.HashSet;
import java.util.Set;

import org.slf4j.Logger;
Expand Down Expand Up @@ -52,6 +51,11 @@ public Set<DCConfig> getAllDCsConfig() {
return allConfigs;
}

/**
*
* @param resourceId
* @return the resource if exists, null otherwise
*/
public Resource getResourceById(String resourceId) {
try {
return (Resource) fusekiKBAPI.getEntityById(resourceId,
Expand Down

0 comments on commit 1beab1b

Please sign in to comment.